Skip to content

Some minor drive-by fixes and cleanups for the electric collection. #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 31 additions & 53 deletions packages/db-collections/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ function isUpToDateMessage<T extends Row<unknown>>(
// Check if a message contains txids in its headers
function hasTxids<T extends Row<unknown>>(
message: Message<T>
): message is Message<T> & { headers: { txids?: Array<number> } } {
return (
`headers` in message &&
`txids` in message.headers &&
Array.isArray(message.headers.txids)
)
): message is Message<T> & { headers: { txids?: Array<string> } } {
return `txids` in message.headers && Array.isArray(message.headers.txids)
}

/**
Expand Down Expand Up @@ -127,7 +123,7 @@ export function electricCollectionOptions<
TSchema extends StandardSchemaV1 = never,
TFallback extends Row<unknown> = Row<unknown>,
>(config: ElectricCollectionConfig<TExplicit, TSchema, TFallback>) {
const seenTxids = new Store<Set<string>>(new Set([`${Math.random()}`]))
const seenTxids = new Store<Set<string>>(new Set([]))
const sync = createElectricSync<ResolveType<TExplicit, TSchema, TFallback>>(
config.shapeOptions,
{
Expand All @@ -143,7 +139,7 @@ export function electricCollectionOptions<
*/
const awaitTxId: AwaitTxIdFn = async (
txId: string,
timeout = 30000
timeout: number = 30000
): Promise<boolean> => {
if (typeof txId !== `string`) {
throw new TypeError(
Expand Down Expand Up @@ -180,18 +176,14 @@ export function electricCollectionOptions<
ResolveType<TExplicit, TSchema, TFallback>
>
) => {
// Runtime check (that doesn't follow type)
// eslint-disable-next-line
const handlerResult = (await config.onInsert!(params)) ?? {}
const txid = (handlerResult as { txid?: string }).txid

if (!txid) {
const handlerResult = await config.onInsert!(params)
if (!handlerResult.txid) {
throw new Error(
`Electric collection onInsert handler must return a txid`
)
}

await awaitTxId(txid)
await awaitTxId(handlerResult.txid)
return handlerResult
}
: undefined
Expand All @@ -202,18 +194,14 @@ export function electricCollectionOptions<
ResolveType<TExplicit, TSchema, TFallback>
>
) => {
// Runtime check (that doesn't follow type)
// eslint-disable-next-line
const handlerResult = (await config.onUpdate!(params)) ?? {}
const txid = (handlerResult as { txid?: string }).txid

if (!txid) {
const handlerResult = await config.onUpdate!(params)
if (!handlerResult.txid) {
throw new Error(
`Electric collection onUpdate handler must return a txid`
)
}

await awaitTxId(txid)
await awaitTxId(handlerResult.txid)
return handlerResult
}
: undefined
Expand All @@ -224,18 +212,14 @@ export function electricCollectionOptions<
ResolveType<TExplicit, TSchema, TFallback>
>
) => {
// Runtime check (that doesn't follow type)
// eslint-disable-next-line
const handlerResult = (await config.onDelete!(params)) ?? {}
const txid = (handlerResult as { txid?: string }).txid

if (!txid) {
const handlerResult = await config.onDelete!(params)
if (!handlerResult.txid) {
throw new Error(
`Electric collection onDelete handler must return a txid`
)
}

await awaitTxId(txid)
await awaitTxId(handlerResult.txid)
return handlerResult
}
: undefined
Expand Down Expand Up @@ -295,59 +279,53 @@ function createElectricSync<T extends object>(
const { begin, write, commit } = params
const stream = new ShapeStream(shapeOptions)
let transactionStarted = false
let newTxids = new Set<string>()
const newTxids = new Set<string>()

stream.subscribe((messages: Array<Message<Row>>) => {
let hasUpToDate = false

for (const message of messages) {
// Check for txids in the message and add them to our store
if (hasTxids(message) && message.headers.txids) {
message.headers.txids.forEach((txid) => newTxids.add(String(txid)))
if (hasTxids(message)) {
message.headers.txids?.forEach((txid) => newTxids.add(txid))
}

// Check if the message contains schema information
if (isChangeMessage(message) && message.headers.schema) {
// Store the schema for future use if it's a valid string
if (typeof message.headers.schema === `string`) {
const schema: string = message.headers.schema
if (isChangeMessage(message)) {
// Check if the message contains schema information
const schema = message.headers.schema
if (schema && typeof schema === `string`) {
// Store the schema for future use if it's a valid string
relationSchema.setState(() => schema)
}
}

if (isChangeMessage(message)) {
if (!transactionStarted) {
begin()
transactionStarted = true
}

const value = message.value as unknown as T

// Include the primary key and relation info in the metadata
const enhancedMetadata = {
...message.headers,
}

write({
type: message.headers.operation,
value,
metadata: enhancedMetadata,
value: message.value as T,
// Include the primary key and relation info in the metadata
metadata: {
...message.headers,
},
})
} else if (isUpToDateMessage(message)) {
hasUpToDate = true
}
}

if (hasUpToDate && transactionStarted) {
transactionStarted = false

commit()
seenTxids.setState((currentTxids) => {
const clonedSeen = new Set(currentTxids)
newTxids.forEach((txid) => clonedSeen.add(String(txid)))

newTxids = new Set()
const clonedSeen = new Set<string>(currentTxids)
newTxids.forEach((txid) => clonedSeen.add(txid))
newTxids.clear()
return clonedSeen
})
transactionStarted = false
}
})
},
Expand Down
2 changes: 1 addition & 1 deletion packages/db-collections/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ describe(`Electric Integration`, () => {
value: value.value as Row,
headers: {
operation: `insert`,
txids: [Number(txid)], // Convert to number as the API expects numbers but our code converts to strings
txids: [txid],
},
})
}
Expand Down