Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/3 catch up subscriptions #10

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ Subscribes to a stream to receive notifications as soon as an event is written t

Returns a Buffer containing a GUID that identifies the subscription, for use with unsubscribeStream().

### Connection.subscribeToStreamFrom()
Executes a catch-up subscription on the given stream, reading events from a given event number, and continuing with a live subscription when all historical events have been read.

* streamId - The name of the stream in the Event Store (string)
* fromEventNumber - Which event number to start after (if null, then from the beginning of the stream.)
* credentials - The user name and password needed for permission to subscribe to the stream.
* onEventAppeared - Callback for each event received (historical or live)
* onLiveProcessingStarted - Callback when historical events have been read and live events are about to be read.
* onDropped - Callback when subscription drops or is dropped.
* settings - Settings for the catch-up subscription.

Returns an instance representing the catch-up subscription (EventStoreStreamCatchUpSubscription).

### Connection.readAllEventsBackward() / Connection.readAllEventsForward()
Reads events from across all streams, in order (backward = newest first, forward = oldest first).

Expand Down Expand Up @@ -129,4 +142,24 @@ Passed to the onConfirmed callback used by subscribeToStream() when a subscripti
## ISubscriptionDropped interface
Passed to the onDropped callback used by subscribeToStream() when a subscription terminates, or cannot be established.

* reason - The reason why the subscription was dropped (enumeration, 0 = Unsubscribed, 1 = Access Denied)
* reason - The reason why the subscription was dropped (enumeration, 0 = Unsubscribed, 1 = Access Denied)

## CatchUpSubscriptionSettings class
A property bag of settings passed when creating a new catch-up subscription.

* maxLiveQueueSize - The maximum amount to buffer when processing from the live subscription.
* readBatchSize - The number of events to read per batch when reading historical events.
* debug - True if in debug mode.
* resolveLinkTos - Whether or not to resolve link events.

## EventStoreStreamCatchUpSubscription class
Represents a catch-up subscription to a single stream.

### EventStoreStreamCatchUpSubscription.start()
Initiate start of the catch-up subscription.

### EventStoreStreamCatchUpSubscription.stop()
Request to stop the catch-up subscription.

### EventStoreStreamCatchUpSubscription.getCorrelationId()
Get the subscription ID of the underlying Event Store subscription, in order to pass it back to the Connection object, for example.
104 changes: 103 additions & 1 deletion event-store-client.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ declare module "event-store-client" {
*/
subscribeToStream(streamId: string, resolveLinkTos: boolean, onEventAppeared: (event: StoredEvent) => void, onConfirmed: (confirmation: ISubscriptionConfirmation) => void, onDropped: (dropped: ISubscriptionDropped) => void, credentials: ICredentials): Buffer;

/***
* Initiate catch-up subscription for one stream.
*
* @param streamId The stream ID (only if subscribing to a single stream)
* @param fromEventNumber Which event number to start after (if null, then from the beginning of the stream.)
* @param credentials User credentials for the operations.
* @param onEventAppeared Callback for each event received
* @param onLiveProcessingStarted Callback when read history phase finishes.
* @param onDropped Callback when subscription drops or is dropped.
* @param settings Settings for this subscription.
* @return The catch-up subscription instance.
*/
subscribeToStreamFrom(streamId: string, fromEventNumber: number, credentials: ICredentials, onEventAppeared: (event: StoredEvent) => void, onLiveProcessingStarted: () => void, onDropped: (EventStoreCatchUpSubscription, string, Error) => void, settings: CatchUpSubscriptionSettings): EventStoreStreamCatchUpSubscription;

/***
* Reads events from across all streams, in order from newest to oldest
* @param commitPosition The commit position to start from
Expand Down Expand Up @@ -234,5 +248,93 @@ declare module "event-store-client" {
* @param callback Invoked once the operation has been completed. Check the result to confirm it was successful.
*/
writeEvents(streamId: string, expectedVersion: number, requireMaster: boolean, events: Event[], credentials: ICredentials, callback: (completed: IWriteEventsCompleted) => void): void;
}
}

