-
Notifications
You must be signed in to change notification settings - Fork 12
/
streaming-client.ts
163 lines (150 loc) · 6.23 KB
/
streaming-client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import { EventEmitter } from 'events';
import { Duplex, PassThrough } from 'stream';
import { client } from 'websocket';
import { AudioConfig } from './models/streaming/AudioConfig';
import { BufferedDuplex } from './models/streaming/BufferedDuplex';
import { SessionConfig } from './models/streaming/SessionConfig';
import {
StreamingConnected,
StreamingHypothesis,
StreamingResponse
} from './models/streaming/StreamingResponses';
// tslint:disable-next-line
const sdkVersion = require('../package.json').version;
/**
* Client which handles a streaming connection to the Rev.ai API.
* @event httpResponse emitted when the client fails to start a websocket connection and
* receives an http response. Event contains the http status code of the response.
* @event connectFailed emitted when the client fails to begin a websocket connection and
* received a websocket error. Event contains the received error.
* @event connect emitted when the client receives a connected message from the API. Contains
* the StreamingConnected returned from the API.
* @event close emitted when the connection is properly closed by the server. Contains the
* close code and reason.
* @event error emitted when an error occurs in the connection to the server. Contains the
* thrown error.
*/
export class RevAiStreamingClient extends EventEmitter {
baseUrl: string;
client: client;
private accessToken: string;
private config: AudioConfig;
private requests: PassThrough;
private responses: PassThrough;
/**
* @param accessToken Access token associated with the user's account
* @param config Configuration of the audio the user will send from this client
* @param version (optional) Version of the Rev.ai API the user wants to use
*/
constructor(accessToken: string, config: AudioConfig, version = 'v1') {
super();
this.accessToken = accessToken;
this.config = config;
this.baseUrl = `wss://api.rev.ai/speechtotext/${version}/stream`;
this.requests = new PassThrough();
this.responses = new PassThrough({ objectMode: true });
this.client = new client({ keepalive: true, keepaliveInterval: 30000 });
this.setUpHttpResponseHandler();
this.setUpConnectionFailureHandler();
this.setUpConnectedHandlers();
}
/**
* Begins a streaming connection. Returns a duplex
* from which the user can read responses from the api and to which the user
* should write their audio data
* @param config (Optional) Optional configuration for the streaming session
*
* @returns BufferedDuplex. Data written to this buffer will be sent to the api
* Data received from the api can be read from this duplex
*/
public start(config?: SessionConfig): Duplex {
let url = this.baseUrl +
`?access_token=${this.accessToken}` +
`&content_type=${this.config.getContentTypeString()}` +
`&user_agent=${encodeURIComponent(`RevAi-NodeSDK/${sdkVersion}`)}`;
if (config) {
if (config.metadata) {
url += `&metadata=${encodeURIComponent(config.metadata)}`;
}
if (config.customVocabularyID) {
url += `&custom_vocabulary_id=${encodeURIComponent(config.customVocabularyID)}`;
}
if (config.filterProfanity) {
url += `&filter_profanity=true`;
}
if (config.removeDisfluencies) {
url += `&remove_disfluencies=true`;
}
if (config.deleteAfterSeconds !== null && config.deleteAfterSeconds !== undefined) {
url += `&delete_after_seconds=${encodeURIComponent(config.deleteAfterSeconds.toString())}`;
}
}
this.client.connect(url);
return new BufferedDuplex(this.requests, this.responses, { readableObjectMode: true });
}
/**
* Signals to the api that you have finished sending data.
*/
public end(): void {
this.requests.end('EOS', 'utf8');
}
/**
* Immediately kills the streaming connection, no more results will be returned from the API
* after this is called.
*/
public unsafeEnd(): void {
this.client.abort();
this.closeStreams();
}
private setUpHttpResponseHandler(): void {
this.client.on('httpResponse', (response: any) => {
this.emit('httpResponse', response.statusCode);
this.closeStreams();
});
}
private setUpConnectionFailureHandler(): void {
this.client.on('connectFailed', (error: Error) => {
this.emit('connectFailed', error);
this.closeStreams();
});
}
private setUpConnectedHandlers(): void {
this.client.on('connect', (connection: any) => {
connection.on('error', (error: Error) => {
this.emit('error', error);
this.closeStreams();
});
connection.on('close', (code: number, reason: string) => {
this.emit('close', code, reason);
this.closeStreams();
});
connection.on('message', (message: any) => {
if (message.type === 'utf8') {
let response = JSON.parse(message.utf8Data);
if ((response as StreamingResponse).type === 'connected') {
this.emit('connect', response as StreamingConnected);
} else {
this.responses.write(response as StreamingHypothesis);
}
}
});
this.doSendLoop(connection, this.requests);
});
}
private doSendLoop(connection: any, buffer: PassThrough): void {
if (connection.connected) {
let value = buffer.read(buffer.readableLength);
if (value !== null) {
connection.send(value);
if (value.includes('EOS') || value.includes(Buffer.from('EOS'))) {
connection.sendUTF('EOS');
}
}
setTimeout(() => this.doSendLoop(connection, buffer), 100);
}
}
private closeStreams(): void {
this.requests.end();
this.responses.push(null);
}
}