Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions packages/db-client/src/streams/appendToStream/append.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ import { Empty } from "../../../generated/shared_pb";
import { StreamsClient } from "../../../generated/streams_grpc_pb";

import type { Client } from "../../Client";
import type {
AppendResult,
AppendExpectedRevision,
EventData,
} from "../../types";
import type { AppendResult, AppendStreamState, EventData } from "../../types";
import {
backpressuredWrite,
convertToCommandError,
Expand All @@ -24,15 +20,15 @@ export const append = async function (
this: Client,
streamName: string,
events: EventData[],
{ expectedRevision, ...baseOptions }: InternalOptions<AppendToStreamOptions>
{ streamState, ...baseOptions }: InternalOptions<AppendToStreamOptions>
): Promise<AppendResult> {
const header = new AppendReq();
const options = new AppendReq.Options();
const identifier = createStreamIdentifier(streamName);

options.setStreamIdentifier(identifier);

switch (expectedRevision) {
switch (streamState) {
case "any": {
options.setAny(new Empty());
break;
Expand All @@ -46,7 +42,7 @@ export const append = async function (
break;
}
default: {
options.setRevision(expectedRevision.toString(10));
options.setRevision(streamState.toString(10));
break;
}
}
Expand All @@ -56,7 +52,7 @@ export const append = async function (
debug.command("appendToStream: %O", {
streamName,
events,
options: { expectedRevision, ...baseOptions },
options: { streamState: streamState, ...baseOptions },
});
debug.command_grpc("appendToStream: %g", header);

Expand All @@ -75,7 +71,7 @@ export const append = async function (
if (resp.hasWrongExpectedVersion()) {
const grpcError = resp.getWrongExpectedVersion()!;

let expected: AppendExpectedRevision = "any";
let expected: AppendStreamState = "any";

switch (true) {
case grpcError.hasExpectedRevision(): {
Expand Down
6 changes: 3 additions & 3 deletions packages/db-client/src/streams/appendToStream/batchAppend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const batchAppend = async function (
streamName: string,
events: EventData[],
{
expectedRevision,
streamState,
batchAppendSize,
...baseOptions
}: InternalOptions<AppendToStreamOptions>
Expand Down Expand Up @@ -131,7 +131,7 @@ export const batchAppend = async function (
options.setStreamIdentifier(identifier);
options.setDeadline(deadline);

switch (expectedRevision) {
switch (streamState) {
case "any": {
options.setAny(new Empty());
break;
Expand All @@ -145,7 +145,7 @@ export const batchAppend = async function (
break;
}
default: {
options.setStreamPosition(expectedRevision.toString(10));
options.setStreamPosition(streamState.toString(10));
break;
}
}
Expand Down
10 changes: 5 additions & 5 deletions packages/db-client/src/streams/appendToStream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { ANY } from "../../constants";
import type {
BaseOptions,
AppendResult,
AppendExpectedRevision,
AppendStreamState,
EventData,
EventType,
} from "../../types";
Expand All @@ -18,7 +18,7 @@ export interface AppendToStreamOptions extends BaseOptions {
* Asks the server to check the stream is at specific revision before writing events.
* @defaultValue ANY
*/
expectedRevision?: AppendExpectedRevision;
streamState?: AppendStreamState;
/**
* The batch size, in bytes.
* @defaultValue 3 * 1024 * 1024
Expand Down Expand Up @@ -49,7 +49,7 @@ Client.prototype.appendToStream = async function <
streamName: string,
event: EventData<KnownEventType> | EventData<KnownEventType>[],
{
expectedRevision = ANY,
streamState = ANY,
batchAppendSize = 3 * 1024 * 1024,
...baseOptions
}: AppendToStreamOptions = {}
Expand All @@ -61,14 +61,14 @@ Client.prototype.appendToStream = async function <
(await this.supports(StreamsService.batchAppend))
) {
return batchAppend.call(this, streamName, events, {
expectedRevision,
streamState: streamState,
batchAppendSize,
...baseOptions,
});
}

return append.call(this, streamName, events, {
expectedRevision,
streamState: streamState,
batchAppendSize,
...baseOptions,
});
Expand Down
4 changes: 2 additions & 2 deletions packages/db-client/src/streams/deleteStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Empty } from "../../generated/shared_pb";
import { StreamsClient } from "../../generated/streams_grpc_pb";

import { Client } from "../Client";
import type { BaseOptions, DeleteResult, ExpectedRevision } from "../types";
import type { BaseOptions, DeleteResult, StreamState } from "../types";
import { debug, convertToCommandError, createStreamIdentifier } from "../utils";
import { ANY, NO_STREAM } from "../constants";

Expand All @@ -12,7 +12,7 @@ export interface DeleteStreamOptions extends BaseOptions {
* Asks the server to check the stream is at specific revision before deleting.
* @defaultValue ANY
*/
expectedRevision?: ExpectedRevision;
expectedRevision?: StreamState;
}

declare module "../Client" {
Expand Down
8 changes: 2 additions & 6 deletions packages/db-client/src/streams/setStreamMetadata.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import { Client } from "../Client";
import type {
BaseOptions,
AppendResult,
AppendExpectedRevision,
} from "../types";
import type { BaseOptions, AppendResult, AppendStreamState } from "../types";
import { debug } from "../utils";
import { jsonEvent } from "../events";

Expand All @@ -15,7 +11,7 @@ export interface SetStreamMetadataOptions extends BaseOptions {
* Asks the server to check the stream is at specific revision before writing events.
* @defaultValue ANY
*/
expectedRevision?: AppendExpectedRevision;
expectedRevision?: AppendStreamState;
}

declare module "../Client" {
Expand Down
4 changes: 2 additions & 2 deletions packages/db-client/src/streams/tombstoneStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import { TombstoneReq } from "../../generated/streams_pb";

import { Client } from "../Client";
import { ANY, NO_STREAM } from "../constants";
import type { BaseOptions, DeleteResult, ExpectedRevision } from "../types";
import type { BaseOptions, DeleteResult, StreamState } from "../types";
import { convertToCommandError, createStreamIdentifier, debug } from "../utils";

export interface TombstoneStreamOptions extends BaseOptions {
/**
* Asks the server to check the stream is at specific revision before deleting.
* @defaultValue ANY
*/
expectedRevision?: ExpectedRevision;
expectedRevision?: StreamState;
}

declare module "../Client" {
Expand Down
10 changes: 5 additions & 5 deletions packages/db-client/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export type ReadPosition =
* {@link ANY}. When using {@link ANY}., the KurrentDB server will do its best to assure
* idempotency but will not guarantee idempotency.
*/
export type ExpectedRevision =
export type StreamState =
| typeof constants.ANY
/**
* The stream being written to should not yet exist. If it does exist, treats that as a concurrency problem.
Expand All @@ -70,17 +70,17 @@ export type ExpectedRevision =
*/
| bigint;

export type AppendExpectedRevision =
export type AppendStreamState =
/**
* The stream should exist. If it or a metadata stream does not exist, treats that as a concurrency problem.
*/
| typeof constants.STREAM_EXISTS
/**
* @see {@link ExpectedRevision}.
* @see {@link StreamState}.
*/
| ExpectedRevision;
| StreamState;

export type CurrentRevision =
export type CurrentStreamState =
/**
* The stream being written to does not yet exist.
*/
Expand Down
24 changes: 10 additions & 14 deletions packages/db-client/src/utils/CommandError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ import { status as StatusCode, ServiceError, Metadata } from "@grpc/grpc-js";
import { isClientCancellationError } from ".";

import type { WrongExpectedVersion } from "../../generated/shared_pb";
import type {
CurrentRevision,
EndPoint,
AppendExpectedRevision,
} from "../types";
import type { CurrentStreamState, EndPoint, AppendStreamState } from "../types";

export enum ErrorType {
TIMEOUT = "timeout",
Expand Down Expand Up @@ -168,22 +164,22 @@ export class ScavengeNotFoundError extends CommandErrorBase {

interface WrongExpectedVersionDetails {
streamName: string;
expected: AppendExpectedRevision;
current: CurrentRevision;
expected: AppendStreamState;
current: CurrentStreamState;
}

export class WrongExpectedVersionError extends CommandErrorBase {
public type: ErrorType.WRONG_EXPECTED_VERSION =
ErrorType.WRONG_EXPECTED_VERSION;
public streamName: string;
public expectedVersion: AppendExpectedRevision;
public actualVersion: CurrentRevision;
public expectedState: AppendStreamState;
public actualState: CurrentStreamState;

static fromWrongExpectedVersion = (
details: WrongExpectedVersion,
streamName: string
) => {
let expected: AppendExpectedRevision = "any";
let expected: AppendStreamState = "any";
switch (true) {
case details.hasExpectedStreamPosition(): {
expected = BigInt(details.getExpectedStreamPosition()!);
Expand Down Expand Up @@ -215,14 +211,14 @@ export class WrongExpectedVersionError extends CommandErrorBase {
if (error) {
const metadata = error.metadata!.getMap();
this.streamName = metadata["stream-name"].toString();
this.expectedVersion = BigInt(metadata["expected-version"].toString());
this.actualVersion = metadata["actual-version"]
this.expectedState = BigInt(metadata["expected-version"].toString());
this.actualState = metadata["actual-version"]
? BigInt(metadata["actual-version"].toString())
: "no_stream";
} else {
this.streamName = versions!.streamName;
this.expectedVersion = versions!.expected;
this.actualVersion = versions!.current;
this.expectedState = versions!.expected;
this.actualState = versions!.current;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/test/src/extra/http2-assertion-failure.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe("http2 assertion failure", () => {
const postEvents = jsonTestEvents(7);

const appendRes = await client.appendToStream(stream, priorEvents, {
expectedRevision: NO_STREAM,
streamState: NO_STREAM,
// we want to test classic append
credentials: { username: "admin", password: "changeit" },
});
Expand All @@ -49,7 +49,7 @@ describe("http2 assertion failure", () => {
});
while (received.length < 3) await delay(10);
await client.appendToStream(stream, postEvents, {
expectedRevision: appendRes.nextExpectedRevision,
streamState: appendRes.nextExpectedRevision,
// we want to test classic append
credentials: { username: "admin", password: "changeit" },
});
Expand Down
4 changes: 2 additions & 2 deletions packages/test/src/opentelemetry/instrumentation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe("instrumentation", () => {
);

const appendOptions: AppendToStreamOptions = {
expectedRevision: "any",
streamState: "any",
};

if (withCredentials) {
Expand Down Expand Up @@ -133,7 +133,7 @@ describe("instrumentation", () => {
STREAM_NAME,
jsonTestEvents(),
{
expectedRevision: "no_stream",
streamState: "no_stream",
}
);

Expand Down
14 changes: 7 additions & 7 deletions packages/test/src/samples/appending-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
FORWARDS,
KurrentDBClient,
JSONEventType,
AppendExpectedRevision,
AppendStreamState,
WrongExpectedVersionError,
} from "@kurrent/kurrentdb-client";
import { createTestNode } from "@test-utils";
Expand Down Expand Up @@ -44,7 +44,7 @@ describe("[sample] appending-events", () => {
});

await client.appendToStream("some-stream", event, {
expectedRevision: NO_STREAM,
streamState: NO_STREAM,
});
// endregion append-to-stream
});
Expand Down Expand Up @@ -104,12 +104,12 @@ describe("[sample] appending-events", () => {
});

await client.appendToStream("no-stream-stream", eventOne, {
expectedRevision: NO_STREAM,
streamState: NO_STREAM,
});

// attempt to append the same event again
await client.appendToStream("no-stream-stream", eventTwo, {
expectedRevision: NO_STREAM,
streamState: NO_STREAM,
});
// endregion append-with-no-stream
} catch (error) {
Expand Down Expand Up @@ -144,7 +144,7 @@ describe("[sample] appending-events", () => {
direction: FORWARDS,
});

let revision: AppendExpectedRevision = NO_STREAM;
let revision: AppendStreamState = NO_STREAM;
for await (const { event } of events) {
revision = event?.revision ?? revision;
}
Expand All @@ -159,7 +159,7 @@ describe("[sample] appending-events", () => {
});

await client.appendToStream("concurrency-stream", clientOneEvent, {
expectedRevision: revision,
streamState: revision,
});

const clientTwoEvent = jsonEvent<SomeEvent>({
Expand All @@ -172,7 +172,7 @@ describe("[sample] appending-events", () => {
});

await client.appendToStream("concurrency-stream", clientTwoEvent, {
expectedRevision: revision,
streamState: revision,
});
// endregion append-with-concurrency-check
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion packages/test/src/samples/opentelemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe("[sample] opentelemetry", () => {
},
}),
{
expectedRevision: "any",
streamState: "any",
}
);

Expand Down
Loading