-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathindex.ts
238 lines (204 loc) · 7.97 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import exitHook = require('exit-hook')
import { DataTransferLockingQueue, DataTransferSimpleQueue, DataTransferQueueBase } from './dataTransferQueue'
import DataTransferUploadStill from './dataTransferUploadStill'
import { DataTransferUploadClipFrame, DataTransferUploadClip } from './dataTransferUploadClip'
import DataTransferUploadAudio from './dataTransferUploadAudio'
import { IDeserializedCommand, ISerializableCommand } from '../commands/CommandBase'
import DataTransferUploadMultiViewerLabel from './dataTransferUploadMultiViewerLabel'
import { DataTransferDownloadMacro } from './dataTransferDownloadMacro'
import { DataTransferUploadMacro } from './dataTransferUploadMacro'
import { LockObtainedCommand, LockStateUpdateCommand } from '../commands/DataTransfer'
import debug0 from 'debug'
import type { UploadBufferInfo } from './dataTransferUploadBuffer'
import { DataTransferDownloadStill } from './dataTransferDownloadStill'
// Upper bound on how many queued commands are popped from each transfer queue on every send tick
const MAX_PACKETS_TO_SEND_PER_TICK = 50
// Largest transfer id that gets handed out (65535); the id counter wraps back to 0 once it passes this
const MAX_TRANSFER_INDEX = (1 << 16) - 1 // Inclusive maximum
// Debug logger for this module, created under the 'atem-connection:data-transfer:manager' namespace
const debug = debug0('atem-connection:data-transfer:manager')
/**
 * Optional encoding settings, judging by the name intended for still uploads.
 * NOTE(review): nothing in this file reads this interface; its exact semantics are defined by whichever module consumes it.
 */
export interface UploadStillEncodingOptions {
/** Presumably turns off run-length encoding (RLE) of the image data when true - TODO confirm against the consumer */
disableRLE?: boolean
}
/**
 * Owns the data-transfer queues (stills, clips, multiviewer labels, macros) and moves commands
 * between them and the connection.
 *
 * Outgoing: while command sending is started, a timer periodically drains a bounded number of
 * queued commands from every queue and hands them to the `rawSendCommands` callback.
 * Incoming: `queueHandleCommand` routes lock messages and transfer messages to the queue they belong to.
 */
export class DataTransferManager {
	/** Next transfer id to hand out. Wraps back to 0 after MAX_TRANSFER_INDEX has been issued. */
	#nextTransferIdInner = 0

	/**
	 * Allocate the next transfer id, in the inclusive range 0..MAX_TRANSFER_INDEX.
	 * Declared as an arrow function because it is passed to the queues as a callback.
	 */
	readonly #nextTransferId = (): number => {
		const index = this.#nextTransferIdInner++
		if (this.#nextTransferIdInner > MAX_TRANSFER_INDEX) this.#nextTransferIdInner = 0
		return index
	}

