diff --git a/packages/db-client/src/streams/appendToStream/append.ts b/packages/db-client/src/streams/appendToStream/append.ts index d764598c..4352b713 100644 --- a/packages/db-client/src/streams/appendToStream/append.ts +++ b/packages/db-client/src/streams/appendToStream/append.ts @@ -3,11 +3,7 @@ import { Empty } from "../../../generated/shared_pb"; import { StreamsClient } from "../../../generated/streams_grpc_pb"; import type { Client } from "../../Client"; -import type { - AppendResult, - AppendExpectedRevision, - EventData, -} from "../../types"; +import type { AppendResult, AppendStreamState, EventData } from "../../types"; import { backpressuredWrite, convertToCommandError, @@ -24,7 +20,7 @@ export const append = async function ( this: Client, streamName: string, events: EventData[], - { expectedRevision, ...baseOptions }: InternalOptions + { streamState, ...baseOptions }: InternalOptions ): Promise { const header = new AppendReq(); const options = new AppendReq.Options(); @@ -32,7 +28,7 @@ export const append = async function ( options.setStreamIdentifier(identifier); - switch (expectedRevision) { + switch (streamState) { case "any": { options.setAny(new Empty()); break; @@ -46,7 +42,7 @@ export const append = async function ( break; } default: { - options.setRevision(expectedRevision.toString(10)); + options.setRevision(streamState.toString(10)); break; } } @@ -56,7 +52,7 @@ export const append = async function ( debug.command("appendToStream: %O", { streamName, events, - options: { expectedRevision, ...baseOptions }, + options: { streamState: streamState, ...baseOptions }, }); debug.command_grpc("appendToStream: %g", header); @@ -75,7 +71,7 @@ export const append = async function ( if (resp.hasWrongExpectedVersion()) { const grpcError = resp.getWrongExpectedVersion()!; - let expected: AppendExpectedRevision = "any"; + let expected: AppendStreamState = "any"; switch (true) { case grpcError.hasExpectedRevision(): { diff --git a/packages/db-client/src/streams/appendToStream/batchAppend.ts b/packages/db-client/src/streams/appendToStream/batchAppend.ts index 9b1e4060..ff359fbd 100644 --- a/packages/db-client/src/streams/appendToStream/batchAppend.ts +++ b/packages/db-client/src/streams/appendToStream/batchAppend.ts @@ -39,7 +39,7 @@ export const batchAppend = async function ( streamName: string, events: EventData[], { - expectedRevision, + streamState, batchAppendSize, ...baseOptions }: InternalOptions @@ -131,7 +131,7 @@ export const batchAppend = async function ( options.setStreamIdentifier(identifier); options.setDeadline(deadline); - switch (expectedRevision) { + switch (streamState) { case "any": { options.setAny(new Empty()); break; @@ -145,7 +145,7 @@ export const batchAppend = async function ( break; } default: { - options.setStreamPosition(expectedRevision.toString(10)); + options.setStreamPosition(streamState.toString(10)); break; } } diff --git a/packages/db-client/src/streams/appendToStream/index.ts b/packages/db-client/src/streams/appendToStream/index.ts index 3ea0e1bc..772423e3 100644 --- a/packages/db-client/src/streams/appendToStream/index.ts +++ b/packages/db-client/src/streams/appendToStream/index.ts @@ -5,7 +5,7 @@ import { ANY } from "../../constants"; import type { BaseOptions, AppendResult, - AppendExpectedRevision, + AppendStreamState, EventData, EventType, } from "../../types"; @@ -18,7 +18,7 @@ export interface AppendToStreamOptions extends BaseOptions { * Asks the server to check the stream is at specific revision before writing events. * @defaultValue ANY */ - expectedRevision?: AppendExpectedRevision; + streamState?: AppendStreamState; /** * The batch size, in bytes. * @defaultValue 3 * 1024 * 1024 @@ -49,7 +49,7 @@ Client.prototype.appendToStream = async function < streamName: string, event: EventData | EventData[], { - expectedRevision = ANY, + streamState = ANY, batchAppendSize = 3 * 1024 * 1024, ...baseOptions }: AppendToStreamOptions = {} @@ -61,14 +61,14 @@ Client.prototype.appendToStream = async function < (await this.supports(StreamsService.batchAppend)) ) { return batchAppend.call(this, streamName, events, { - expectedRevision, + streamState: streamState, batchAppendSize, ...baseOptions, }); } return append.call(this, streamName, events, { - expectedRevision, + streamState: streamState, batchAppendSize, ...baseOptions, }); diff --git a/packages/db-client/src/streams/deleteStream.ts b/packages/db-client/src/streams/deleteStream.ts index 988ead85..cdb32629 100644 --- a/packages/db-client/src/streams/deleteStream.ts +++ b/packages/db-client/src/streams/deleteStream.ts @@ -3,7 +3,7 @@ import { Empty } from "../../generated/shared_pb"; import { StreamsClient } from "../../generated/streams_grpc_pb"; import { Client } from "../Client"; -import type { BaseOptions, DeleteResult, ExpectedRevision } from "../types"; +import type { BaseOptions, DeleteResult, StreamState } from "../types"; import { debug, convertToCommandError, createStreamIdentifier } from "../utils"; import { ANY, NO_STREAM } from "../constants"; @@ -12,7 +12,7 @@ export interface DeleteStreamOptions extends BaseOptions { * Asks the server to check the stream is at specific revision before deleting. * @defaultValue ANY */ - expectedRevision?: ExpectedRevision; + expectedRevision?: StreamState; } declare module "../Client" { diff --git a/packages/db-client/src/streams/setStreamMetadata.ts b/packages/db-client/src/streams/setStreamMetadata.ts index dae04689..d2102a91 100644 --- a/packages/db-client/src/streams/setStreamMetadata.ts +++ b/packages/db-client/src/streams/setStreamMetadata.ts @@ -1,9 +1,5 @@ import { Client } from "../Client"; -import type { - BaseOptions, - AppendResult, - AppendExpectedRevision, -} from "../types"; +import type { BaseOptions, AppendResult, AppendStreamState } from "../types"; import { debug } from "../utils"; import { jsonEvent } from "../events"; @@ -15,7 +11,7 @@ export interface SetStreamMetadataOptions extends BaseOptions { * Asks the server to check the stream is at specific revision before writing events. * @defaultValue ANY */ - expectedRevision?: AppendExpectedRevision; + expectedRevision?: AppendStreamState; } declare module "../Client" { diff --git a/packages/db-client/src/streams/tombstoneStream.ts b/packages/db-client/src/streams/tombstoneStream.ts index 7a792e6c..8ff804a1 100644 --- a/packages/db-client/src/streams/tombstoneStream.ts +++ b/packages/db-client/src/streams/tombstoneStream.ts @@ -4,7 +4,7 @@ import { TombstoneReq } from "../../generated/streams_pb"; import { Client } from "../Client"; import { ANY, NO_STREAM } from "../constants"; -import type { BaseOptions, DeleteResult, ExpectedRevision } from "../types"; +import type { BaseOptions, DeleteResult, StreamState } from "../types"; import { convertToCommandError, createStreamIdentifier, debug } from "../utils"; export interface TombstoneStreamOptions extends BaseOptions { @@ -12,7 +12,7 @@ export interface TombstoneStreamOptions extends BaseOptions { * Asks the server to check the stream is at specific revision before deleting. * @defaultValue ANY */ - expectedRevision?: ExpectedRevision; + expectedRevision?: StreamState; } declare module "../Client" { diff --git a/packages/db-client/src/types/index.ts b/packages/db-client/src/types/index.ts index 4e23fdc1..c17c7de3 100644 --- a/packages/db-client/src/types/index.ts +++ b/packages/db-client/src/types/index.ts @@ -59,7 +59,7 @@ export type ReadPosition = * {@link ANY}. When using {@link ANY}., the KurrentDB server will do its best to assure * idempotency but will not guarantee idempotency. */ -export type ExpectedRevision = +export type StreamState = | typeof constants.ANY /** * The stream being written to should not yet exist. If it does exist, treats that as a concurrency problem. @@ -70,17 +70,17 @@ export type ExpectedRevision = */ | bigint; -export type AppendExpectedRevision = +export type AppendStreamState = /** * The stream should exist. If it or a metadata stream does not exist, treats that as a concurrency problem. */ | typeof constants.STREAM_EXISTS /** - * @see {@link ExpectedRevision}. + * @see {@link StreamState}. */ - | ExpectedRevision; + | StreamState; -export type CurrentRevision = +export type CurrentStreamState = /** * The stream being written to does not yet exist. */ diff --git a/packages/db-client/src/utils/CommandError.ts b/packages/db-client/src/utils/CommandError.ts index e0fbbbad..7b503947 100644 --- a/packages/db-client/src/utils/CommandError.ts +++ b/packages/db-client/src/utils/CommandError.ts @@ -6,11 +6,7 @@ import { status as StatusCode, ServiceError, Metadata } from "@grpc/grpc-js"; import { isClientCancellationError } from "."; import type { WrongExpectedVersion } from "../../generated/shared_pb"; -import type { - CurrentRevision, - EndPoint, - AppendExpectedRevision, -} from "../types"; +import type { CurrentStreamState, EndPoint, AppendStreamState } from "../types"; export enum ErrorType { TIMEOUT = "timeout", @@ -168,22 +164,22 @@ export class ScavengeNotFoundError extends CommandErrorBase { interface WrongExpectedVersionDetails { streamName: string; - expected: AppendExpectedRevision; - current: CurrentRevision; + expected: AppendStreamState; + current: CurrentStreamState; } export class WrongExpectedVersionError extends CommandErrorBase { public type: ErrorType.WRONG_EXPECTED_VERSION = ErrorType.WRONG_EXPECTED_VERSION; public streamName: string; - public expectedVersion: AppendExpectedRevision; - public actualVersion: CurrentRevision; + public expectedState: AppendStreamState; + public actualState: CurrentStreamState; static fromWrongExpectedVersion = ( details: WrongExpectedVersion, streamName: string ) => { - let expected: AppendExpectedRevision = "any"; + let expected: AppendStreamState = "any"; switch (true) { case details.hasExpectedStreamPosition(): { expected = BigInt(details.getExpectedStreamPosition()!); @@ -215,14 +211,14 @@ export class WrongExpectedVersionError extends CommandErrorBase { if (error) { const metadata = error.metadata!.getMap(); this.streamName = metadata["stream-name"].toString(); - this.expectedVersion = BigInt(metadata["expected-version"].toString()); - this.actualVersion = metadata["actual-version"] + this.expectedState = BigInt(metadata["expected-version"].toString()); + this.actualState = metadata["actual-version"] ? BigInt(metadata["actual-version"].toString()) : "no_stream"; } else { this.streamName = versions!.streamName; - this.expectedVersion = versions!.expected; - this.actualVersion = versions!.current; + this.expectedState = versions!.expected; + this.actualState = versions!.current; } } } diff --git a/packages/test/src/extra/http2-assertion-failure.test.ts b/packages/test/src/extra/http2-assertion-failure.test.ts index 63efff9f..c553c7c9 100644 --- a/packages/test/src/extra/http2-assertion-failure.test.ts +++ b/packages/test/src/extra/http2-assertion-failure.test.ts @@ -28,7 +28,7 @@ describe("http2 assertion failure", () => { const postEvents = jsonTestEvents(7); const appendRes = await client.appendToStream(stream, priorEvents, { - expectedRevision: NO_STREAM, + streamState: NO_STREAM, // we want to test classic append credentials: { username: "admin", password: "changeit" }, }); @@ -49,7 +49,7 @@ describe("http2 assertion failure", () => { }); while (received.length < 3) await delay(10); await client.appendToStream(stream, postEvents, { - expectedRevision: appendRes.nextExpectedRevision, + streamState: appendRes.nextExpectedRevision, // we want to test classic append credentials: { username: "admin", password: "changeit" }, }); diff --git a/packages/test/src/opentelemetry/instrumentation.test.ts b/packages/test/src/opentelemetry/instrumentation.test.ts index a335bdd4..83a73c24 100644 --- a/packages/test/src/opentelemetry/instrumentation.test.ts +++ b/packages/test/src/opentelemetry/instrumentation.test.ts @@ -73,7 +73,7 @@ describe("instrumentation", () => { ); const appendOptions: AppendToStreamOptions = { - expectedRevision: "any", + streamState: "any", }; if (withCredentials) { @@ -133,7 +133,7 @@ describe("instrumentation", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: "no_stream", + streamState: "no_stream", } ); diff --git a/packages/test/src/samples/appending-events.ts b/packages/test/src/samples/appending-events.ts index 49c1e1f4..1afd9b9e 100644 --- a/packages/test/src/samples/appending-events.ts +++ b/packages/test/src/samples/appending-events.ts @@ -5,7 +5,7 @@ import { FORWARDS, KurrentDBClient, JSONEventType, - AppendExpectedRevision, + AppendStreamState, WrongExpectedVersionError, } from "@kurrent/kurrentdb-client"; import { createTestNode } from "@test-utils"; @@ -44,7 +44,7 @@ describe("[sample] appending-events", () => { }); await client.appendToStream("some-stream", event, { - expectedRevision: NO_STREAM, + streamState: NO_STREAM, }); // endregion append-to-stream }); @@ -104,12 +104,12 @@ describe("[sample] appending-events", () => { }); await client.appendToStream("no-stream-stream", eventOne, { - expectedRevision: NO_STREAM, + streamState: NO_STREAM, }); // attempt to append the same event again await client.appendToStream("no-stream-stream", eventTwo, { - expectedRevision: NO_STREAM, + streamState: NO_STREAM, }); // endregion append-with-no-stream } catch (error) { @@ -144,7 +144,7 @@ describe("[sample] appending-events", () => { direction: FORWARDS, }); - let revision: AppendExpectedRevision = NO_STREAM; + let revision: AppendStreamState = NO_STREAM; for await (const { event } of events) { revision = event?.revision ?? revision; } @@ -159,7 +159,7 @@ describe("[sample] appending-events", () => { }); await client.appendToStream("concurrency-stream", clientOneEvent, { - expectedRevision: revision, + streamState: revision, }); const clientTwoEvent = jsonEvent({ @@ -172,7 +172,7 @@ describe("[sample] appending-events", () => { }); await client.appendToStream("concurrency-stream", clientTwoEvent, { - expectedRevision: revision, + streamState: revision, }); // endregion append-with-concurrency-check } catch (error) { diff --git a/packages/test/src/samples/opentelemetry.ts b/packages/test/src/samples/opentelemetry.ts index bc9c9bf1..d270fde9 100644 --- a/packages/test/src/samples/opentelemetry.ts +++ b/packages/test/src/samples/opentelemetry.ts @@ -83,7 +83,7 @@ describe("[sample] opentelemetry", () => { }, }), { - expectedRevision: "any", + streamState: "any", } ); diff --git a/packages/test/src/streams/appendToStream-errors.test.ts b/packages/test/src/streams/appendToStream-errors.test.ts index bdbfb031..55b726cd 100644 --- a/packages/test/src/streams/appendToStream-errors.test.ts +++ b/packages/test/src/streams/appendToStream-errors.test.ts @@ -44,7 +44,7 @@ describe("appendToStream - errors", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: "no_stream", + streamState: "no_stream", } ); @@ -54,8 +54,8 @@ describe("appendToStream - errors", () => { if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM_NAME); - expect(error.expectedVersion).toBe(NO_STREAM); - expect(error.actualVersion).toBeGreaterThanOrEqual(1); + expect(error.expectedState).toBe(NO_STREAM); + expect(error.actualState).toBeGreaterThanOrEqual(1); } } }); diff --git a/packages/test/src/streams/appendToStream.test.ts b/packages/test/src/streams/appendToStream.test.ts index b9636f7f..d12636d0 100644 --- a/packages/test/src/streams/appendToStream.test.ts +++ b/packages/test/src/streams/appendToStream.test.ts @@ -375,7 +375,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: ANY, + streamState: ANY, } ); @@ -392,7 +392,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: NO_STREAM, + streamState: NO_STREAM, } ); @@ -410,7 +410,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: "no_stream", + streamState: "no_stream", } ); @@ -420,8 +420,8 @@ describe("appendToStream", () => { if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM_NAME); - expect(error.expectedVersion).toBe(NO_STREAM); - expect(error.actualVersion).toBeGreaterThanOrEqual(1); + expect(error.expectedState).toBe(NO_STREAM); + expect(error.actualState).toBeGreaterThanOrEqual(1); } } }); @@ -437,7 +437,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: STREAM_EXISTS, + streamState: STREAM_EXISTS, } ); expect(result).toBeDefined(); @@ -452,7 +452,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: STREAM_EXISTS, + streamState: STREAM_EXISTS, } ); @@ -462,8 +462,8 @@ describe("appendToStream", () => { if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM_NAME); - expect(error.expectedVersion).toBe(STREAM_EXISTS); - expect(error.actualVersion).toBe(NO_STREAM); + expect(error.expectedState).toBe(STREAM_EXISTS); + expect(error.actualState).toBe(NO_STREAM); } } }); @@ -482,7 +482,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: nextExpectedRevision, + streamState: nextExpectedRevision, } ); @@ -498,7 +498,7 @@ describe("appendToStream", () => { const result = await client.appendToStream( STREAM_NAME, jsonTestEvents(), - { expectedRevision: BigInt(1) } + { streamState: BigInt(1) } ); expect(result).toBe("unreachable"); @@ -507,8 +507,8 @@ describe("appendToStream", () => { if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM_NAME); - expect(error.expectedVersion).toBe(BigInt(1)); - expect(error.actualVersion).toBe(NO_STREAM); + expect(error.expectedState).toBe(BigInt(1)); + expect(error.actualState).toBe(NO_STREAM); } } }); @@ -526,7 +526,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: nextExpectedRevision + BigInt(1), + streamState: nextExpectedRevision + BigInt(1), } ); @@ -536,10 +536,10 @@ describe("appendToStream", () => { if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM_NAME); - expect(error.expectedVersion).toBe( + expect(error.expectedState).toBe( nextExpectedRevision + BigInt(1) ); - expect(error.actualVersion).toBe(nextExpectedRevision); + expect(error.actualState).toBe(nextExpectedRevision); } } }); @@ -565,7 +565,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: "no_stream", + streamState: "no_stream", } ); @@ -575,8 +575,8 @@ describe("appendToStream", () => { if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM_NAME); - expect(error.expectedVersion).toBe(NO_STREAM); - expect(error.actualVersion).toBeGreaterThanOrEqual(1); + expect(error.expectedState).toBe(NO_STREAM); + expect(error.actualState).toBeGreaterThanOrEqual(1); } } }); @@ -596,7 +596,7 @@ describe("appendToStream", () => { STREAM_NAME, jsonTestEvents(), { - expectedRevision: "no_stream", + streamState: "no_stream", } ); diff --git a/packages/test/src/streams/deleteStream.test.ts b/packages/test/src/streams/deleteStream.test.ts index c79888d4..d2718457 100644 --- a/packages/test/src/streams/deleteStream.test.ts +++ b/packages/test/src/streams/deleteStream.test.ts @@ -57,7 +57,7 @@ describe("deleteStream", () => { expect(error).toBeInstanceOf(WrongExpectedVersionError); if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM); - expect(error.expectedVersion).toBe(BigInt(2)); + expect(error.expectedState).toBe(BigInt(2)); } } }); diff --git a/packages/test/src/streams/tombstoneStream.test.ts b/packages/test/src/streams/tombstoneStream.test.ts index 18210ba1..5e1efd25 100644 --- a/packages/test/src/streams/tombstoneStream.test.ts +++ b/packages/test/src/streams/tombstoneStream.test.ts @@ -69,7 +69,7 @@ describe("tombstoneStream", () => { expect(error).toBeInstanceOf(WrongExpectedVersionError); if (error instanceof WrongExpectedVersionError) { expect(error.streamName).toBe(STREAM); - expect(error.expectedVersion).toBe(BigInt(2)); + expect(error.expectedState).toBe(BigInt(2)); } } });