Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { BigNumber } from '@ethersproject/bignumber'
import { getAddress } from '@ethersproject/address'
import { Contract } from '@ethersproject/contracts'
import { StreamPartDefinition } from './stream'
import type { GroupKey } from './stream/Encryption'

// TODO get metadata type from streamr-protocol-js project (it doesn't export the type definitions yet)
export type OnMessageCallback = MaybeAsync<(message: any, metadata: any) => void>
Expand Down Expand Up @@ -160,7 +161,7 @@ function Plugin(targetInstance: any, srcInstance: any) {
}

// these are mixed in via Plugin function above
export interface StreamrClient extends StreamEndpoints, LoginEndpoints {}
export interface StreamrClient extends StreamEndpoints, LoginEndpoints, ReturnType<typeof Publisher>, Subscriber {}

/**
* @category Important
Expand Down Expand Up @@ -340,12 +341,16 @@ export class StreamrClient extends EventEmitter { // eslint-disable-line no-rede
return getUserId(this)
}

setNextGroupKey(...args: Todo) {
return this.publisher.setNextGroupKey(...args)
setNextGroupKey(streamId: string, newKey: GroupKey) {
return this.publisher.setNextGroupKey(streamId, newKey)
}

rotateGroupKey(...args: Todo) {
return this.publisher.rotateGroupKey(...args)
rotateGroupKey(streamId: string) {
return this.publisher.rotateGroupKey(streamId)
}

rekey(streamId: string) {
return this.publisher.rekey(streamId)
}

/**
Expand Down
8 changes: 6 additions & 2 deletions src/publish/Encrypt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ export default function Encrypt(client: StreamrClient) {
if (streamMessage.messageType !== StreamMessage.MESSAGE_TYPES.MESSAGE) {
return
}
const groupKey = await getPublisherKeyExchange().useGroupKey(stream.id)
await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey)

const [groupKey, nextGroupKey] = await getPublisherKeyExchange().useGroupKey(stream.id)
await EncryptionUtil.encryptStreamMessage(streamMessage, groupKey, nextGroupKey)
}

return Object.assign(encrypt, {
Expand All @@ -50,6 +51,9 @@ export default function Encrypt(client: StreamrClient) {
rotateGroupKey(...args: Parameters<PublisherKeyExhangeAPI['rotateGroupKey']>) {
return getPublisherKeyExchange().rotateGroupKey(...args)
},
rekey(...args: Parameters<PublisherKeyExhangeAPI['rekey']>) {
return getPublisherKeyExchange().rekey(...args)
},
start() {
return getPublisherKeyExchange().start()
},
Expand Down
6 changes: 6 additions & 0 deletions src/publish/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ function getCreateStreamMessage(client) {
rotateGroupKey(maybeStreamId) {
return encrypt.rotateGroupKey(maybeStreamId)
},
rekey(maybeStreamId) {
return encrypt.rekey(maybeStreamId)
},
startKeyExchange() {
return encrypt.start()
},
Expand Down Expand Up @@ -315,6 +318,9 @@ export default function Publisher(client) {
},
setNextGroupKey(streamId, newKey) {
return createStreamMessage.setNextGroupKey(streamId, newKey)
},
rekey(streamId) {
return createStreamMessage.rekey(streamId)
}
}
}
91 changes: 67 additions & 24 deletions src/stream/KeyExchange.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ function GroupKeyStore({ groupKeys = new Map() }) {
})

let currentGroupKeyId // current key id if any
let nextGroupKey // key to use next, disappears if not actually used.
const nextGroupKeys = [] // the keys to use next, disappears if not actually used. Max queue size 2

store.forEach((groupKey) => {
GroupKey.validate(GroupKey.from(groupKey))
Expand All @@ -79,7 +79,8 @@ function GroupKeyStore({ groupKeys = new Map() }) {
const existingKey = GroupKey.from(store.get(groupKey.id))
if (!existingKey.equals(groupKey)) {
throw new GroupKey.InvalidGroupKeyError(
`Trying to add groupKey ${groupKey.id} but key exists & is not equivalent to new GroupKey: ${groupKey}.`
`Trying to add groupKey ${groupKey.id} but key exists & is not equivalent to new GroupKey: ${groupKey}.`,
groupKey
)
}

Expand All @@ -96,29 +97,49 @@ function GroupKeyStore({ groupKeys = new Map() }) {
has(groupKeyId) {
if (currentGroupKeyId === groupKeyId) { return true }

if (nextGroupKey && nextGroupKey.id === groupKeyId) { return true }
if (nextGroupKeys.some((nextKey) => nextKey.id === groupKeyId)) { return true }

return store.has(groupKeyId)
},
isEmpty() {
return !nextGroupKey && store.size === 0
return nextGroupKeys.length === 0 && store.size === 0
},
useGroupKey() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method could be plural as it generates an array of GroupKeys?

Copy link
Contributor Author

@timoxley timoxley Mar 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, sort of, it could probably do with some docs though.

Basically it's giving you the current + (optional) next group key.

If there's two values, it'll encrypt the message with the first, but embed the second in the stream message (encrypted with the first), then the next time you call useGroupKey you will get that second value in the first position, and an empty value second position, until a new group key is assigned. When a new group key is assigned, it'll use the existing key one more time, embedding the next key, then start using the new key from there on.

Something like this:

setGroupKey(keyA)
useGroupKey() // => [keyA, undefined]
useGroupKey() // => [keyA, undefined]
setGroupKey(keyB)
useGroupKey() // => [keyA, keyB]
useGroupKey() // => [keyB, undefined]
useGroupKey() // => [keyB, undefined]
setGroupKey(keyC)
useGroupKey() // => [keyB, keyC]
useGroupKey() // => [keyC, undefined]
// etc

Where useGroupKey is called before encrypting each message.
Iff there's a second value, it will be set as the streamMessage.newGroupKey

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On possibility would be to introduce some helper data type, like GroupKeyQueue. It would encapsulate this e.g. max length handling.

if (nextGroupKey) {
// next key becomes current key
storeKey(nextGroupKey)

currentGroupKeyId = nextGroupKey.id
nextGroupKey = undefined
}

if (!currentGroupKeyId) {
// generate & use key if none already set
this.rotateGroupKey()
return this.useGroupKey()
const nextGroupKey = nextGroupKeys.pop()
switch (true) {
// First use of group key on this stream, no current key. Make next key current.
case !!(!currentGroupKeyId && nextGroupKey): {
storeKey(nextGroupKey)
currentGroupKeyId = nextGroupKey.id
return [
this.get(currentGroupKeyId),
undefined,
]
}
// Keep using current key (empty next)
case !!(currentGroupKeyId && !nextGroupKey): {
return [
this.get(currentGroupKeyId),
undefined
]
}
// Key changed (non-empty next). return current + next. Make next key current.
case !!(currentGroupKeyId && nextGroupKey): {
storeKey(nextGroupKey)
const prevGroupKey = this.get(currentGroupKeyId)
currentGroupKeyId = nextGroupKey.id
// use current key one more time
return [
prevGroupKey,
nextGroupKey,
]
}
// Generate & use new key if none already set.
default: {
this.rotateGroupKey()
return this.useGroupKey()
}
}

return this.get(currentGroupKeyId)
},
get(groupKeyId) {
const groupKey = store.get(groupKeyId)
Expand All @@ -127,7 +148,7 @@ function GroupKeyStore({ groupKeys = new Map() }) {
},
clear() {
currentGroupKeyId = undefined
nextGroupKey = undefined
nextGroupKeys.length = 0
return store.clear()
},
rotateGroupKey() {
Expand All @@ -138,7 +159,14 @@ function GroupKeyStore({ groupKeys = new Map() }) {
},
setNextGroupKey(newKey) {
GroupKey.validate(newKey)
nextGroupKey = newKey
nextGroupKeys.unshift(newKey)
nextGroupKeys.length = Math.min(nextGroupKeys.length, 2)
},
rekey() {
const newKey = GroupKey.generate()
storeKey(newKey)
currentGroupKeyId = newKey.id
nextGroupKeys.length = 0
}
}
}
Expand Down Expand Up @@ -215,7 +243,8 @@ async function PublisherKeyExhangeSubscription(client, getGroupKeyStore) {
const subscriberId = streamMessage.getPublisherId()

const groupKeyStore = getGroupKeyStore(streamId)
const encryptedGroupKeys = groupKeyIds.map((id) => {
const isSubscriber = await client.isStreamSubscriber(streamId, subscriberId)
const encryptedGroupKeys = !isSubscriber ? [] : groupKeyIds.map((id) => {
const groupKey = groupKeyStore.get(id)
if (!groupKey) {
return null // will be filtered out
Expand Down Expand Up @@ -316,9 +345,17 @@ export function PublisherKeyExhange(client, { groupKeys = {} } = {}) {
return !groupKeyStore.isEmpty()
}

async function rekey(streamId) {
if (!enabled) { return }
const groupKeyStore = getGroupKeyStore(streamId)
groupKeyStore.rekey()
await next()
}

return {
setNextGroupKey,
useGroupKey,
rekey,
rotateGroupKey,
hasAnyGroupKey,
async start() {
Expand All @@ -341,7 +378,7 @@ async function getGroupKeysFromStreamMessage(streamMessage, encryptionUtil) {

async function SubscriberKeyExhangeSubscription(client, getGroupKeyStore, encryptionUtil) {
let sub
async function onKeyExchangeMessage(parsedContent, streamMessage) {
async function onKeyExchangeMessage(_parsedContent, streamMessage) {
try {
const { messageType } = streamMessage
const { MESSAGE_TYPES } = StreamMessage
Expand Down Expand Up @@ -537,9 +574,9 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) {
})

async function getGroupKey(streamMessage) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getGroupKey definitely has a confusing name now though, since it returns an array of keys, one of which should be the one you're after, or an empty array.

if (!streamMessage.groupKeyId) { return undefined }
if (!streamMessage.groupKeyId) { return [] }
await next()
if (!enabled) { return undefined }
if (!enabled) { return [] }

return getKey(streamMessage)
}
Expand All @@ -549,6 +586,12 @@ export function SubscriberKeyExchange(client, { groupKeys = {} } = {}) {
enabled = true
return next()
},
addNewKey(streamMessage) {
if (!streamMessage.newGroupKey) { return }
const streamId = streamMessage.getStreamId()
const groupKeyStore = getGroupKeyStore(streamId)
groupKeyStore.add(streamMessage.newGroupKey)
},
async stop() {
enabled = false
return next()
Expand Down
1 change: 1 addition & 0 deletions src/subscribe/Decrypt.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export default function Decrypt(client, options = {}) {
throw new UnableToDecryptError(`Group key not found: ${streamMessage.groupKeyId}`, streamMessage)
}
await EncryptionUtil.decryptStreamMessage(streamMessage, groupKey)
requestKey.addNewKey(streamMessage)
} catch (err) {
await onError(err, streamMessage)
} finally {
Expand Down
Loading