Skip to content

Commit

Permalink
core: Add cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
sogehige committed Apr 3, 2018
1 parent 9223d57 commit 562a78e
Show file tree
Hide file tree
Showing 118 changed files with 5,287 additions and 5,094 deletions.
136 changes: 136 additions & 0 deletions cluster.js
@@ -0,0 +1,136 @@
'use strict'

const util = require('util')
const _ = require('lodash')
const debug = require('debug')('cluster:worker')

const Parser = require('./libs/parser')

var workerIsFree = {
message: true,
db: true
}

cluster()

function cluster () {
if (!global.db.engine.connected) {
setTimeout(() => cluster(), 10)
return
}

global.configuration = new (require('./libs/configuration.js'))()
global.currency = new (require('./libs/currency.js'))()
global.users = new (require('./libs/users.js'))()
global.events = new (require('./libs/events.js'))()
global.twitch = new (require('./libs/twitch'))()
global.permissions = new (require('./libs/permissions'))()

global.lib = {}
global.lib.translate = new (require('./libs/translate'))()
global.translate = global.lib.translate.translate

global.lib.translate._load().then(function () {
global.systems = require('auto-load')('./libs/systems/')
global.overlays = require('auto-load')('./libs/overlays/')
global.games = require('auto-load')('./libs/games/')
global.integrations = require('auto-load')('./libs/integrations/')

debug(`Worker ${process.pid} has started.`)

process.on('message', async (data) => {
switch (data.type) {
case 'shutdown':
gracefullyExit()
break
case 'message':
workerIsFree.message = false
await message(data)
workerIsFree.message = true
break
case 'db':
workerIsFree.db = false
switch (data.fnc) {
case 'find':
data.items = await global.db.engine.find(data.table, data.where)
break
case 'findOne':
data.items = await global.db.engine.findOne(data.table, data.where)
break
case 'increment':
data.items = await global.db.engine.increment(data.table, data.where, data.object)
break
case 'incrementOne':
data.items = await global.db.engine.incrementOne(data.table, data.where, data.object)
break
case 'insert':
data.items = await global.db.engine.insert(data.table, data.object)
break
case 'remove':
data.items = await global.db.engine.remove(data.table, data.where)
break
case 'update':
data.items = await global.db.engine.update(data.table, data.where, data.object)
break
}
process.send(data)
workerIsFree.db = true
}
})
})

async function message (data) {
let sender = data.sender
let message = data.message
let skip = data.skip

const parse = new Parser({ sender: sender, message: message, skip: skip })

if (!skip && sender['message-type'] === 'whisper' && (!(await global.configuration.getValue('disableWhisperListener')) || global.parser.isOwner(sender))) {
global.log.whisperIn(message, {username: sender.username})
} else if (!skip && !global.commons.isBot(sender.username)) global.log.chatIn(message, {username: sender.username})

let [isModerated, isIgnored] = await Promise.all([
parse.isModerated(),
global.commons.isIgnored(sender)
])

if (!isModerated && !isIgnored && !global.commons.isBot(sender.username)) {
let data = { id: sender['user-id'], is: { online: true, subscriber: _.get(sender, 'subscriber', false), mod: _.get(sender, 'mod', false) } }
if (!_.get(sender, 'subscriber', false)) _.set(data, 'stats.tier', 0) // unset tier if sender is not subscriber

await Promise.all([
global.db.engine.update('users', { username: sender.username }, data),
parse.process()
])
process.send({ type: 'api', fnc: 'isFollower', username: sender.username })

if (message.startsWith('!')) {
global.events.fire('command-send-x-times', { username: sender.username, message: message })
} else if (!message.startsWith('!') && await global.cache.isOnline()) global.db.engine.increment('users', { username: sender.username }, { stats: { messages: 1 } })
}
process.send({ type: 'stats', of: 'parser', value: parse.time() })
}
}

process.on('unhandledRejection', function (reason, p) {
global.log.error(`Possibly Unhandled Rejection at: ${util.inspect(p)} reason: ${reason}`)
})

process.on('uncaughtException', (error) => {
if (_.isNil(global.log)) return console.log(error)
global.log.error(util.inspect(error))
global.log.error('+------------------------------------------------------------------------------+')
global.log.error('| WORKER HAS UNEXPECTEDLY CRASHED |')
global.log.error('| PLEASE CHECK https://github.com/sogehige/SogeBot/wiki/How-to-report-an-issue |')
global.log.error('| AND ADD logs/exceptions.log file to your report |')
global.log.error('+------------------------------------------------------------------------------+')
process.exit(1)
})

function gracefullyExit () {
if (_.every(workerIsFree)) {
debug(`Exiting gracefully worker ${process.pid}`)
process.exit()
} else setTimeout(() => gracefullyExit(), 10)
}
1 change: 1 addition & 0 deletions config.example.json
@@ -1,5 +1,6 @@
{
"__README__": "https://github.com/sogehige/SogeBot/wiki/Install-and-Upgrade",
"cpu": 4,
"settings": {
"__COMMENT__": "get bot oauth at http://oauth.sogehige.tv",
"bot_username": "bot_username_here",
Expand Down

0 comments on commit 562a78e

Please sign in to comment.