This repository has been archived by the owner on Sep 30, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
/
Replicator.js
185 lines (154 loc) · 4.92 KB
/
Replicator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
const EventEmitter = require('events').EventEmitter
const pMap = require('p-map')
const Log = require('ipfs-log')
const Logger = require('logplease')
const logger = Logger.create('replicator', { color: Logger.Colors.Cyan })
Logger.setLogLevel('ERROR')
// Extract the `next` pointers (parent hashes) from a log entry
const getNext = (entry) => entry.next

// Reducer: concatenate nested arrays into one flat array
const flatMap = (accumulator, value) => accumulator.concat(value)

// True for any value that is neither null nor undefined
const notNull = (entry) => entry != null

// Reducer: build an object keyed by value, deduplicating repeats
const uniqueValues = (accumulator, value) => Object.assign(accumulator, { [value]: value })

// Number of entries fetched per ipfs-log request
const batchSize = 1
/**
 * Replicator pulls ipfs-log entries referenced by incoming heads and buffers
 * the fetched logs for its store. Emits:
 *   - 'load.added'    when an entry is queued for fetching
 *   - 'load.progress' after each entry is fetched
 *   - 'load.end'      with the buffered logs once a batch drains
 */
class Replicator extends EventEmitter {
  /**
   * @param {Object} store - Store being replicated; this class reads
   *   store._oplog, store._ipfs, store.identity and store.access.
   * @param {Number} [concurrency] - Max parallel fetch tasks (default 128).
   */
  constructor (store, concurrency) {
    super()
    this._store = store
    // Hashes currently being fetched, keyed by hash (acts as a set)
    this._fetching = {}
    // Lifetime counters backing the stats getters below
    this._stats = {
      tasksRequested: 0,
      tasksStarted: 0,
      tasksProcessed: 0
    }
    // Logs fetched since the last 'load.end' emission
    this._buffer = []
    this._concurrency = concurrency || 128
    // Pending entries keyed by hash, waiting for _processQueue
    this._queue = {}
    // NOTE(review): _q is assigned here but never read or written anywhere
    // else in this class — presumably dead state; confirm before removing.
    this._q = new Set()
    // Flush the queue as an emergency switch: if nothing is running but the
    // queue still has items, kick processing again on the next tick.
    this._flushTimer = setInterval(() => {
      if (this.tasksRunning === 0 && Object.keys(this._queue).length > 0) {
        logger.warn('Had to flush the queue!', Object.keys(this._queue).length, 'items in the queue, ', this.tasksRequested, this.tasksFinished, ' tasks requested/finished')
        setTimeout(() => this._processQueue(), 0)
      }
    }, 3000)
  }

  /**
   * Returns the number of tasks requested during the life time
   * @return {[Integer]} [Number of tasks requested]
   */
  get tasksRequested () {
    return this._stats.tasksRequested
  }

  /**
   * Returns the number of tasks started during the life time
   * @return {[Integer]} [Number of tasks started]
   */
  get tasksStarted () {
    return this._stats.tasksStarted
  }

  /**
   * Returns the number of tasks running currently
   * @return {[Integer]} [Number of tasks running]
   */
  get tasksRunning () {
    return this._stats.tasksStarted - this._stats.tasksProcessed
  }

  /**
   * Returns the number of tasks currently queued
   * @return {[Integer]} [Number of tasks queued]
   */
  get tasksQueued () {
    return Math.max(Object.keys(this._queue).length - this.tasksRunning, 0)
  }

  /**
   * Returns the number of tasks finished during the life time
   * @return {[Integer]} [Number of tasks finished]
   */
  get tasksFinished () {
    return this._stats.tasksProcessed
  }

  /**
   * Returns the hashes currently queued
   * @return {[Array<String>]} [Queued hashes]
   */
  getQueue () {
    return Object.values(this._queue)
  }

