Skip to content

Commit

Permalink
add gzip websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
zerodytrash committed Aug 4, 2022
1 parent 0a5140f commit c4c30b3
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 15 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tiktok-live-connector",
"version": "0.9.24",
"version": "0.9.25",
"description": "Node.js module to receive live stream chat events like comments and gifts from TikTok LIVE",
"main": "index.js",
"engines": {
Expand Down
42 changes: 31 additions & 11 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const TikTokHttpClient = require('./lib/tiktokHttpClient.js');
const WebcastWebsocket = require('./lib/webcastWebsocket.js');
const { getRoomIdFromMainPageHtml, validateAndNormalizeUniqueId } = require('./lib/tiktokUtils.js');
const { simplifyObject } = require('./lib/webcastDataConverter.js');
const { deserializeMessage } = require('./lib/webcastProtobuf.js');
const { deserializeMessage, deserializeWebsocketMessage } = require('./lib/webcastProtobuf.js');

const Config = require('./lib/webcastConfig.js');

Expand Down Expand Up @@ -280,16 +280,34 @@ class WebcastPushConnection extends EventEmitter {
* @param {string} messageType
* @param {Buffer} messageBuffer
*/
decodeProtobufMessage(messageType, messageBuffer) {
let webcastMessage = deserializeMessage(messageType, messageBuffer);
this.#processWebcastResponse({
messages: [
{
decodedData: webcastMessage,
type: messageType,
},
],
});
async decodeProtobufMessage(messageType, messageBuffer) {
switch (messageType) {
case 'WebcastResponse': {
let decodedWebcastResponse = deserializeMessage(messageType, messageBuffer);
this.#processWebcastResponse(decodedWebcastResponse);
break;
}

case 'WebcastWebsocketMessage': {
let decodedWebcastWebsocketMessage = await deserializeWebsocketMessage(messageBuffer);
if (typeof decodedWebcastWebsocketMessage.webcastResponse === 'object') {
this.#processWebcastResponse(decodedWebcastWebsocketMessage.webcastResponse);
}
break;
}

default: {
let webcastMessage = deserializeMessage(messageType, messageBuffer);
this.#processWebcastResponse({
messages: [
{
decodedData: webcastMessage,
type: messageType,
},
],
});
}
}
}

async #retrieveRoomId() {
Expand Down Expand Up @@ -377,6 +395,7 @@ class WebcastPushConnection extends EventEmitter {
// Websocket specific params
let wsParams = {
imprp: webcastResponse.wsParam.value,
compress: 'gzip',
};

// Wait until ws connected, then stop request polling
Expand Down Expand Up @@ -487,4 +506,5 @@ class WebcastPushConnection extends EventEmitter {
module.exports = {
WebcastPushConnection,
signatureProvider: require('./lib/tiktokSignatureProvider'),
webcastProtobuf: require('./lib/webcastProtobuf.js'),
};
21 changes: 20 additions & 1 deletion src/lib/webcastProtobuf.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
const protobufjs = require('protobufjs');
const util = require('util');
const zlib = require('zlib');
const unzip = util.promisify(zlib.unzip);

let tiktokSchemaPath = require.resolve('../proto/tiktokSchema.proto');
let tiktokSchema = null;
let config = {
skipMessageTypes: [],
};

// Load & cache schema
function loadTikTokSchema() {
Expand All @@ -23,6 +29,10 @@ function deserializeMessage(protoName, binaryMessage) {
if (protoName === 'WebcastResponse' && Array.isArray(webcastData.messages)) {
// Contains different object structures depending on the type field
webcastData.messages.forEach((message) => {
if (config.skipMessageTypes.includes(message.type)) {
return;
}

switch (message.type) {
case 'WebcastControlMessage':
case 'WebcastRoomUserSeqMessage':
Expand All @@ -46,11 +56,19 @@ function deserializeMessage(protoName, binaryMessage) {
return webcastData;
}

function deserializeWebsocketMessage(binaryMessage) {
async function deserializeWebsocketMessage(binaryMessage) {
// Websocket messages are in an container which contains additional data
// Message type 'msg' represents a normal WebcastResponse
let decodedWebsocketMessage = deserializeMessage('WebcastWebsocketMessage', binaryMessage);
if (decodedWebsocketMessage.type === 'msg') {
let binary = decodedWebsocketMessage.binary;

// Decompress binary (if gzip compressed)
// https://www.rfc-editor.org/rfc/rfc1950.html
if (binary && binary.length > 2 && binary[0] === 0x1f && binary[1] === 0x8b && binary[2] === 0x08) {
decodedWebsocketMessage.binary = await unzip(binary);
}

decodedWebsocketMessage.webcastResponse = deserializeMessage('WebcastResponse', decodedWebsocketMessage.binary);
}

Expand All @@ -61,4 +79,5 @@ module.exports = {
serializeMessage,
deserializeMessage,
deserializeWebsocketMessage,
config,
};
4 changes: 2 additions & 2 deletions src/lib/webcastWebsocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class WebcastWebsocket extends websocket.client {
});
}

#handleMessage(message) {
async #handleMessage(message) {
try {
let decodedContainer = deserializeWebsocketMessage(message.binaryData);
let decodedContainer = await deserializeWebsocketMessage(message.binaryData);

if (decodedContainer.id > 0) {
this.#sendAck(decodedContainer.id);
Expand Down

0 comments on commit c4c30b3

Please sign in to comment.