wuyazi/event-sourcing

 
 


 

  This library is still under construction and thus subject to breaking changes. It's not recommended to use it in production. 🚧  

This library was created to help people get started with event-sourcing in NestJS. Event-sourcing is the practice of capturing state transitions in your domain models instead of only capturing the current state. The library contains the building blocks to implement Command Query Responsibility Segregation (CQRS), store events and snapshots, react to events, and much more.

 

Table of Contents
  1. Getting started
  2. Aggregates & value objects
  3. Commands & command handlers
  4. Events
  5. Snapshots
  6. Aggregate repositories
  7. Queries
  8. Misc
  9. Contact
  10. Acknowledgments
 
 

Getting started

To get started with this library, you need to install it first.

npm install @ocoda/event-sourcing

This library currently provides wrappers for storing events and snapshots for MongoDB and DynamoDB. To make use of database wrappers, you will need to install their respective libraries:

npm install mongodb # For using MongoDB
npm install @aws-sdk/client-dynamodb @aws-sdk/util-dynamodb # For using DynamoDB

No database wrapper is required for testing purposes: this library ships with a fully functional in-memory store.

Once you have installed all required packages, you can import the EventSourcingModule into the root AppModule of your application. The configuration depends on the type of database you want to use and on whether you want to make use of snapshots.

import { Module } from '@nestjs/common';
import { EventSourcingModule } from '@ocoda/event-sourcing';
import { Events } from './app.providers';

@Module({
  imports: [
    EventSourcingModule.forRoot({
      eventStore: {
        client: 'mongodb',
        options: {
          url: 'mongodb://127.0.0.1:27017',
        },
      },
      snapshotStore: {
        client: 'dynamodb',
        options: {
          region: 'us-east-1',
          credentials: { accessKeyId: 'foo', secretAccessKey: 'bar' },
        },
      },
      events: [...Events],
    }),
  ],
})
export class AppModule {}

 

Aggregates & value objects

An aggregate models an individual concept that has a unique identity in your application, e.g. an account.

To create an aggregate using this library you will need to:

  • inherit the AggregateRoot class, which is responsible for handling events and keeping track of the version of the aggregate
  • apply the @Aggregate() decorator

import { Aggregate, AggregateRoot } from '@ocoda/event-sourcing';

@Aggregate({ streamName: 'account' })
class Account extends AggregateRoot {
  ...
}

The @Aggregate() decorator marks the class as an aggregate and optionally specifies how the streamId of events and snapshots should be named, e.g. @Aggregate({ streamName: 'account' }) will create the following streamId: account-d46fb0f9-02dc-4d11-a282-ab00f7fffeff. If the stream name isn't provided in the decorator, the name of the class will automatically be converted to lowercase and used.
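
To make the role of the base class more concrete, here is a minimal, self-contained sketch of what "handling events and keeping track of the version" could look like. It does not use the library: SketchAggregateRoot, SketchAccount, the when method and the event shapes are all hypothetical stand-ins, and the real AggregateRoot may work differently.

```typescript
// Hypothetical, simplified stand-in for the library's AggregateRoot.
interface DomainEvent {
  readonly type: string;
}

abstract class SketchAggregateRoot {
  private uncommitted: DomainEvent[] = [];
  private currentVersion = 0;

  get version(): number {
    return this.currentVersion;
  }

  // Each aggregate decides how a single event changes its state.
  protected abstract when(event: DomainEvent): void;

  // Record a new event: update state, bump the version, queue it for saving.
  protected applyEvent(event: DomainEvent): void {
    this.when(event);
    this.currentVersion += 1;
    this.uncommitted.push(event);
  }

  // Replay stored events: update state and version without queueing anything.
  loadFromHistory(events: Iterable<DomainEvent>): void {
    for (const event of events) {
      this.when(event);
      this.currentVersion += 1;
    }
  }

  // Hand over the queued events and clear the queue.
  commit(): DomainEvent[] {
    const events = this.uncommitted;
    this.uncommitted = [];
    return events;
  }
}

interface AccountOpened extends DomainEvent {
  type: 'account-opened';
  accountId: string;
}
interface AccountCredited extends DomainEvent {
  type: 'account-credited';
  amount: number;
}
type AccountEvent = AccountOpened | AccountCredited;

class SketchAccount extends SketchAggregateRoot {
  id = '';
  balance = 0;

  static open(accountId: string): SketchAccount {
    const account = new SketchAccount();
    account.applyEvent({ type: 'account-opened', accountId } as AccountOpened);
    return account;
  }

  credit(amount: number): void {
    this.applyEvent({ type: 'account-credited', amount } as AccountCredited);
  }

  protected when(event: DomainEvent): void {
    const e = event as AccountEvent;
    switch (e.type) {
      case 'account-opened':
        this.id = e.accountId;
        break;
      case 'account-credited':
        this.balance += e.amount;
        break;
    }
  }
}

const account = SketchAccount.open('acc-1');
account.credit(50);
const pending = account.commit(); // two events, ready to be appended to a store

// A fresh instance reaches the same state by replaying those events.
const replayed = new SketchAccount();
replayed.loadFromHistory(pending);
```

The point of the sketch is that state is never set directly: every change goes through an event, so replaying the same events always rebuilds the same aggregate.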

A Value Object is an immutable model that has no conceptual identity. It describes characteristics and optionally requires some validation, e.g. the name of an account. To create a value object, simply extend the ValueObject class.

import { ValueObject } from '@ocoda/event-sourcing';

export class AccountName extends ValueObject {
  public static fromString(name: string) {
    if(name.length < 3) {
      throw new Error('Account name should contain at least 3 characters');
    }
    return new AccountName({ value: name });
  }

  get value(): string {
    return this.props.value;
  }
}
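
The following is a rough sketch of what such a base class might provide, assuming it stores its properties in a frozen props object and compares by value. SketchValueObject, SketchAccountName and the equals method are hypothetical names for illustration; the library's ValueObject may be implemented differently.

```typescript
// Hypothetical stand-in for the library's ValueObject base class.
abstract class SketchValueObject<P extends object> {
  protected readonly props: Readonly<P>;

  protected constructor(props: P) {
    // Freeze a shallow copy so the value cannot change after creation.
    this.props = Object.freeze({ ...props });
  }

  // Value objects have no identity: they are equal when their properties match.
  // Comparing JSON is a simplification that assumes the same key order.
  equals(other: SketchValueObject<P>): boolean {
    return JSON.stringify(this.props) === JSON.stringify(other.props);
  }
}

class SketchAccountName extends SketchValueObject<{ value: string }> {
  static fromString(name: string): SketchAccountName {
    if (name.length < 3) {
      throw new Error('Account name should contain at least 3 characters');
    }
    return new SketchAccountName({ value: name });
  }

  get value(): string {
    return this.props.value;
  }
}

const savings = SketchAccountName.fromString('Savings');
const sameSavings = SketchAccountName.fromString('Savings');
const areEqual = savings.equals(sameSavings); // two instances, one value
```

Keeping the constructor protected forces callers through the factory method, so an invalid name can never be constructed.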

 

Commands & command handlers

A Command is an object sent to your domain application that describes the intent of the user; it is handled by a CommandHandler. Ideally, the name of a command is imperative and implies both the Aggregate it operates on and its intent, e.g. OpenAccountCommand.

import { ICommand } from '@ocoda/event-sourcing';

class OpenAccountCommand implements ICommand {
  constructor(public readonly accountOwnerIds?: string[]) {}
}

You can then define a CommandHandler that will be responsible for handling every execution of this Command.

import { CommandHandler, ICommandHandler } from '@ocoda/event-sourcing';

@CommandHandler(OpenAccountCommand)
export class OpenAccountCommandHandler implements ICommandHandler {

  constructor(private readonly accountRepository: AccountRepository) {}

  async execute(command: OpenAccountCommand): Promise<string> {
    const accountId = AccountId.generate();
    const account = Account.open(accountId, command.accountOwnerIds?.map(AccountOwnerId.from));

    await this.accountRepository.save(account);

    return accountId.value;
  }
}

Don't forget to register your CommandHandlers as providers in your application.  

Events

Events are classes that describe a fact that took place. They can be created by using the @Event() decorator and must be registered in the EventSourcingModule. If no name is explicitly provided, the name of the class itself is used; otherwise, the provided name is added as metadata to your class. The name of your event is used internally to create a map of the events within your application and optionally link that event to a custom event serializer.

import { Event, IEvent } from '@ocoda/event-sourcing';

@Event('account-opened')
export class AccountOpenedEvent implements IEvent {
  constructor(
    public readonly accountId: string,
    public readonly openedOn: string,
    public readonly accountOwnerIds?: string[]
  ) {}
}
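
As an illustration of the name-to-class map mentioned above, here is a small sketch of such a registry, including the fallback to the class name. SketchEventMap and its methods are hypothetical; the library builds its own map internally and its API is not shown here.

```typescript
// Hypothetical registry; the library's internal event map may look different.
type EventClass = new (...args: any[]) => object;

class SketchEventMap {
  private readonly classesByName = new Map<string, EventClass>();
  private readonly namesByClass = new Map<EventClass, string>();

  // Register a class under an explicit name, or fall back to its class name.
  register(cls: EventClass, name: string = cls.name): void {
    if (this.classesByName.has(name)) {
      throw new Error(`Event name "${name}" is already registered`);
    }
    this.classesByName.set(name, cls);
    this.namesByClass.set(cls, name);
  }

  // Used when writing: which name should be stored for this event class?
  getName(cls: EventClass): string {
    const name = this.namesByClass.get(cls);
    if (name === undefined) throw new Error(`${cls.name} is not registered`);
    return name;
  }

  // Used when reading: which class does a stored name belong to?
  getClass(name: string): EventClass {
    const cls = this.classesByName.get(name);
    if (cls === undefined) throw new Error(`No event registered as "${name}"`);
    return cls;
  }
}

class AccountOpenedEvent {
  constructor(public readonly accountId: string) {}
}
class AccountClosedEvent {
  constructor(public readonly accountId: string) {}
}

const eventMap = new SketchEventMap();
eventMap.register(AccountOpenedEvent, 'account-opened');
eventMap.register(AccountClosedEvent); // no name given: uses 'AccountClosedEvent'
```

An explicit name decouples what is stored from the class name, so a class can later be renamed without breaking events that are already persisted.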

Preferably, events contain only primitive values; other types can cause issues when storing events in and reading them from a database. To mitigate this, an event is (de)serialized using the class-transformer library whenever it is stored in or retrieved from the database. You can, however, write your own serializer logic for an event. If you do, don't forget to register your event serializers as providers in your application.

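import { Event, EventSerializer, IEvent, IEventPayload, IEventSerializer } from '@ocoda/event-sourcing';

@Event('account-opened')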
export class AccountOpenedEvent implements IEvent {
  constructor(
    public readonly accountId: AccountId,
    public readonly openedOn: Date,
    public readonly accountOwnerIds?: AccountOwnerId[]
  ) {}
}

@EventSerializer(AccountOpenedEvent)
export class AccountOpenedEventSerializer implements IEventSerializer {
  serialize({ accountId, openedOn, accountOwnerIds }: AccountOpenedEvent): IEventPayload<AccountOpenedEvent> {
    return {
      accountId: accountId.value,
      openedOn: openedOn.toISOString(),
      accountOwnerIds: accountOwnerIds?.map((id) => id.value)
    };
  }

  deserialize({ accountId, openedOn, accountOwnerIds }: IEventPayload<AccountOpenedEvent>): AccountOpenedEvent {
    // The payload carries the key written by serialize(), i.e. accountId.
    const id = AccountId.from(accountId);
    const openedOnDate = openedOn && new Date(openedOn);
    const ownerIds = accountOwnerIds?.map((ownerId) => AccountOwnerId.from(ownerId));

    return new AccountOpenedEvent(id, openedOnDate, ownerIds);
  }
  }
}
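
To see why non-primitive values need this care, the sketch below pushes a Date through a plain JSON round trip, which is roughly what happens when a document is written to and read from a database. It uses plain JSON rather than class-transformer, and OpenedEvent, OpenedPayload and the two functions are hypothetical names for illustration.

```typescript
const openedOn = new Date('2024-01-15T10:30:00.000Z');

// A naive JSON round trip silently turns the Date into a plain string.
const naive = JSON.parse(JSON.stringify({ openedOn })) as { openedOn: unknown };

// Hypothetical event and payload shapes for an explicit serializer pair.
class OpenedEvent {
  constructor(
    public readonly accountId: string,
    public readonly openedOn: Date,
  ) {}
}

interface OpenedPayload {
  accountId: string;
  openedOn: string;
}

// Write side: reduce the event to primitives only.
function serializeOpened(event: OpenedEvent): OpenedPayload {
  return { accountId: event.accountId, openedOn: event.openedOn.toISOString() };
}

// Read side: rebuild the rich types from the primitives.
function deserializeOpened(payload: OpenedPayload): OpenedEvent {
  return new OpenedEvent(payload.accountId, new Date(payload.openedOn));
}

const stored = JSON.stringify(serializeOpened(new OpenedEvent('acc-1', openedOn)));
const restored = deserializeOpened(JSON.parse(stored) as OpenedPayload);
```

After the naive round trip openedOn is a string, while the explicit pair restores a real Date with the same timestamp.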

 

Event streams

The EventStream class creates a representation of a stream of events for a specific aggregate.

const accountId = Id.generate();
const stream = EventStream.for(Account, accountId); 
// For a multi-tenant setup: EventStream.for(Account, accountId, pool);

stream.streamId; // account-af9a0775-b868-4063-89d8-ccc81bce3c3d
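
The naming rule for a streamId can be sketched as a small helper: use the configured stream name if there is one, otherwise the lowercased class name, followed by the aggregate id. buildStreamId is a hypothetical function written for this illustration and is not part of the library.

```typescript
// Hypothetical helper mirroring the naming rule described for @Aggregate().
function buildStreamId(
  aggregate: { name: string },
  aggregateId: string,
  streamName?: string,
): string {
  // Fall back to the lowercased class name when no stream name is configured.
  const prefix = streamName ?? aggregate.name.toLowerCase();
  return `${prefix}-${aggregateId}`;
}

class Account {}

const defaultStreamId = buildStreamId(Account, 'af9a0775-b868-4063-89d8-ccc81bce3c3d');
const customStreamId = buildStreamId(Account, 'af9a0775-b868-4063-89d8-ccc81bce3c3d', 'bank-account');
```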

 

Event store

This library provides several event store implementations, as described above. It's important to call the setup method on a store to prepare the database for storing your events: it creates the events or snapshots table or collection. In a multi-tenant infrastructure, separate event tables can be created by calling the setup method with a "pool", which prefixes the table name with the tenant pool you provided. This pool can then be passed to the event store when writing or reading events or snapshots.

import { OnModuleInit } from '@nestjs/common';
import { EventStore } from '@ocoda/event-sourcing';

class AppModule implements OnModuleInit {

  constructor(private readonly eventStore: EventStore) {}

  async onModuleInit() {
    await this.eventStore.setup();
  }
}
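
The effect of setup and pools can be illustrated with a tiny in-memory store. This is only a sketch under assumptions: SketchInMemoryEventStore is hypothetical, the "pool-events" naming format is a guess at what "prefixes the table name" means, and the methods are synchronous for brevity whereas a real store would be asynchronous.

```typescript
interface StoredEvent {
  streamId: string;
  version: number;
  payload: unknown;
}

// Hypothetical store: one array per collection, one collection per pool.
class SketchInMemoryEventStore {
  private readonly collections = new Map<string, StoredEvent[]>();

  // Assumed naming format: the pool, if any, prefixes the collection name.
  private collectionName(pool?: string): string {
    return pool ? `${pool}-events` : 'events';
  }

  private getCollection(pool?: string): StoredEvent[] {
    const name = this.collectionName(pool);
    const collection = this.collections.get(name);
    if (!collection) {
      throw new Error(`Collection "${name}" is missing; call setup() first`);
    }
    return collection;
  }

  // Create the collection for a pool if it does not exist yet.
  setup(pool?: string): void {
    const name = this.collectionName(pool);
    if (!this.collections.has(name)) {
      this.collections.set(name, []);
    }
  }

  // Append payloads to a stream, numbering them after the existing events.
  appendEvents(streamId: string, payloads: unknown[], pool?: string): void {
    const collection = this.getCollection(pool);
    let version = collection.filter((e) => e.streamId === streamId).length;
    for (const payload of payloads) {
      version += 1;
      collection.push({ streamId, version, payload });
    }
  }

  getEvents(streamId: string, pool?: string): StoredEvent[] {
    return this.getCollection(pool).filter((e) => e.streamId === streamId);
  }
}

const store = new SketchInMemoryEventStore();
store.setup(); // default collection
store.setup('tenant-a'); // separate collection for one tenant

store.appendEvents('account-1', [{ opened: true }]);
store.appendEvents('account-1', [{ credited: 50 }, { credited: 25 }], 'tenant-a');
```

The same streamId holds different events in each pool, and reading from a pool that was never set up fails instead of silently returning nothing.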

 

Event envelopes

Events that get stored to a stream are always wrapped in an EventEnvelope. This envelope contains the name of the event as specified with the @Event() decorator, the serialized version of the event, and additional metadata (eventId, aggregateId, version, etc.).
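
As a rough picture of such an envelope, the sketch below wraps a serialized payload together with its name and metadata. The SketchEventEnvelope shape and the wrapEvent helper are hypothetical, and the occurredOn field is an assumption standing in for the "etc." above; the library's EventEnvelope may carry different fields.

```typescript
// Hypothetical envelope shape; field names beyond those listed above are assumptions.
interface SketchEventEnvelope<P> {
  event: string; // the name given to @Event()
  payload: P; // the serialized event
  metadata: {
    eventId: string;
    aggregateId: string;
    version: number;
    occurredOn: string; // assumed extra metadata field
  };
}

function wrapEvent<P>(
  event: string,
  payload: P,
  metadata: { eventId: string; aggregateId: string; version: number },
  occurredOn: Date = new Date(),
): SketchEventEnvelope<P> {
  return {
    event,
    payload,
    metadata: { ...metadata, occurredOn: occurredOn.toISOString() },
  };
}

const envelope = wrapEvent(
  'account-opened',
  { accountId: 'acc-1', openedOn: '2024-01-15T10:30:00.000Z' },
  { eventId: 'evt-1', aggregateId: 'acc-1', version: 1 },
  new Date('2024-01-15T10:30:00.000Z'),
);
```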

Event publishers

Whenever the EventStore appends events, the produced EventEnvelopes get published by the EventPublishers that are registered in the EventBus. A default EventPublisher takes care of publishing events internally, which allows us to create and register EventHandlers that automatically respond to these events.

@EventHandler(AccountOpenedEvent)
export class AccountOpenedEventHandler implements IEventHandler {
	handle(envelope: EventEnvelope<AccountOpenedEvent>) {
		...
	}
}

To register an additional EventPublisher that pushes your EventEnvelopes to Redis, SNS, Kafka, etc., simply create one and register it as a provider.

@EventPublisher()
export class CustomEventPublisher implements IEventPublisher {
	async publish(envelope: EventEnvelope<IEvent>): Promise<void> {
		...
	}
}
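
The fan-out from the bus to its publishers can be pictured with the following sketch, in which every registered publisher receives each envelope. SketchEventBus, Publisher and RecordingPublisher are hypothetical names; the library's EventBus registers publishers through decorators and dependency injection, which is not reproduced here.

```typescript
interface Envelope {
  event: string;
  payload: unknown;
}

interface Publisher {
  publish(envelope: Envelope): Promise<void>;
}

// Hypothetical bus that hands every envelope to every registered publisher.
class SketchEventBus {
  private readonly publishers: Publisher[] = [];

  addPublisher(publisher: Publisher): void {
    this.publishers.push(publisher);
  }

  // The returned promise settles once every publisher has finished.
  async publish(envelope: Envelope): Promise<void> {
    await Promise.all(this.publishers.map((p) => p.publish(envelope)));
  }
}

// Stand-in for a real publisher (internal handlers, Redis, SNS, Kafka, ...).
class RecordingPublisher implements Publisher {
  readonly received: string[] = [];

  constructor(private readonly label: string) {}

  async publish(envelope: Envelope): Promise<void> {
    this.received.push(`${this.label}:${envelope.event}`);
  }
}

const internalPublisher = new RecordingPublisher('internal');
const externalPublisher = new RecordingPublisher('external');

const bus = new SketchEventBus();
bus.addPublisher(internalPublisher);
bus.addPublisher(externalPublisher);

void bus.publish({ event: 'account-opened', payload: { accountId: 'acc-1' } });
```

Adding a publisher never changes the code that appends events, which is what makes it cheap to forward the same envelopes to an external system.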

Snapshots

Snapshots are an optimization that is completely optional. However, they come in handy when event-streams become large and reading them out becomes slow.  

Snapshot streams

The SnapshotStream class creates a representation of a stream of snapshots for a specific aggregate.

const accountId = Id.generate();
const stream = SnapshotStream.for(Account, accountId);

stream.streamId // account-af9a0775-b868-4063-89d8-ccc81bce3c3d

 

Snapshot store

The SnapshotStore saves the state of an aggregate at a certain interval, so that only the events after the snapshot's version need to be fetched. Just like the EventStore, it needs to be set up, optionally with a tenant pool. Another advantage of using the snapshot handler is that it also creates a snapshot at version 1 of your aggregate, which makes it easier to get a complete set of aggregates of a certain type in your application.

import { OnModuleInit } from '@nestjs/common';
import { SnapshotStore } from '@ocoda/event-sourcing';

class AppModule implements OnModuleInit {

  constructor(private readonly snapshotStore: SnapshotStore) {}

  async onModuleInit() {
    await this.snapshotStore.setup();
  }
}
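
The interval behaviour described above could be expressed as a simple rule: take a snapshot at version 1 and at every multiple of the interval. The function below is a hypothetical sketch of that rule; the exact condition the library uses is an assumption.

```typescript
// Hypothetical rule: snapshot at version 1 and at every multiple of the interval.
function shouldTakeSnapshot(version: number, interval: number): boolean {
  if (!Number.isInteger(interval) || interval < 1) {
    throw new RangeError('interval must be a positive integer');
  }
  return version === 1 || version % interval === 0;
}

// Collect the versions that would produce a snapshot with an interval of 5.
const snapshotVersions: number[] = [];
for (let version = 1; version <= 11; version += 1) {
  if (shouldTakeSnapshot(version, 5)) {
    snapshotVersions.push(version);
  }
}
// snapshotVersions is [1, 5, 10]
```

With an interval of 5, loading an aggregate at version 11 would then need the snapshot at version 10 plus a single event.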

 

Snapshot handlers

The SnapshotStore is used internally by the SnapshotHandler base class, which is responsible for saving and loading snapshots.

How an aggregate snapshot is (de)serialized is the responsibility of a SnapshotHandler which extends the base and is decorated with the @Snapshot() decorator, which specifies:

  • which aggregate it's responsible for
  • the stream name (defaults to the name of the aggregate's class)
  • at which interval a snapshot should be taken

import { ISnapshot, Snapshot, SnapshotHandler } from '@ocoda/event-sourcing';

@Snapshot(Account, { name: 'account', interval: 5 })
export class AccountSnapshotHandler extends SnapshotHandler<Account> {
  serialize({ id, ownerIds, balance, openedOn, closedOn }: Account) {
    return {
      id: id.value,
      ownerIds: ownerIds.map(({ value }) => value),
      balance,
      openedOn: openedOn ? openedOn.toISOString() : undefined,
      closedOn: closedOn ? closedOn.toISOString() : undefined,
    };
  }
  deserialize({ id, ownerIds, balance, openedOn, closedOn }: ISnapshot<Account>): Account {
    const account = new Account();
    account.id = AccountId.from(id);
    account.ownerIds = ownerIds.map(AccountOwnerId.from);
    account.balance = balance;
    account.openedOn = openedOn && new Date(openedOn);
    account.closedOn = closedOn && new Date(closedOn);

    return account;
  }
}

 

Aggregate repositories

Aggregate repositories are where both stores meet. For example:

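import { Injectable } from '@nestjs/common';
import { EventStore, EventStream } from '@ocoda/event-sourcing';

@Injectable()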
export class AccountRepository {

  constructor(
    private readonly eventStore: EventStore,
    private readonly accountSnapshotHandler: AccountSnapshotHandler,
  ) {}

  async getById(accountId: AccountId) {
    const eventStream = EventStream.for<Account>(Account, accountId);

    const account = await this.accountSnapshotHandler.load(accountId);

    const events = this.eventStore.getEvents(eventStream, { fromVersion: account.version + 1 });

    await account.loadFromHistory(events);

    if (account.version < 1) {
        throw new AccountNotFoundException(accountId.value);
    }

    return account;
  }

  async save(account: Account): Promise<void> {
    const events = account.commit();
    const stream = EventStream.for<Account>(Account, account.id);

    await Promise.all([
      this.accountSnapshotHandler.save(account.id, account),
      this.eventStore.appendEvents(stream, account.version, events),
    ]);
  }
}
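
The loading path in getById, snapshot first and then only the newer events, can be shown in isolation with plain data. The types and the rehydrate function below are hypothetical and synchronous; they only illustrate why fromVersion is the snapshot's version plus one.

```typescript
interface BalanceChanged {
  version: number;
  delta: number;
}

interface AccountSnapshot {
  version: number;
  balance: number;
}

// Hypothetical rehydration: start from the snapshot (or from nothing) and
// replay only the events that are newer than it.
function rehydrate(
  snapshot: AccountSnapshot | undefined,
  events: BalanceChanged[],
): { version: number; balance: number; replayed: number } {
  let state = snapshot ? { ...snapshot } : { version: 0, balance: 0 };
  const fromVersion = state.version + 1;
  let replayed = 0;

  for (const event of events) {
    if (event.version < fromVersion) {
      continue; // already reflected in the snapshot
    }
    state = { version: event.version, balance: state.balance + event.delta };
    replayed += 1;
  }

  return { ...state, replayed };
}

// Seven events of +10 each; a snapshot was taken at version 5.
const history: BalanceChanged[] = [1, 2, 3, 4, 5, 6, 7].map((version) => ({ version, delta: 10 }));
const snapshotAtFive: AccountSnapshot = { version: 5, balance: 50 };

const withSnapshot = rehydrate(snapshotAtFive, history);
const withoutSnapshot = rehydrate(undefined, history);
```

Both paths end in the same state, but the snapshot path replays two events instead of seven. In a real store the filtering would happen in the query rather than in memory.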

 

Queries

You can create queries to return the data you need.

import { IQueryHandler, QueryHandler } from '@ocoda/event-sourcing';

export class GetAccountQuery {

  constructor(public readonly accountId: string) {}

}

@QueryHandler(GetAccountQuery)
export class GetAccountQueryHandler implements IQueryHandler {

  constructor(private readonly accountRepository: AccountRepository) {}

  async execute(query: GetAccountQuery): Promise<Account> {
    const accountId = AccountId.from(query.accountId);
    const account = await this.accountRepository.getById(accountId);

    return account;
  }
}

 

Misc

  • What about materialized views? Event sourcing articles often suggest listening to published events to create or update a database view that is optimized for reading. While this offers some advantages, there is a lot of overhead to consider when doing so. An alternative is to simply read out your write models. A very interesting read about the benefits and trade-offs can be found here.

  • What about sagas? At this point, I haven't created Sagas because in basic use cases EventHandlers can take care of triggering side-effects. If the need arises, I'll look into this.  

Contact

dries@drieshooghe.com  

Acknowledgments

This library is inspired by @nestjs/cqrs
