Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve some flaky tests #1801

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
120 changes: 80 additions & 40 deletions test/abstract_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3106,35 +3106,53 @@ export default function abstractTest(server, config, ports) {
timeout: 4000,
},
function _test(t, done) {
const client = connect({ reconnectPeriod: 200 })
t.after(() => {
// close client if not closed
if (client) {
client.end(true)
}
})
let client = connect({ reconnectPeriod: 100 })
let serverPublished = false
let clientCalledBack = false

// client is connected the first time
server.once('client', (serverClient) => {
serverClient.on('connect', () => {
// destroy the stream before the publish is acknowledged
serverClient.once('connect', () => {
setImmediate(() => {
serverClient.stream.destroy()
})
})

// after 100ms the client should reconnect
server.once('client', (serverClientNew) => {
serverClientNew.on('publish', () => {
serverPublished = true
check()
})
})
})

// ensure that on first reconnect the publish is still not acknowledged
client.once('reconnect', () => {
// client callback should not be triggered on first connection
assert.isFalse(clientCalledBack)
})

client.publish('hello', 'world', { qos: 1 }, () => {
clientCalledBack = true
check()
})

function check() {
if (serverPublished && clientCalledBack) {
client.end(true, done)
client.on('packetreceive', (packet) => {
if (packet.cmd === 'puback') {
assert.isTrue(serverPublished)
setImmediate(() => {
assert.isTrue(clientCalledBack)
client.end(true, done)
client = null
})
}
}
})
},
)

Expand All @@ -3143,7 +3161,7 @@ export default function abstractTest(server, config, ports) {
let serverPublished = false
let clientCalledBack = false
server.once('client', (serverClient) => {
serverClient.on('connect', () => {
serverClient.once('connect', () => {
setImmediate(() => {
serverClient.stream.destroy()
client.end(true, (err) => {
Expand All @@ -3164,42 +3182,65 @@ export default function abstractTest(server, config, ports) {
})
})

it('should resend in-flight QoS 2 publish messages from the client', function _test(t, done) {
const client = connect({ reconnectPeriod: 200 })
let serverPublished = false
let clientCalledBack = false
it(
'should resend in-flight QoS 2 publish messages from the client',
{
timeout: 4000,
},
function _test(t, done) {
t.after(() => {
// close client if not closed
if (client) {
client.end(true)
}
})

server.once('client', (serverClient) => {
// ignore errors
serverClient.on('error', () => {})
serverClient.on('publish', () => {
setImmediate(() => {
serverClient.stream.destroy()
let client = connect({ reconnectPeriod: 100 })
let serverPublished = false
let clientCalledBack = false

server.once('client', (serverClient) => {
// ignore errors
serverClient.on('error', () => {})
serverClient.on('publish', () => {
setImmediate(() => {
serverClient.stream.destroy()
})
})
})

server.once('client', (serverClientNew) => {
serverClientNew.on('pubrel', () => {
serverPublished = true
check()
server.once('client', (serverClientNew) => {
serverClientNew.on('pubrel', () => {
serverPublished = true
})
})
})
})

client.publish('hello', 'world', { qos: 2 }, () => {
clientCalledBack = true
check()
})
client.publish('hello', 'world', { qos: 2 }, () => {
clientCalledBack = true
})

function check() {
if (serverPublished && clientCalledBack) {
client.end(true, done)
}
}
})
client.on('packetreceive', (packet) => {
if (packet.cmd === 'pubcomp') {
assert.isTrue(serverPublished)
setImmediate(() => {
assert.isTrue(clientCalledBack)
client.end(true, done)
client = null
})
}
})
},
)

it('should not resend in-flight QoS 1 removed publish messages from the client', function _test(t, done) {
const client = connect({ reconnectPeriod: 200 })
t.after(() => {
// close client if not closed
if (client) {
client.end(true)
}
})

let client = connect({ reconnectPeriod: 100 })
let clientCalledBack = false

server.once('client', (serverClient) => {
Expand All @@ -3211,8 +3252,7 @@ export default function abstractTest(server, config, ports) {

server.once('client', (serverClientNew) => {
serverClientNew.on('publish', () => {
fail()
done()
done(Error('should not have received publish'))
})
})
})
Expand All @@ -3234,6 +3274,7 @@ export default function abstractTest(server, config, ports) {
assert.isTrue(clientCalledBack)
client.end(true, (err) => {
done(err)
client = null
})
})

Expand All @@ -3250,8 +3291,7 @@ export default function abstractTest(server, config, ports) {

server.once('client', (serverClientNew) => {
serverClientNew.on('publish', () => {
fail()
done()
done(Error('should not have received publish'))
})
})
})
Expand Down