/***
* Configuration settings to pass when instantiating a catch-up subscription.
*/
export class CatchUpSubscriptionSettings {

/***
* Creates a new settings instance.
* @param maxLiveQueueSize The max amount to buffer when processing from live subscription.
* @param readBatchSize The number of events to read per batch when reading history
* @param debug True iff in debug mode
* @param resolveLinkTos Whether or not to resolve link events
*/
constructor(maxLiveQueueSize: number, readBatchSize: number, debug: boolean, resolveLinkTos: boolean);

/***
* The max amount to buffer when processing from live subscription.
*/
maxLiveQueueSize: number;

/***
* The number of events to read per batch when reading history
*/
readBatchSize: number;

/***
* True iff in debug mode
*/
debug: boolean;

/***
* Whether or not to resolve link events
*/
resolveLinkTos: boolean;
}

/**
* Abstract base class representing catch-up subscriptions.
*/
export class EventStoreCatchUpSubscription {

/***
* Creates a new EventStoreCatchUpSubscription instance.
* @param connection The connection to Event Store
* @param streamId The stream name (only if subscribing to a single stream)
* @param userCredentials User credentials for the operations.
* @param eventAppeared Callback for each event received
* @param liveProcessingStarted Callback when read history phase finishes.
* @param subscriptionDropped Callback when subscription drops or is dropped.
* @param settings Settings for this subscription.
*/
constructor(connection: Connection, streamId: string, userCredentials: ICredentials, eventAppeared: (event: StoredEvent) => void, liveProcessingStarted: () => void, subscriptionDropped: (EventStoreCatchUpSubscription, string, Error) => void, settings: CatchUpSubscriptionSettings);

/***
* Provides the correlation ID of the Event Store subscription underlying the catch-up subscription.
* @returns Correlation ID of the Event Store subscription
*/
getCorrelationId(): string;

/***
* Attempts to start the subscription.
*/
start(): void;

/***
* Attempts to stop the subscription.
*/
stop(): void;
}

/**
* Catch-up subscription for one stream.
*/
export class EventStoreStreamCatchUpSubscription extends EventStoreCatchUpSubscription {

/***
* Creates a new EventStoreStreamCatchUpSubscription instance.
* @param connection The connection to Event Store
* @param streamId The stream name (only if subscribing to a single stream)
* @param fromEventNumberExclusive Which event number to start after (if null, then from the beginning of the stream.)
* @param userCredentials User credentials for the operations.
* @param eventAppeared Callback for each event received
* @param liveProcessingStarted Callback when read history phase finishes.
* @param subscriptionDropped Callback when subscription drops or is dropped.
* @param settings Settings for this subscription.
*/
constructor(connection: Connection, streamId: string, fromEventNumberExclusive: number, userCredentials: ICredentials, eventAppeared: (event: StoredEvent) => void, liveProcessingStarted: () => void, subscriptionDropped: (EventStoreCatchUpSubscription, string, Error) => void, settings: CatchUpSubscriptionSettings);
}
}
4 changes: 3 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var Connection = require("./lib/connection"),
var CatchUpSubscription = require("./lib/CatchUpSubscription"),
Connection = require("./lib/connection"),
Commands = require("./lib/commands"),
ExpectedVersion = require("./lib/expectedVersion"),
Messages = require("./lib/messages"),
Expand All @@ -7,6 +8,7 @@ var Connection = require("./lib/connection"),
ReadStreamResult = require("./lib/readStreamResult"),
EventStoreClient = {};

EventStoreClient.CatchUpSubscription = CatchUpSubscription;
EventStoreClient.Connection = Connection;
EventStoreClient.Commands = Commands;
EventStoreClient.ExpectedVersion = ExpectedVersion;
Expand Down
Loading