Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ publisherGroupKeys | {} | Object defining the group key as a hex string used to
publisherStoreKeyHistory | true | If `true`, the client will locally store every key used to encrypt messages at some point. If set to `false`, the client will not be able to answer subscribers asking for historical keys during resend requests.
subscriberGroupKeys | {} | Object defining, for each stream id, an object containing the group key used to decrypt for each publisher id. Not needed if `keyExchange` is defined.
keyExchange | {} | Defines RSA key pair to use for group key exchange. Can define `publicKey` and `privateKey` fields as strings in the PEM format, or stay empty to generate a key pair automatically. Can be set to `null` if no key exchange is required.
autoRevoke | true | If set to true, periodically rekeys streams when publishing if the number of subscribers to revoke reaches a threshold of 5. Can also manually trigger a rekey with `client.rekey(streamId)`.

### Authentication options

Expand Down
94 changes: 94 additions & 0 deletions src/KeyExchangeUtil.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import crypto from 'crypto'

import debugFactory from 'debug'

import EncryptionUtil from './EncryptionUtil'
import InvalidGroupKeyRequestError from './errors/InvalidGroupKeyRequestError'
import InvalidGroupKeyResponseError from './errors/InvalidGroupKeyResponseError'
import InvalidGroupKeyError from './errors/InvalidGroupKeyError'
import InvalidGroupKeyResetError from './errors/InvalidGroupKeyResetError'

const debug = debugFactory('KeyExchangeUtil')
const SUBSCRIBERS_EXPIRATION_TIME = 5 * 60 * 1000 // 5 minutes
const REVOCATION_DELAY = 10 * 60 * 1000 // 10 minutes
const REVOCATION_THRESHOLD = 5
export default class KeyExchangeUtil {
constructor(client) {
this._client = client
this.isSubscriberPromises = {}
this.localSubscribers = {}
this.lastCallToCheckRevocation = Number.NEGATIVE_INFINITY // Date in millis
this.publicKeysStore = {}
}

async handleGroupKeyRequest(streamMessage) {
Expand Down Expand Up @@ -50,6 +58,7 @@ export default class KeyExchangeUtil {
start: keyObj.start,
})
})
this.publicKeysStore[subscriberId] = parsedContent.publicKey
const response = await this._client.msgCreationUtil.createGroupKeyResponse(subscriberId, parsedContent.streamId, encryptedGroupKeys)
return this._client.publishStreamMessage(response)
}
Expand Down Expand Up @@ -93,6 +102,42 @@ export default class KeyExchangeUtil {
debug('INFO: Updated group key for stream "%s" and publisher "%s"', parsedContent.streamId, streamMessage.getPublisherId())
}

handleGroupKeyReset(streamMessage) {
// if it was signed, the StreamrClient already checked the signature. If not, StreamrClient accepted it since the stream
// does not require signed data for all types of messages.
if (!streamMessage.signature) {
throw new InvalidGroupKeyResetError('Received unsigned group key reset (it must be signed to avoid MitM attacks).')
}
// No need to check if parsedContent contains the necessary fields because it was already checked during deserialization
const parsedContent = streamMessage.getParsedContent()
// TODO: fix this hack in other PR
if (!this._client.subscribedStreamPartitions[parsedContent.streamId + '0']) {
throw new InvalidGroupKeyResetError('Received group key reset for a stream to which the client is not subscribed.')
}

if (!this._client.encryptionUtil) {
throw new InvalidGroupKeyResetError('Cannot decrypt group key reset without the private key.')
}
const groupKey = this._client.encryptionUtil.decryptWithPrivateKey(parsedContent.groupKey, true)
try {
EncryptionUtil.validateGroupKey(groupKey)
} catch (err) {
if (err instanceof InvalidGroupKeyError) {
throw new InvalidGroupKeyResetError(err.message)
} else {
throw err
}
}
const decryptedGroupKey = {
groupKey,
start: parsedContent.start
}
/* eslint-disable no-underscore-dangle */
this._client._setGroupKeys(parsedContent.streamId, streamMessage.getPublisherId(), [decryptedGroupKey])
/* eslint-enable no-underscore-dangle */
debug('INFO: Updated group key for stream "%s" and publisher "%s"', parsedContent.streamId, streamMessage.getPublisherId())
}

