Skip to content
This repository has been archived by the owner on May 22, 2023. It is now read-only.

Commit

Permalink
Merge branch 'ioni' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Talbot committed Jun 4, 2018
2 parents 3632404 + ffa2c5b commit c54ce47
Show file tree
Hide file tree
Showing 17 changed files with 390 additions and 13 deletions.
5 changes: 5 additions & 0 deletions .env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ TWITCH_CHANNEL=
# example : 46985892
TWITCH_CHANNEL_ID=

# example : https://example.com
WEBSERVER_BASE_URL=
WEBSERVER_PORT=3000
TWITCH_WEBHOOK_STREAMS_CALLBACK_PATH=/twitch/streams

# Mongodb server connection
MONGO_HOST=mongo
MONGO_PORT=27017
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ What things you need to install the software and how to install them

### Installing

* Go to the project dir : `cd nozomiBot`
* Copy `.env.dist` to `.env` : `cp .env.dist .env`
* Configure `.env` : `vim .env`
* Set the rights to `var` (`1000` is the user `node` in the node container) : `setfacl -dR -m u:$(id -u):rwX -m u:1000:rwX var`
* Install the dependencies : `yarn install`
* Install the dependencies : `docker run -it --rm -u $(id -u):$(id -g) -v "$PWD":/app -w /app node:8-alpine yarn install`
* Launch Docker containers :
* development env : `docker-compose up -d`
* production env : `docker stack deploy -c docker-compose.yml <name>`
Expand Down
14 changes: 12 additions & 2 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
const fs = require('fs');

const logger = require('pino')({
extreme: false
extreme: false,
base: null,
}, fs.createWriteStream(`./var/log/${process.env.NODE_ENV}.log`, {'flags': 'a'}));

const DatabaseManager = require('./lib/Database/DatabaseManager');
Expand All @@ -27,18 +28,27 @@
const TwitchConnectorIO = require('./lib/Connector/TwitchConnectorIO');
const StaticCommandRepository = require('./lib/Database/Repository/StaticCommandRepository');
const UserRepository = require('./lib/Database/Repository/UserRepository');
const StreamInfoRepository = require('./lib/Database/Repository/StreamInfoRepository');
const WebhookServer = require('./lib/Webhook/WebhookServer');
const TwitchWebhook = require('./lib/Webhook/TwitchWebhook');

const dbManager = new DatabaseManager(logger);
await dbManager.init();
const staticCommandRepo = new StaticCommandRepository(dbManager);
const userRepo = new UserRepository(dbManager);
const streamInfoRepository = new StreamInfoRepository(dbManager);

const cacheManager = new CacheManager(logger);
await cacheManager.init();

const webhookServer = new WebhookServer(logger, 3000);
webhookServer.init();
const twitchWebhook = new TwitchWebhook(logger, webhookServer, cacheManager, streamInfoRepository);
await twitchWebhook.init();

const connectorManager = new ConnectorManager();

const scio = new StandardConnectorIO(process.env.EXIT_COMMAND, dbManager, cacheManager, logger);
const scio = new StandardConnectorIO(process.env.EXIT_COMMAND, dbManager, cacheManager, webhookServer, logger);
await scio.init();
connectorManager.addConnector(scio);

Expand Down
24 changes: 23 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ services:
- .env
networks:
- app
ports:
- 127.0.0.1:80:3000
stdin_open: true
tty: true
entrypoint: ["node", "app.js"]
restart: unless-stopped
# restart: unless-stopped
depends_on:
- mongo
- redis
Expand All @@ -46,3 +48,23 @@ services:
ports:
- 127.0.0.1:6379:6379
restart: unless-stopped
# elk:
# image: sebp/elk:624
# hostname: ${COMPOSE_PROJECT_NAME}-elk
# volumes:
# - ./logstash.conf:/etc/logstash/conf.d/02-beats-input.conf
# networks:
# - app
# ports:
# - 127.0.0.1:5601:5601 # Kibana web interface
# - 127.0.0.1:9200:9200 # ElasticSearch JSON interface
# - 127.0.0.1:5044:5044 # Logstash beats interface
# filebeat:
# image: docker.elastic.co/beats/filebeat:6.2.4
# volumes:
# - ./filebeat.yml:/usr/share/filebeat/filebeat.yml
# - ./:/app
# depends_on:
# - elk
# networks:
# - app
22 changes: 22 additions & 0 deletions filebeat.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
output:
logstash:
enabled: true
hosts:
- elk:5044
timeout: 15

