-
Notifications
You must be signed in to change notification settings - Fork 4
/
tracker.js
130 lines (105 loc) · 4.19 KB
/
tracker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*!
* localhost-tunnel
* Copyright(c) 2020 sheikhmishar
* Copyright(c) 2020 omranjamal
* GPLv3 Licensed
*/
const socketIO = require('socket.io')
const express = require('express')
const debugp = require('debug')
const http = require('http')
const debug = debugp('tracker')
const ioDebug = debugp('tracker:io')
const trackerApp = express()
const trackerServer = http.createServer(trackerApp)
const trackerIo = socketIO(trackerServer, { path: '/list', serveClient: false })
const subslistChannel = trackerIo.of('/list')
const TRACKER_IP = process.env.TRACKER_IP || '127.0.0.1'
const TRACKER_PORT = parseInt(process.env.TRACKER_PORT) || 5100
/** @type {upstreamInfo} */
const upstreamsInfo = {}
/** @type {SocketIO.Socket[]} */
const upstreamSockets = []
/** @type {(upstream: SocketIO.Socket) => (address: string) => void} */
const upstreamOnJoin = upstream => address => {
ioDebug('SERVER SOCKET', upstream.id, 'ADDRESS', address)
upstreamsInfo[upstream.id].address = address
const { [upstream.id]: drop, ...restUpstreamsInfo } = upstreamsInfo
upstream.emit('init', restUpstreamsInfo)
for (const u of upstreamSockets) u.emit('upstream_join', upstream.id, address)
upstreamSockets.push(upstream)
}
/** @type {(upstream: SocketIO.Socket) => () => void} */
const upstreamOnLeave = upstream => () => {
ioDebug('SERVER SOCKET', upstream.id, 'DISCONNECTED')
delete upstreamsInfo[upstream.id]
let upstreamIndex
for (let i = 0; i < upstreamSockets.length; i++)
if (upstreamSockets[i].id === upstream.id) upstreamIndex = i
else upstreamSockets[i].emit('upstream_leave', upstream.id)
ioDebug('UPSTREAM INDEX', upstreamIndex, upstream.id)
upstreamSockets.splice(upstreamIndex, 1)
}
/** @type {(upstream: SocketIO.Socket) => (address: string) => void} */
const upstreamOnSubsJoin = upstream => username => {
ioDebug(username, 'CONNECTED TO SERVER', upstream.id)
upstreamsInfo[upstream.id].subs.push(username)
for (const u of upstreamSockets)
if (u.id !== upstream.id)
u.emit('upstream_subs_join', upstream.id, username)
}
/** @type {(upstream: SocketIO.Socket) => (address: string) => void} */
const upstreamOnSubsLeave = upstream => username => {
ioDebug(username, 'DISCONNECTED FROM SERVER', upstream.id)
const { subs } = upstreamsInfo[upstream.id]
upstreamsInfo[upstream.id].subs = subs.filter(name => name !== username)
for (const u of upstreamSockets)
if (u.id !== upstream.id)
u.emit('upstream_subs_leave', upstream.id, username)
}
/** @param {SocketIO.Socket} upstream */
const onTrackerIoConnect = upstream => {
ioDebug('SERVER SOCKET', upstream.id, 'CONNECTED')
upstreamsInfo[upstream.id] = { subs: [] }
upstream.on('upstream_join', upstreamOnJoin(upstream))
upstream.on('disconnect', upstreamOnLeave(upstream)) // upstream_leave
upstream.on('upstream_subs_join', upstreamOnSubsJoin(upstream))
upstream.on('upstream_subs_leave', upstreamOnSubsLeave(upstream))
}
subslistChannel.on('connection', onTrackerIoConnect)
/** @type {Express.RequestHandler} */
const getSocketsSummary = (_, res) =>
res.json(
upstreamSockets.map(({ connected, handshake, rooms }, i) => ({
connected,
handshake,
rooms,
listenerCount: {
join: upstreamSockets[i].listenerCount('upstream_join'),
disconnect: upstreamSockets[i].listenerCount('disconnect'),
subs_join: upstreamSockets[i].listenerCount('upstream_subs_join'),
subs_leave: upstreamSockets[i].listenerCount('upstream_subs_leave')
}
}))
)
/** @param {string} subsName */
const getSubsAddress = subsName => {
for (const { subs, address } of Object.values(upstreamsInfo))
if (subs.includes(subsName)) return address
return null
}
trackerApp.get('/upstreams', (_, res) => res.json(upstreamsInfo))
trackerApp.get('/subs/:name', ({ params: { name } }, res) =>
res.json({ subscriber_address: getSubsAddress(name) })
)
trackerApp.get('/sockets', getSocketsSummary)
trackerServer
.listen(TRACKER_PORT, TRACKER_IP)
.on('listening', () => debug('Server', trackerServer.address()))
.on('error', err => {
// @ts-ignore
if (err.code === 'EADDRINUSE') {
debug(`Port ${trackerServer.address()} in use. Retry with another one`)
trackerServer.close()
}
})