Skip to content

Commit

Permalink
fix: publish/subscribe/unsubscribe types and missing types exports (#…
Browse files Browse the repository at this point in the history
…1688)

Co-authored-by: Daniel González García <daniel.gonzalez@danfoss.com>
  • Loading branch information
dgg and Daniel González García committed Sep 8, 2023
1 parent 316d9a2 commit 2df6af7
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 51 deletions.
32 changes: 19 additions & 13 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ export type ISubscriptionMap = {
resubscribe?: boolean
}

export { IConnackPacket, IDisconnectPacket, IPublishPacket, Packet }
export type OnConnectCallback = (packet: IConnackPacket) => void
export type OnDisconnectCallback = (packet: IDisconnectPacket) => void
export type ClientSubscribeCallback = (
Expand Down Expand Up @@ -879,19 +880,19 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
public publish(
topic: string,
message: string | Buffer,
callback?: DoneCallback,
callback?: PacketCallback,
): MqttClient
public publish(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
callback?: DoneCallback,
callback?: PacketCallback,
): MqttClient
public publish(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions | DoneCallback,
callback?: DoneCallback,
callback?: PacketCallback,
): MqttClient {
this.log('publish :: message `%s` to topic `%s`', message, topic)
const { options } = this
Expand Down Expand Up @@ -976,23 +977,26 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public publishAsync(topic: string, message: string | Buffer): Promise<void>
public publishAsync(
topic: string,
message: string | Buffer,
): Promise<Packet | undefined>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void>
): Promise<Packet | undefined>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void> {
): Promise<Packet | undefined> {
return new Promise((resolve, reject) => {
this.publish(topic, message, opts, (err) => {
this.publish(topic, message, opts, (err, packet) => {
if (err) {
reject(err)
} else {
resolve()
resolve(packet)
}
})
})
Expand Down Expand Up @@ -1346,21 +1350,23 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public unsubscribeAsync(topic: string | string[]): Promise<void>
public unsubscribeAsync(
topic: string | string[],
): Promise<Packet | undefined>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void>
): Promise<Packet | undefined>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void> {
): Promise<Packet | undefined> {
return new Promise((resolve, reject) => {
this.unsubscribe(topic, opts, (err) => {
this.unsubscribe(topic, opts, (err, packet) => {
if (err) {
reject(err)
} else {
resolve()
resolve(packet)
}
})
})
Expand Down
90 changes: 56 additions & 34 deletions test/abstract_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ export default function abstractTest(server, config) {

it('should emit connect', function _test(done) {
const client = connect()
client.once('connect', () => {
client.once('connect', (packet: mqtt.IConnackPacket) => {
assert.equal(packet.cmd, 'connack')
client.end(true, (err) => done(err))
})
client.once('error', done)
Expand Down Expand Up @@ -1078,7 +1079,9 @@ export default function abstractTest(server, config) {
const client = connect()

client.once('connect', () => {
client.publish('a', 'b', () => {
// callback args can be typed
client.publish('a', 'b', (_, packet?: mqtt.Packet) => {
assert.isUndefined(packet)
client.end((err) => done(err))
})
})
Expand All @@ -1089,7 +1092,8 @@ export default function abstractTest(server, config) {
const opts: IClientPublishOptions = { qos: 1 }

client.once('connect', () => {
client.publish('a', 'b', opts, () => {
client.publish('a', 'b', opts, (_, packet?: mqtt.Packet) => {
assert.exists(packet)
client.end((err) => done(err))
})
})
Expand Down Expand Up @@ -1131,18 +1135,24 @@ export default function abstractTest(server, config) {
})

client.once('connect', () => {
client.publish('a', 'b', pubOpts, (err) => {
if (version === 5) {
assert.strictEqual(err.code, pubackReasonCode)
} else {
assert.ifError(err)
}
setImmediate(() => {
client.end(() => {
server2.close(done)
client.publish(
'a',
'b',
pubOpts,
(err, packet?: mqtt.Packet) => {
assert.exists(packet)
if (version === 5) {
assert.strictEqual(err.code, pubackReasonCode)
} else {
assert.ifError(err)
}
setImmediate(() => {
client.end(() => {
server2.close(done)
})
})
})
})
},
)
})
})
})
Expand All @@ -1152,7 +1162,8 @@ export default function abstractTest(server, config) {
const opts: IClientPublishOptions = { qos: 2 }

client.once('connect', () => {
client.publish('a', 'b', opts, () => {
client.publish('a', 'b', opts, (_, packet?: mqtt.Packet) => {
assert.exists(packet)
client.end((err) => done(err))
})
})
Expand Down Expand Up @@ -1198,18 +1209,24 @@ export default function abstractTest(server, config) {
})

client.once('connect', () => {
client.publish('a', 'b', pubOpts, (err) => {
if (version === 5) {
assert.strictEqual(err.code, pubrecReasonCode)
} else {
assert.ifError(err)
}
setImmediate(() => {
client.end(true, () => {
server2.close(done)
client.publish(
'a',
'b',
pubOpts,
(err, packet?: mqtt.Packet) => {
assert.exists(packet)
if (version === 5) {
assert.strictEqual(err.code, pubrecReasonCode)
} else {
assert.ifError(err)
}
setImmediate(() => {
client.end(true, () => {
server2.close(done)
})
})
})
})
},
)
})
})
})
Expand Down Expand Up @@ -1907,7 +1924,9 @@ export default function abstractTest(server, config) {
const topic = 'topic'

client.once('connect', () => {
client.unsubscribe(topic, () => {
// callback args can be typed
client.unsubscribe(topic, (_, packet?: mqtt.Packet) => {
assert.isUndefined(packet)
client.end(true, done)
})
})
Expand Down Expand Up @@ -2372,12 +2391,15 @@ export default function abstractTest(server, config) {

//
client.subscribe(testPacket.topic)
client.once('message', (topic, message, packet) => {
assert.strictEqual(topic, testPacket.topic)
assert.strictEqual(message.toString(), testPacket.payload)
assert.strictEqual(packet.cmd, 'publish')
client.end(true, done)
})
client.once(
'message',
(topic, message, packet: mqtt.IPublishPacket) => {
assert.strictEqual(topic, testPacket.topic)
assert.strictEqual(message.toString(), testPacket.payload)
assert.strictEqual(packet.cmd, 'publish')
client.end(true, done)
},
)

server.once('client', (serverClient) => {
serverClient.on('subscribe', () => {
Expand All @@ -2397,7 +2419,7 @@ export default function abstractTest(server, config) {
}

client.subscribe(testPacket.topic)
client.on('packetreceive', (packet) => {
client.on('packetreceive', (packet: mqtt.Packet) => {
if (packet.cmd === 'publish') {
assert.strictEqual(packet.qos, 1)
assert.strictEqual(packet.topic, testPacket.topic)
Expand Down
11 changes: 7 additions & 4 deletions test/client_mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1048,10 +1048,13 @@ describe('MQTT 5.0', () => {
}

const client = mqtt.connect(opts)
client.once('disconnect', (disconnectPacket) => {
assert.strictEqual(disconnectPacket.reasonCode, 128)
client.end(true, (err) => done(err))
})
client.once(
'disconnect',
(disconnectPacket: mqtt.IDisconnectPacket) => {
assert.strictEqual(disconnectPacket.reasonCode, 128)
client.end(true, (err) => done(err))
},
)
})

it('pubrec handling custom reason code', function test(done) {
Expand Down

0 comments on commit 2df6af7

Please sign in to comment.