filebeat:
prospectors:
- type: log
enabled: true
paths:
- "/app/var/log/*.log"
tags: ["json"]
json:
keys_under_root: true
add_error_key: true
message_key: msg

setup:
kibana:
host: elk:5601
4 changes: 3 additions & 1 deletion lib/Connector/StandardConnectorIO.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ const ConnectorIO = require('./ConnectorIO');
const CommandExchange = require('../Command/CommandExchange');

module.exports = class StandardConnectorIO extends ConnectorIO {
constructor (exitCommand, dbManager, cacheManager, logger) {
constructor (exitCommand, dbManager, cacheManager, webhookServer, logger) {
super();

this.logger = logger.child({subject: 'StandardConnectorIO'});
this.dbManager = dbManager;
this.cacheManager = cacheManager;
this.webhookServer = webhookServer;
this.exitCommand = exitCommand || 'exit';
}

Expand Down Expand Up @@ -59,6 +60,7 @@ module.exports = class StandardConnectorIO extends ConnectorIO {
this.rl.close();
await this.dbManager.stop();
await this.cacheManager.stop();
await this.webhookServer.stop();
process.exit();
}
};
3 changes: 2 additions & 1 deletion lib/Connector/TwitchConnectorIO.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module.exports = class TwitchConnectorIO extends ConnectorIO {

this.apiBaseUrl = 'https://api.twitch.tv/helix';
this.client = null;
this.logger = logger.child({subject: 'TwitchConnectorIO'});
this.logger = logger.child({subject: this.constructor.name});
this.cacheManager = cacheManager;
this.userRepository = userRepository;
this.apiHeaders = {
Expand Down Expand Up @@ -55,6 +55,7 @@ module.exports = class TwitchConnectorIO extends ConnectorIO {
});
this.client.on('message', (channel, userstate, message, self) => {
if (self) return;
console.log(userstate);
this.onMessage(channel, userstate, message);
});
try {
Expand Down
4 changes: 4 additions & 0 deletions lib/Database/Model/StreamInfo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
const StreamInfoSchema = require('../Schema/StreamInfoSchema');
const mongoose = require('mongoose');

module.exports = mongoose.model('StreamInfo', StreamInfoSchema);
4 changes: 2 additions & 2 deletions lib/Database/Repository/StaticCommandRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ module.exports = class StaticCommandRepository {
return StaticCommand.findOne({ 'name': name });
}

async create (staticCommand) {
async create (entity) {
return new Promise((resolve, reject) => {
staticCommand.save((err) => {
entity.save((err) => {
if (err) {
reject(err);
return;
Expand Down
27 changes: 27 additions & 0 deletions lib/Database/Repository/StreamInfoRepository.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
const StreamInfo = require('../Model/StreamInfo');

module.exports = class StreamInfoRepository {
constructor (dbManager) {
this.dbManager = dbManager;
}

async findOneByChannelId (channelId) {
return StreamInfo.findOne({ 'channelId': channelId });
}

async create (entity) {
return new Promise((resolve, reject) => {
entity.save((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
}

async findAll () {
return StreamInfo.find();
}
};
4 changes: 2 additions & 2 deletions lib/Database/Repository/UserRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ module.exports = class UserRepository {
return User.findOne({ 'username': username });
}

async create (staticCommand) {
async create (entity) {
return new Promise((resolve, reject) => {
staticCommand.save((err) => {
entity.save((err) => {
if (err) {
reject(err);
return;
Expand Down
7 changes: 7 additions & 0 deletions lib/Database/Schema/StreamInfoSchema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const mongoose = require('mongoose');

module.exports = new mongoose.Schema({
channelId: { type: String, unique: true },
startedAt: Date,
endedAt: Date,
});
126 changes: 126 additions & 0 deletions lib/Webhook/TwitchWebhook.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
const request = require('request');
const StreamInfo = require('../Database/Model/StreamInfo');

module.exports = class TwitchWebhook {
constructor (logger, server, cacheManager, streamInfoRepository) {
this.logger = logger.child({subject: this.constructor.name});
this.server = server;
this.cacheManager = cacheManager;
this.streamInfoRepository = streamInfoRepository;
this.twitchBaseUrl = 'https://api.twitch.tv/helix';
}

init () {
if (!process.env.TWITCH_CHANNEL_ID || !process.env.TWITCH_WEBHOOK_SECRET) {
console.error('[TwitchWebhook] TWITCH_CHANNEL_ID or TWITCH_WEBHOOK_SECRET is not defined');
this.logger.error('TWITCH_CHANNEL_ID or TWITCH_WEBHOOK_SECRET is not defined');
return;
}

this.server.addRoute('/twitch/streams', (url, request, response, body) => {
return this._handleStreamWebhook(url, request, response, body);
});
this._subscribeStream();
}

async _subscribeStream () {
if (!process.env.WEBSERVER_BASE_URL || !process.env.TWITCH_WEBHOOK_STREAMS_CALLBACK_PATH) {
console.error('[TwitchWebhook] WEBSERVER_BASE_URL or TWITCH_WEBHOOK_STREAMS_CALLBACK_PATH is not defined');
this.logger.error('WEBSERVER_BASE_URL or TWITCH_WEBHOOK_STREAMS_CALLBACK_PATH is not defined');
return;
}
// TODO : keep in database the date where subscribe. or CRON each 10 days to subscribe webhooks. or setInterval()

return new Promise((resolve, reject) => {
request.post({
uri: `${this.twitchBaseUrl}/webhooks/hub`,
headers: {
'Client-ID': process.env.TWITCH_CLIENT_ID,
'Content-Type': 'application/json',
},
json: {
'hub.callback': process.env.WEBSERVER_BASE_URL + process.env.TWITCH_WEBHOOK_STREAMS_CALLBACK_PATH,
'hub.mode': 'subscribe',
'hub.topic': `${this.twitchBaseUrl}/streams?user_id=${process.env.TWITCH_CHANNEL_ID}`,
'hub.secret': process.env.TWITCH_WEBHOOK_SECRET,
'hub.lease_seconds': 3600, // TODO : for prod : 864000
},
}, (err, response, body) => {
if (err) {
console.error(err);
reject(new Error(err));
return;
}
if (body && body.error) {
console.error(`failed to subscribe to Twitch webhook : ${body.message}`);
reject(new Error(body.message));
return;
}
console.log('Twitch webhook streams has been subscribed');
resolve();
/*
headers ratelimit
'ratelimit-limit': '30',
'ratelimit-remaining': '29',
'ratelimit-reset': '1527968920',
*/
});
});
}

/**
* @param {URL} url
* @param request
* @param response
* @param body
* @private
*/
async _handleStreamWebhook (url, request, response, body) {
const hubMode = url.searchParams.get('hub.mode');
const hubChallenge = url.searchParams.get('hub.challenge');
if (hubMode === 'subscribe') {
console.log('respond to the challenge subscription');
response.statusCode = 200;
return response.end(hubChallenge);
}

if (body.data) {
if (body.data.length === 0) {
// offline
this.logger.info('stream is now offline');

let streamInfoEntity = await this.streamInfoRepository.findOneByChannelId(process.env.TWITCH_CHANNEL_ID);
if (!streamInfoEntity) {
streamInfoEntity = new StreamInfo({
channelId: process.env.TWITCH_CHANNEL_ID,
});
}
streamInfoEntity.endedAt = new Date();
this.streamInfoRepository.create(streamInfoEntity).catch(err => {
this.logger.error(`fail to create/update the stream info ${streamInfoEntity.channelId} : ${err}`);
});
} else {
// online
this.logger.info('stream is now online');
const streamInfo = body.data[0];

let streamInfoEntity = await this.streamInfoRepository.findOneByChannelId(streamInfo.user_id);
if (!streamInfoEntity) {
streamInfoEntity = new StreamInfo({
channelId: streamInfo.user_id,
});
}
streamInfoEntity.startedAt = new Date(streamInfo.started_at);
this.streamInfoRepository.create(streamInfoEntity).catch(err => {
this.logger.error(`fail to create/update the stream info ${streamInfo.user_id} : ${err}`);
});
this.cacheManager.setObject(`stream_info_${streamInfo.user_id}`, streamInfo, 300);
}
response.statusCode = 200;
return response.end();
}

response.statusCode = 400;
return response.end();
}
};

0 comments on commit c54ce47

Please sign in to comment.