  /*
    Process new heads.
    Accepts entries as either entry objects ({ hash, ... }) or bare hash
    strings; anything already in the oplog, in flight, or queued is skipped.
  */
  load (entries) {
    const notKnown = entry => {
      const hash = entry.hash || entry
      return !this._store._oplog.has(hash) && !this._fetching[hash] && !this._queue[hash]
    }

    try {
      entries
        .filter(notNull)
        .filter(notKnown)
        .forEach(this._addToQueue.bind(this))
      // Defer processing so callers return before fetching begins
      setTimeout(() => this._processQueue(), 0)
    } catch (e) {
      console.error(e)
    }
  }

  /**
   * Stops the replicator: cancels the flush timer and drops listeners.
   * Does not cancel fetches already in flight.
   */
  stop () {
    // Clears the queue flusher
    clearInterval(this._flushTimer)
    // Remove event listeners
    this.removeAllListeners('load.added')
    this.removeAllListeners('load.end')
    this.removeAllListeners('load.progress')
  }

  // Queue one entry (object or bare hash) and bump the request counter.
  _addToQueue (entry) {
    const hash = entry.hash || entry
    this._stats.tasksRequested += 1
    this._queue[hash] = entry
  }

  /**
   * Drains up to (concurrency - running) queued entries, fetches them in
   * parallel, emits 'load.end' when a batch finishes with buffered logs,
   * and recursively loads any newly discovered `next` pointers.
   */
  async _processQueue () {
    if (this.tasksRunning < this._concurrency) {
      const capacity = this._concurrency - this.tasksRunning
      const items = Object.values(this._queue).slice(0, capacity).filter(notNull)
      // Remove the claimed items from the queue before fetching them
      items.forEach(entry => delete this._queue[entry.hash || entry])

      // Each _processOne returns an array of next-hashes; flatten and dedupe
      const flattenAndGetUniques = (nexts) => nexts.reduce(flatMap, []).reduce(uniqueValues, {})

      const processValues = (nexts) => {
        const values = Object.values(nexts).filter(notNull)
        // Flush the buffered logs once this batch has fully drained
        if ((items.length > 0 && this._buffer.length > 0) ||
            (this.tasksRunning === 0 && this._buffer.length > 0)) {
          const logs = this._buffer.slice()
          this._buffer = []
          this.emit('load.end', logs)
        }
        // Recurse into the newly discovered parent hashes
        if (values.length > 0) {
          this.load(values)
        }
      }

      return pMap(items, e => this._processOne(e))
        .then(flattenAndGetUniques)
        .then(processValues)
    }
  }

  /**
   * Fetches one entry's log from IPFS (batchSize entries per call),
   * buffers the result, and returns the entry's `next` hashes.
   * @param {Object|String} entry - Entry object or bare hash.
   * @return {Promise<Array<String>|undefined>} Next pointers, or undefined
   *   if the hash was already known or in flight.
   */
  async _processOne (entry) {
    const hash = entry.hash || entry

    // Skip anything already in the oplog or currently being fetched
    if (this._store._oplog.has(hash) || this._fetching[hash]) {
      return
    }

    this._fetching[hash] = hash
    this.emit('load.added', entry)
    this._stats.tasksStarted += 1

    const exclude = []
    const log = await Log.fromEntryHash(this._store._ipfs, this._store.identity, hash, { logId: this._store._oplog.id, access: this._store.access, length: batchSize, exclude })
    this._buffer.push(log)

    const latest = log.values[0]
    // Defensive: the hash was deleted in _processQueue, but entries can be
    // re-queued by load() while this fetch was in flight
    delete this._queue[hash]

    // Mark this task as processed
    this._stats.tasksProcessed += 1
    // Notify subscribers that we made progress
    // NOTE(review): this._id is never assigned in this class, so the first
    // argument is always undefined — confirm against listeners before fixing.
    this.emit('load.progress', this._id, hash, latest, null, this._buffer.length)

    // Return all next pointers
    return log.values.map(getNext).reduce(flatMap, [])
  }
}
module.exports = Replicator