Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ Connection
ttl TTL
Subscription
id *String
topic **String
ttl TTL

@indexes
Expand Down Expand Up @@ -209,8 +208,6 @@ resources:
KeySchema:
- AttributeName: id
KeyType: HASH
- AttributeName: topic
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: ConnectionIndex
KeySchema:
Expand Down Expand Up @@ -259,7 +256,6 @@ resource "aws_dynamodb_table" "subscriptions-table" {
read_capacity = 1
write_capacity = 1
hash_key = "id"
range_key = "topic"

attribute {
name = "id"
Expand Down
27 changes: 3 additions & 24 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ graphql-lambda-subscriptions
- [LoggerFunction](README.md#loggerfunction)
- [MaybePromise](README.md#maybepromise)
- [SubscribeArgs](README.md#subscribeargs)
- [SubscriptionDefinition](README.md#subscriptiondefinition)
- [SubscriptionFilter](README.md#subscriptionfilter)
- [WebSocketResponse](README.md#websocketresponse)

Expand All @@ -34,11 +33,11 @@ graphql-lambda-subscriptions

### LoggerFunction

Ƭ **LoggerFunction**: (`message`: `string`, `obj?`: `any`) => `void`
Ƭ **LoggerFunction**: (`message`: `string`, `obj`: `Record`<`string`, `any`\>) => `void`

#### Type declaration

▸ (`message`, `obj?`): `void`
▸ (`message`, `obj`): `void`

Log operational events with a logger of your choice. It will get a message and usually object with relevant data

Expand All @@ -47,7 +46,7 @@ Log operational events with a logger of your choice. It will get a message and u
| Name | Type |
| :------ | :------ |
| `message` | `string` |
| `obj?` | `any` |
| `obj` | `Record`<`string`, `any`\> |

##### Returns

Expand Down Expand Up @@ -81,26 +80,6 @@ ___

___

### SubscriptionDefinition

Ƭ **SubscriptionDefinition**<`T`, `TSubscribeArgs`\>: `Object`

#### Type parameters

| Name | Type |
| :------ | :------ |
| `T` | extends [`PubSubEvent`](interfaces/PubSubEvent.md) |
| `TSubscribeArgs` | extends [`SubscribeArgs`](README.md#subscribeargs)[`SubscribeArgs`](README.md#subscribeargs) |

#### Type declaration

| Name | Type |
| :------ | :------ |
| `filter?` | [`SubscriptionFilter`](README.md#subscriptionfilter)<`TSubscribeArgs`, `T`[``"payload"``]\> |
| `topic` | `string` |

___

### SubscriptionFilter

Ƭ **SubscriptionFilter**<`TSubscribeArgs`, `TReturn`\>: `Partial`<`TReturn`\> \| `void` \| (...`args`: `TSubscribeArgs`) => [`MaybePromise`](README.md#maybepromise)<`Partial`<`TReturn`\> \| `void`\>
Expand Down
4 changes: 2 additions & 2 deletions docs/interfaces/ServerArgs.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ ___

### onConnectionInit

▸ `Optional` **onConnectionInit**(`e`): [`MaybePromise`](../README.md#maybepromise)<`object`\>
▸ `Optional` **onConnectionInit**(`e`): [`MaybePromise`](../README.md#maybepromise)<`Record`<`string`, `any`\>\>

#### Parameters

Expand All @@ -120,7 +120,7 @@ ___

#### Returns

[`MaybePromise`](../README.md#maybepromise)<`object`\>
[`MaybePromise`](../README.md#maybepromise)<`Record`<`string`, `any`\>\>

___

Expand Down
13 changes: 10 additions & 3 deletions docs/interfaces/SubscribePseudoIterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@

### Properties

- [topicDefinitions](SubscribePseudoIterable.md#topicdefinitions)
- [filter](SubscribePseudoIterable.md#filter)
- [topic](SubscribePseudoIterable.md#topic)

### Methods

Expand All @@ -39,9 +40,15 @@

## Properties

### topicDefinitions
### filter

• **topicDefinitions**: [`SubscriptionDefinition`](../README.md#subscriptiondefinition)<`T`, `TSubscribeArgs`\>[]
• `Optional` **filter**: [`SubscriptionFilter`](../README.md#subscriptionfilter)<`TSubscribeArgs`, `T`[``"payload"``]\>

___

### topic

• **topic**: `string`

## Methods

Expand Down
8 changes: 4 additions & 4 deletions lib/makeServerClosure.ts → lib/buildServerClosure.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { ServerArgs, ServerClosure, Connection, Subscription } from './types'
import { ServerArgs, ServerClosure } from './types'
import { DDB } from './ddb/DDB'
import { log as debugLogger } from './utils/logger'

export const makeServerClosure = async (opts: ServerArgs): Promise<ServerClosure> => {
export const buildServerClosure = async (opts: ServerArgs): Promise<ServerClosure> => {
const {
tableNames,
log = debugLogger,
Expand All @@ -19,8 +19,8 @@ export const makeServerClosure = async (opts: ServerArgs): Promise<ServerClosure
dynamodb: dynamodb,
log,
models: {
subscription: DDB<Subscription>({ dynamodb, tableName: (await tableNames)?.subscriptions || 'graphql_subscriptions', log }),
connection: DDB<Connection>({ dynamodb, tableName: (await tableNames)?.connections || 'graphql_connections', log }),
subscription: DDB({ dynamodb, tableName: (await tableNames)?.subscriptions || 'graphql_subscriptions', log }),
connection: DDB({ dynamodb, tableName: (await tableNames)?.connections || 'graphql_connections', log }),
},
}
}
146 changes: 89 additions & 57 deletions lib/ddb/DDB.ts
Original file line number Diff line number Diff line change
@@ -1,93 +1,125 @@
import { DynamoDB } from 'aws-sdk'
import { LoggerFunction, DDBType } from '../types'

export interface DDBClient<T extends DDBType> {
get: (id: string) => Promise<T|null>
put: (Item: T) => Promise<T>
update: (id: string, obj: Partial<T>) => Promise<T>
delete: (id: string) => Promise<T>
export interface DDBClient<T extends DDBType, TKey> {
get: (Key: TKey) => Promise<T|null>
put: (obj: T) => Promise<T>
update: (Key: TKey, obj: Partial<T>) => Promise<T>
delete: (Key: TKey) => Promise<T>
query: (options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) => AsyncGenerator<T, void, undefined>
}

export const DDB = <T extends DDBType>({
export const DDB = <T extends DDBType, TKey>({
dynamodb,
tableName,
log,
}: {
dynamodb: DynamoDB
tableName: string
log: LoggerFunction
}): DDBClient<T> => {
}): DDBClient<T, TKey> => {
const documentClient = new DynamoDB.DocumentClient({ service: dynamodb })

const get = async (id: string): Promise<null | T> => {
log('get', { tableName: tableName, id })
const { Item } = await documentClient.get({
TableName: tableName,
Key: { id },
}).promise()
return (Item as T) ?? null
const get = async (Key: TKey): Promise<null | T> => {
log('get', { tableName: tableName, Key })
try {
const { Item } = await documentClient.get({
TableName: tableName,
Key,
}).promise()
log('get:result', { Item })
return (Item as T) ?? null
} catch (e) {
log('get:error', e)
throw e
}
}

const put = async (Item: T): Promise<T> => {
log('put', { tableName: tableName, Item })
const { Attributes } = await documentClient.put({
TableName: tableName,
Item,
ReturnValues: 'ALL_OLD',
}).promise()
return Attributes as T
try {
const { Attributes } = await documentClient.put({
TableName: tableName,
Item,
ReturnValues: 'ALL_OLD',
}).promise()
return Attributes as T
} catch (e) {
log('put:error', e)
throw e
}
}

const update = async (id: string, obj: Partial<T>) => {
const AttributeUpdates = Object.entries(obj)
.map(([key, Value]) => ({ [key]: { Value, Action: 'PUT' } }))
.reduce((memo, val) => ({ ...memo, ...val }))
const update = async (Key: TKey, obj: Partial<T>) => {
log('update', { tableName: tableName, Key, obj })
try {
const AttributeUpdates = Object.entries(obj)
.map(([key, Value]) => ({ [key]: { Value, Action: 'PUT' } }))
.reduce((memo, val) => ({ ...memo, ...val }))

const { Attributes } = await documentClient.update({
TableName: tableName,
Key: { id },
AttributeUpdates,
ReturnValues: 'ALL_NEW',
}).promise()
return Attributes as T
const { Attributes } = await documentClient.update({
TableName: tableName,
Key,
AttributeUpdates,
ReturnValues: 'ALL_NEW',
}).promise()
return Attributes as T
} catch (e) {
log('update:error', e)
throw e
}
}

const deleteFunction = async (id: string): Promise<T> => {
const { Attributes } = await documentClient.delete({
TableName: tableName,
Key: { id },
ReturnValues: 'ALL_OLD',
}).promise()
return Attributes as T
const deleteFunction = async (Key: TKey): Promise<T> => {
log('delete', { tableName: tableName, Key })
try {
const { Attributes } = await documentClient.delete({
TableName: tableName,
Key,
ReturnValues: 'ALL_OLD',
}).promise()
return Attributes as T
} catch (e) {
log('delete:error', e)
throw e
}
}

const queryOnce = async (options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) => {
log('queryOnce', options)

const response = await documentClient.query({
TableName: tableName,
Select: 'ALL_ATTRIBUTES',
...options,
}).promise()
log('queryOnce', { tableName: tableName, options })
try {
const response = await documentClient.query({
TableName: tableName,
Select: 'ALL_ATTRIBUTES',
...options,
}).promise()

const { Items, LastEvaluatedKey, Count } = response
return {
items: (Items ?? []) as T[],
lastEvaluatedKey: LastEvaluatedKey,
count: Count ?? 0,
const { Items, LastEvaluatedKey, Count } = response
return {
items: (Items ?? []) as T[],
lastEvaluatedKey: LastEvaluatedKey,
count: Count ?? 0,
}
} catch (e) {
log('queryOnce:error', e)
throw e
}
}

async function* query(options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) {
log('query', options)
const results = await queryOnce(options)
yield* results.items
let lastEvaluatedKey = results.lastEvaluatedKey
while (lastEvaluatedKey) {
const results = await queryOnce({ ...options, ExclusiveStartKey: lastEvaluatedKey })
log('query', { tableName: tableName, options })
try {
const results = await queryOnce(options)
yield* results.items
lastEvaluatedKey = results.lastEvaluatedKey
let lastEvaluatedKey = results.lastEvaluatedKey
while (lastEvaluatedKey) {
const results = await queryOnce({ ...options, ExclusiveStartKey: lastEvaluatedKey })
yield* results.items
lastEvaluatedKey = results.lastEvaluatedKey
}
} catch (e) {
log('query:error', e)
throw e
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/handleStepFunctionEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const handleStepFunctionEvent = (serverPromise: Promise<ServerClosure>):
// Initial state - send ping message
if (input.state === 'PING') {
await postToConnection(server)({ ...input, message: { type: MessageType.Ping } })
await server.models.connection.update(input.connectionId, { hasPonged: false })
await server.models.connection.update({ id: input.connectionId }, { hasPonged: false })
return {
...input,
state: 'REVIEW',
Expand All @@ -21,7 +21,7 @@ export const handleStepFunctionEvent = (serverPromise: Promise<ServerClosure>):
}

// Follow up state - check if pong was returned
const conn = await server.models.connection.get(input.connectionId)
const conn = await server.models.connection.get({ id: input.connectionId })
if (conn?.hasPonged) {
return {
...input,
Expand Down
2 changes: 1 addition & 1 deletion lib/handleWebSocketEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { pong } from './messages/pong'
export const handleWebSocketEvent = (serverPromise: Promise<ServerClosure>): SubscriptionServer['webSocketHandler'] => async (event) => {
const server = await serverPromise
if (!event.requestContext) {
server.log('handleWebSocketEvent unknown')
server.log('handleWebSocketEvent unknown', { event })
return {
statusCode: 200,
body: '',
Expand Down
5 changes: 2 additions & 3 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { publish } from './pubsub/publish'
import { complete } from './pubsub/complete'
import { handleWebSocketEvent } from './handleWebSocketEvent'
import { handleStepFunctionEvent } from './handleStepFunctionEvent'
import { makeServerClosure } from './makeServerClosure'
import { buildServerClosure } from './buildServerClosure'

export const makeServer = (opts: ServerArgs): SubscriptionServer => {
const closure: Promise<ServerClosure> = makeServerClosure(opts)
const closure: Promise<ServerClosure> = buildServerClosure(opts)

return {
webSocketHandler: handleWebSocketEvent(closure),
Expand All @@ -32,7 +32,6 @@ export {
WebSocketResponse,
StateFunctionInput,
PubSubEvent,
SubscriptionDefinition,
SubscriptionFilter,
Connection,
Subscription,
Expand Down
4 changes: 2 additions & 2 deletions lib/messages/complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const complete: MessageHandler<CompleteMessage> =
async ({ server, event, message }) => {
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
try {
const subscription = await server.models.subscription.get(`${event.requestContext.connectionId}|${message.id}`)
const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` })
if (!subscription) {
return
}
Expand All @@ -37,7 +37,7 @@ export const complete: MessageHandler<CompleteMessage> =
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
await onComplete?.(root, args, context, info)

await server.models.subscription.delete(subscription.id)
await server.models.subscription.delete({ id: subscription.id })
} catch (err) {
server.log('messages:complete:onError', { err, event })
await server.onError?.(err, { event, message })
Expand Down
Loading