	/**
	 * Send a single lock-related command immediately, bypassing the timed send loop.
	 * Failures are only reported (debug log + console); nothing is thrown back to the queue.
	 */
	readonly #sendLockCommand = (/*lock: DataTransferLockingQueue,*/ cmd: ISerializableCommand): void => {
		this.#rawSendCommands([cmd]).catch((e) => {
			debug(`Failed to send lock command: ${e}`)
			console.log('Failed to send lock command')
		})
	}

	/** Queue for still transfers. It uses lock index 0. */
	readonly #stillsLock = new DataTransferLockingQueue(0, this.#sendLockCommand, this.#nextTransferId)
	/** Queues for clip transfers, keyed by zero-based clip index. Entries are created on demand by getClipLock. */
	readonly #clipLocks = new Map<number, DataTransferLockingQueue>() // clipLocks get dynamically allocated
	/** Queue for multiviewer label uploads. */
	readonly #labelsLock = new DataTransferSimpleQueue(this.#nextTransferId)
	/** Queue for macro uploads and downloads. */
	readonly #macroLock = new DataTransferSimpleQueue(this.#nextTransferId)

	/** Callback used to put commands on the wire. Supplied by the owner of this manager. */
	readonly #rawSendCommands: (cmds: ISerializableCommand[]) => Promise<void>

	/** Handle of the running send timer, or undefined while sending is stopped. */
	private interval?: ReturnType<typeof setInterval>
	/** Unsubscribe function for the process exit hook, or undefined while no hook is registered. */
	private exitUnsubscribe?: () => void

	/**
	 * @param rawSendCommands Callback that sends a batch of commands; its promise rejects when sending fails
	 */
	constructor(rawSendCommands: (cmds: ISerializableCommand[]) => Promise<void>) {
		this.#rawSendCommands = rawSendCommands
	}

	/** Every queue currently known to the manager, including the clip queues allocated so far. */
	private get allLocks() {
		return [this.#stillsLock, ...this.#clipLocks.values(), this.#labelsLock, this.#macroLock]
	}

	/**
	 * Start sending of commands
	 * This is called once the connection has received the initial state data
	 *
	 * Does nothing to the queues or the timer if sending is already running. The exit hook is
	 * registered at most once, so the timer is stopped when the process exits.
	 *
	 * @param skipUnlockAll When truthy, the queues are left as they are instead of being cleared and aborted
	 */
	public startCommandSending(skipUnlockAll?: boolean): void {
		// TODO - abort any active transfers
		if (!this.interval) {
			// New connection means a new queue
			if (!skipUnlockAll) {
				debug(`Clearing all held locks`)
				for (const lock of this.allLocks) {
					lock.clearQueueAndAbort(new Error('Restarting connection'))
				}
			}

			this.interval = setInterval(() => {
				for (const lock of this.allLocks) {
					// Take some, it is unlikely that multiple will run at once
					const commandsToSend = lock.popQueuedCommands(MAX_PACKETS_TO_SEND_PER_TICK)
					if (commandsToSend && commandsToSend.length > 0) {
						this.#rawSendCommands(commandsToSend).catch((e) => {
							// Failed to send/queue something, so abort it
							lock.tryAbortTransfer(new Error(`Command send failed: ${e}`))
						})
					}
				}
			}, 2) // TODO - refine this. perhaps we can stop and restart the interval?
		}

		if (!this.exitUnsubscribe) {
			this.exitUnsubscribe = exitHook(() => {
				debug(`Exit auto-cleanup`)
				// TODO - replace this with a WeakRef to the parent class?
				this.stopCommandSending()
			})
		}
	}

	/**
	 * Stop sending of commands
	 * This is called once the connection is disconnected
	 *
	 * Clears and aborts every queue, removes the exit hook and stops the send timer.
	 */
	public stopCommandSending(): void {
		debug('Stopping command sending')
		for (const lock of this.allLocks) {
			lock.clearQueueAndAbort(new Error('Stopping connection'))
		}

		if (this.exitUnsubscribe) {
			this.exitUnsubscribe()
			this.exitUnsubscribe = undefined
		}

		if (this.interval) {
			clearInterval(this.interval)
			this.interval = undefined
		}
	}

	/**
	 * Queue the handling of a received command
	 * We do it via a queue as some of the handlers need to be async, and we don't want to block state updates from happening in parallel
	 *
	 * Lock commands are routed by their lock index: 0 is the stills queue, 100 and above are ignored,
	 * anything else maps to the clip queue for `index - 1`. Any other command carrying a `transferId`
	 * is passed to the queue whose current transfer has that id; commands matching no queue are dropped.
	 *
	 * @param command The deserialized command received from the device
	 */
	public queueHandleCommand(command: IDeserializedCommand): void {
		// Serializing every received command (transfer payloads included) is wasted work unless the
		// logger is actually enabled, so only build the message when it will be printed.
		if (debug.enabled) {
			debug(`Received command ${command.constructor.name}: ${JSON.stringify(command)}`)
		}

		if (command instanceof LockObtainedCommand || command instanceof LockStateUpdateCommand) {
			let lock: DataTransferLockingQueue | undefined
			if (command.properties.index === 0) {
				lock = this.#stillsLock
			} else if (command.properties.index >= 100) {
				// Looks like a special lock that we arent expecting
				// Ignore it for now
				return
			} else {
				// Clip queues are keyed by zero-based clip index, while their lock index is one higher
				lock = this.#clipLocks.get(command.properties.index - 1)
			}

			// Must be a clip that we aren't expecting.
			// Use a throwaway queue (deliberately not stored in #clipLocks) so the message is still processed.
			if (!lock)
				lock = new DataTransferLockingQueue(
					command.properties.index,
					this.#sendLockCommand,
					this.#nextTransferId
				)

			// handle actual command
			if (command instanceof LockObtainedCommand) {
				lock.lockObtained()
			} else if (command instanceof LockStateUpdateCommand) {
				lock.updateLock(command.properties.locked)
			}

			return
		}

		// If this command is for a transfer
		if (command.properties.transferId !== undefined) {
			// try to establish the associated DataLock:
			let lock: DataTransferQueueBase | undefined
			for (const _lock of this.allLocks) {
				if (_lock.currentTransferId === command.properties.transferId) {
					lock = _lock
				}
			}

			// Doesn't appear to be for a known lock
			// TODO - should we fire an abort back just in case?
			if (!lock) return

			lock.handleCommand(command)
		}
	}

	/**
	 * Download a still from the device.
	 * @param index Index of the still to download
	 * @returns The result of the queued transfer, as a Buffer
	 */
	public async downloadStill(index: number): Promise<Buffer> {
		const transfer = new DataTransferDownloadStill(index)
		return this.#stillsLock.enqueue(transfer)
	}

	/**
	 * Upload a still to the device.
	 * @param index Index of the still to write
	 * @param data The image data to upload
	 * @param name Name passed along with the still
	 * @param description Description passed along with the still
	 */
	public async uploadStill(index: number, data: UploadBufferInfo, name: string, description: string): Promise<void> {
		const transfer = new DataTransferUploadStill(index, data, name, description)
		return this.#stillsLock.enqueue(transfer)
	}

	/**
	 * Upload a clip to the device, one frame at a time.
	 * @param index Zero-based clip index
	 * @param data The frames of the clip, as a sync or async iterable; frames are numbered from 0 in iteration order
	 * @param name Name passed along with the clip
	 * @throws (as a rejection) Error 'Invalid clip index' if no queue exists or can be created for `index`
	 */
	public async uploadClip(
		index: number,
		data: Iterable<UploadBufferInfo> | AsyncIterable<UploadBufferInfo>,
		name: string
	): Promise<void> {
		// Wrap each incoming frame lazily, so frames are only pulled from `data` as the transfer consumes them
		const provideFrame = async function* (): AsyncGenerator<DataTransferUploadClipFrame, undefined> {
			let id = -1
			for await (const frame of data) {
				id++
				yield new DataTransferUploadClipFrame(index, id, frame)
			}
			return undefined
		}

		const transfer = new DataTransferUploadClip(index, name, provideFrame(), this.#nextTransferId)
		const lock = this.getClipLock(index)
		return lock.enqueue(transfer)
	}

	/**
	 * Upload audio for a clip. It is queued on the same queue as that clip's frames.
	 * @param index Zero-based clip index
	 * @param data The audio data to upload
	 * @param name Name passed along with the audio
	 * @throws (as a rejection) Error 'Invalid clip index' if no queue exists or can be created for `index`
	 */
	public async uploadAudio(index: number, data: Buffer, name: string): Promise<void> {
		const transfer = new DataTransferUploadAudio(index, data, name)
		const lock = this.getClipLock(index)
		return lock.enqueue(transfer)
	}

	/**
	 * Download a macro from the device.
	 * @param index Index of the macro to download
	 * @returns The result of the queued transfer, as a Buffer
	 */
	public async downloadMacro(index: number): Promise<Buffer> {
		const transfer = new DataTransferDownloadMacro(index)
		return this.#macroLock.enqueue(transfer)
	}

	/**
	 * Upload a macro to the device.
	 * @param index Index of the macro to write
	 * @param data The macro data to upload
	 * @param name Name passed along with the macro
	 */
	public async uploadMacro(index: number, data: Buffer, name: string): Promise<void> {
		const transfer = new DataTransferUploadMacro(index, data, name)
		return this.#macroLock.enqueue(transfer)
	}

	/**
	 * Upload a multiviewer label to the device.
	 * @param index Index passed to the label transfer
	 * @param data The label data to upload
	 */
	public async uploadMultiViewerLabel(index: number, data: Buffer): Promise<void> {
		const transfer = new DataTransferUploadMultiViewerLabel(index, data)
		return this.#labelsLock.enqueue(transfer)
	}

	/**
	 * Get the queue for a clip, creating and registering it on first use.
	 * @param index Zero-based clip index; new queues are only created for whole numbers from 0 to 19
	 * @returns The queue for the clip, which uses lock index `index + 1`
	 * @throws Error 'Invalid clip index' when there is no queue for `index` and it is not a valid index
	 */
	private getClipLock(index: number): DataTransferLockingQueue {
		const existing = this.#clipLocks.get(index)
		if (existing) return existing

		// A fractional index would register a queue that no lock message from the device could ever
		// be routed to (lookups use an integer lock index minus one), so reject it up front.
		if (!Number.isInteger(index) || index < 0 || index >= 20) {
			throw new Error('Invalid clip index')
		}

		const created = new DataTransferLockingQueue(index + 1, this.#sendLockCommand, this.#nextTransferId)
		this.#clipLocks.set(index, created)
		return created
	}
}