Skip to content

Commit

Permalink
Changed timing of messages on mongofw/wr replication.
Browse files Browse the repository at this point in the history
  • Loading branch information
riclolsen committed May 26, 2024
1 parent 8577a81 commit 0db4331
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
10 changes: 6 additions & 4 deletions src/mongofw/customized_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const { Double } = require('mongodb')
const { setInterval } = require('timers')
const dgram = require('dgram')
const Queue = require('queue-fifo')
const zlib = require('fast-zlib')
// const zlib = require('fast-zlib')
const zlib = require('zlib')

// UDP broadcast options
const udpPort = 12345
Expand Down Expand Up @@ -66,7 +67,7 @@ module.exports.CustomProcessor = function (
const changeStreamUserActions = db
.collection(RealtimeDataCollectionName)
.watch(
{ $match: { operationType: 'update' } },
[{ $match: { operationType: 'update' } }],
{ fullDocument: 'updateLookup' }
)

Expand Down Expand Up @@ -146,8 +147,9 @@ async function procQueue() {
if (strSz > 7000) break
}
const opData = JSON.stringify(fwArr)
const deflate = new zlib.Deflate()
const message = deflate.process(opData)
// const deflate = new zlib.Deflate()
// const message = deflate.process(opData)
const message = zlib.deflateSync(opData)

Log.log(opData.length + ' ' + message.length)
Log.log('Objects: ' + fwArr.length)
Expand Down
38 changes: 23 additions & 15 deletions src/mongowr/customized_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const { Double } = require('mongodb')
const { setInterval } = require('timers')
const dgram = require('node:dgram')
const Queue = require('queue-fifo')
const zlib = require('fast-zlib')
// const zlib = require('fast-zlib')
const zlib = require('zlib')

// UDP broadcast options
const udpPort = 12345
Expand Down Expand Up @@ -78,7 +79,7 @@ module.exports.CustomProcessor = async function (
server.bind(udpPort, udpBind)
}

let cnt = -1
let cntChg = -1
let cntLost = 0

setTimeout(procQueue, 1000)
Expand All @@ -99,27 +100,24 @@ async function procQueue() {
const msgRaw = msgQueue.peek()
msgQueue.dequeue()

const inflate = new zlib.Inflate()
const buffer = inflate.process(msgRaw)
//const inflate = new zlib.Inflate()
//const buffer = inflate.process(msgRaw)
const buffer = zlib.inflateSync(msgRaw)
const msg = buffer.toString('utf8')
const arrObj = JSON.parse(msg)

if (arrObj.length)
for (let i = 0; i < arrObj.length; i++) {
const dataObj = arrObj[i]
Log.log('Queue Size: ' + msgQueue.size())


if (!dataObj?.cnt) {
Log.log('Unexpected format')
}
if (dataObj.cnt - cnt > 1 && cnt != -1) {
if (dataObj.cnt - cntChg > 1 && cntChg != -1) {
Log.log('Message lost # ' + (dataObj.cnt - 1))
cntLost += dataObj.cnt - cnt
cntLost += dataObj.cnt - cntChg
}
cnt = dataObj.cnt
Log.log('Total lost: ' + cntLost)
Log.log(' Chg count: ' + dataObj.cnt)
Log.log(' Pkt count: ' + pktCnt)
cntChg = dataObj.cnt

// will process only update data from drivers
if (!dataObj?.updateDescription?.updatedFields?.sourceDataUpdate)
Expand All @@ -143,19 +141,29 @@ async function procQueue() {

updateOps.push({
updateOne: {
filter: { ...dataObj.documentKey },
filter: { tag: dataObj.tag },
update: { $set: { ...dataObj.updateDescription.updatedFields } },
},
})
}
} catch (e) {
Log.log('Error: ' + e)
}
await sleep(1)
}

if (updateOps.length > 0) {
const result = await collection.bulkWrite(updateOps)
Log.log(JSON.stringify(result))
}
setTimeout(procQueue, 150)
Log.log('Queue Size: ' + msgQueue.size())
Log.log('Total lost: ' + cntLost)
Log.log(' Chg count: ' + cntChg)
Log.log(' Pkt count: ' + pktCnt)
}

setTimeout(procQueue, 100)
}

function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}

0 comments on commit 0db4331

Please sign in to comment.