async getSubscribers(streamId) {
if (!this.subscribersPromise || (Date.now() - this.lastAccess) > SUBSCRIBERS_EXPIRATION_TIME) {
this.subscribersPromise = this._client.getStreamSubscribers(streamId).then((subscribers) => {
Expand All @@ -107,6 +152,53 @@ export default class KeyExchangeUtil {
return this.subscribersPromise
}

async nbSubscribersToRevoke(streamId) {
const realSubscribersSet = await this._client.getStreamSubscribers(streamId)
let counter = 0
const localSubscribersSet = this.localSubscribers[streamId] ? this.localSubscribers[streamId] : []
localSubscribersSet.forEach((subscriber) => {
if (!realSubscribersSet.includes(subscriber)) {
counter += 1
}
})
this.localSubscribers[streamId] = realSubscribersSet
return counter
}

async keyRevocationNeeded(streamId) {
const now = Date.now()
let res = false
if (this.lastCallToCheckRevocation + REVOCATION_DELAY < now) {
res = (await this.nbSubscribersToRevoke(streamId)) >= KeyExchangeUtil.REVOCATION_THRESHOLD
}
this.lastCallToCheckRevocation = now
return res
}

async rekey(streamId, getSubscribersLocally = true) {
const groupKey = crypto.randomBytes(32)
const start = Date.now()
const realSubscribersSet = getSubscribersLocally ? this.localSubscribers[streamId] : (await this._client.getStreamSubscribers(streamId))

const toRevoke = []
const promises = []
Object.keys(this.publicKeysStore).forEach(async (subscriberId) => { // iterating over local cache of Ethereum address --> RSA public key
if (realSubscribersSet.includes(subscriberId)) { // if still valid subscriber, send the new key
const encryptedGroupKey = {
groupKey: EncryptionUtil.encryptWithPublicKey(groupKey, this.publicKeysStore[subscriberId], true),
start,
}
const reset = await this._client.msgCreationUtil.createGroupKeyReset(subscriberId, streamId, encryptedGroupKey)
promises.push(this._client.publishStreamMessage(reset))
} else { // no longer a valid subscriber, to be removed from local cache
toRevoke.push(subscriberId)
}
})
toRevoke.forEach((revokedSubscriberId) => delete this.publicKeysStore[revokedSubscriberId])
await Promise.all(promises)
this._client.keyStorageUtil.addKey(streamId, groupKey, start)
}

async isSubscriber(streamId, subscriberId) {
if (!this.isSubscriberPromises[streamId]) {
this.isSubscriberPromises[streamId] = {}
Expand All @@ -129,3 +221,5 @@ export default class KeyExchangeUtil {
}
}
KeyExchangeUtil.SUBSCRIBERS_EXPIRATION_TIME = SUBSCRIBERS_EXPIRATION_TIME
KeyExchangeUtil.REVOCATION_DELAY = REVOCATION_DELAY
KeyExchangeUtil.REVOCATION_THRESHOLD = REVOCATION_THRESHOLD
17 changes: 17 additions & 0 deletions src/MessageCreationUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,23 @@ export default class MessageCreationUtil {
return streamMessage
}

async createGroupKeyReset(subscriberAddress, streamId, encryptedGroupKey) {
if (!this._signer) {
throw new Error('Cannot create unsigned group key reset. Must authenticate with "privateKey" or "provider"')
}
const publisherId = await this.getPublisherId()
const data = {
streamId,
groupKey: encryptedGroupKey.groupKey,
start: encryptedGroupKey.start,
}
const idAndPrevRef = this.createDefaultMsgIdAndPrevRef(subscriberAddress.toLowerCase(), publisherId)
const streamMessage = StreamMessage.create(idAndPrevRef[0], idAndPrevRef[1], StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE,
StreamMessage.ENCRYPTION_TYPES.RSA, data, StreamMessage.SIGNATURE_TYPES.NONE, null)
await this._signer.signStreamMessage(streamMessage)
return streamMessage
}

async createErrorMessage(destinationAddress, error) {
if (!this._signer) {
throw new Error('Cannot create unsigned error message. Must authenticate with "privateKey" or "provider"')
Expand Down
32 changes: 31 additions & 1 deletion src/StreamrClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import ResendUtil from './ResendUtil'
import InvalidGroupKeyResponseError from './errors/InvalidGroupKeyResponseError'
import InvalidContentTypeError from './errors/InvalidContentTypeError'
import InvalidGroupKeyRequestError from './errors/InvalidGroupKeyRequestError'
import InvalidGroupKeyResetError from './errors/InvalidGroupKeyResetError'

export default class StreamrClient extends EventEmitter {
constructor(options, connection) {
Expand All @@ -69,6 +70,7 @@ export default class StreamrClient extends EventEmitter {
publisherGroupKeys: {}, // {streamId: groupKey}
subscriberGroupKeys: {}, // {streamId: {publisherId: groupKey}}
keyExchange: {},
autoRevoke: true,
streamrNodeAddress: '0xf3E5A65851C3779f468c9EcB32E6f25D9D68601a',
streamrOperatorAddress: '0xc0aa4dC0763550161a6B59fa430361b5a26df28C',
tokenAddress: '0x0Cf0Ee63788A0849fE5297F3407f701E122cC023',
Expand Down Expand Up @@ -320,7 +322,22 @@ export default class StreamrClient extends EventEmitter {
this.keyExchangeUtil.handleGroupKeyResponse(streamMessage)
} else {
throw new InvalidGroupKeyResponseError(
`Received group key from an invalid publisher ${streamMessage.getPublisherId()} for stream ${streamId}`
`Received group key response from an invalid publisher ${streamMessage.getPublisherId()} for stream ${streamId}`
)
}
}
} else if (streamMessage.contentType === StreamMessage.CONTENT_TYPES.GROUP_KEY_RESET_SIMPLE) {
if (this.keyExchangeUtil) {
const { streamId } = streamMessage.getParsedContent()
// A valid publisher of the client's inbox stream could send key resets for other streams to which
// the publisher doesn't have write permissions. Thus the following additional check is necessary.
// TODO: fix this hack in other PR (and move logic to keyExchangeUtil)
const valid = await this.subscribedStreamPartitions[streamId + '0'].isValidPublisher(streamMessage.getPublisherId())
if (valid) {
this.keyExchangeUtil.handleGroupKeyReset(streamMessage)
} else {
throw new InvalidGroupKeyResetError(
`Received group key reset from an invalid publisher ${streamMessage.getPublisherId()} for stream ${streamId}`
)
}
}
Expand Down Expand Up @@ -426,6 +443,12 @@ export default class StreamrClient extends EventEmitter {

if (this.isConnected()) {
// If connected, emit a publish request
if (this.options.autoRevoke && this.keyExchangeUtil) {
const res = await this.keyExchangeUtil.keyRevocationNeeded(streamId)
if (res) {
await this.keyExchangeUtil.rekey(streamId)
}
}
return this._requestPublish(streamMessage, sessionToken)
}

Expand Down Expand Up @@ -464,6 +487,13 @@ export default class StreamrClient extends EventEmitter {
)
}

async rekey(streamId) {
if (!this.keyExchangeUtil) {
throw new Error('options.keyExchange must be defined in the StreamrClient constructor to use this function.')
}
return this.keyExchangeUtil.rekey(streamId, false)
}

async resend(optionsOrStreamId, callback) {
const options = this._validateParameters(optionsOrStreamId, callback)

Expand Down
8 changes: 8 additions & 0 deletions src/errors/InvalidGroupKeyResetError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export default class InvalidGroupKeyResetError extends Error {
constructor(message) {
super(message)
if (Error.captureStackTrace) {
Error.captureStackTrace(this, this.constructor)
}
}
}
52 changes: 49 additions & 3 deletions test/integration/StreamrClient.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1006,15 +1006,61 @@ describe('StreamrClient', () => {
})
}, 2 * TIMEOUT)

it('client.subscribe with resend last can get the historical keys for previous encrypted messages', (done) => {
it('client.subscribe can get the new group key after a group key reset', async (done) => {
client.once('error', done)
const groupKey = crypto.randomBytes(32)
// subscribe without knowing the group key to decrypt stream messages
let receivedFirst = false
const sub = client.subscribe({
stream: stream.id,
}, (parsedContent, streamMessage) => {
// Check signature stuff
assert.strictEqual(streamMessage.signatureType, StreamMessage.SIGNATURE_TYPES.ETH)
assert(streamMessage.getPublisherId())
assert(streamMessage.signature)

if (!receivedFirst) {
assert.strictEqual(parsedContent.data, 'msg1')
// Now the subscriber knows the initial group key
assert.deepStrictEqual(sub.groupKeys[streamMessage.getPublisherId().toLowerCase()], groupKey)
receivedFirst = true
} else {
assert.strictEqual(parsedContent.data, 'msg2')
// Now the subscriber knows another group key after reset
assert.notDeepStrictEqual(sub.groupKeys[streamMessage.getPublisherId().toLowerCase()], groupKey)

// All good, unsubscribe
client.unsubscribe(sub)
sub.on('unsubscribed', () => {
done()
})
}
})

// Publish after subscribed
sub.on('subscribed', async () => {
await client.publish(stream.id, {
data: 'msg1',
}, Date.now(), null, groupKey)
setTimeout(async () => {
await client.rekey(stream.id)
// publishing second message with the newly generated key
await client.publish(stream.id, {
data: 'msg2'
})
}, 2000)
})
}, 2 * TIMEOUT)

it('client.subscribe with resend last can get the historical keys for previous encrypted messages', async (done) => {
client.once('error', done)
// Publish encrypted messages with different keys
const groupKey1 = crypto.randomBytes(32)
const groupKey2 = crypto.randomBytes(32)
client.publish(stream.id, {
await client.publish(stream.id, {
test: 'resent msg 1',
}, Date.now(), null, groupKey1)
client.publish(stream.id, {
await client.publish(stream.id, {
test: 'resent msg 2',
}, Date.now(), null, groupKey2)

Expand Down
Loading