Skip to content

Introducing_CQRS_ES_System_OOC2024

Junichi Kato edited this page Mar 24, 2024 · 63 revisions

CQRS/Event Sourcingシステム実装入門

2024/3/24 Object-Oriented Conference 2024

このドキュメントは書きかけです。

プログラム概要

  • ハンズオンの目的/システム及びコードベースの概要説明(30分)
  • 課題の説明と実施(70分)

ハンズオンの目的

CRUDシステムより、複雑なCQRS/Event Sourcingを理解する

FYI: CQRS/ESのPros/Cons

Pros

  • コマンド・クエリそれぞれの特性に分けた最適化が可能
    • クエリ:コマンド = 8:2のトラフィック量に合わせたスケーリングが可能
    • 正規化されたドメインオブジェクトの構造 vs 非正規化されたリードモデルの構造
    • 一般的に、コマンド側は一貫性を重視、クエリ側は可用性を重視する
    • イベントの書き込みは追記のみ(ただしトランザクション管理は必要)
  • 2層コミットを避けてドメインイベントが利用可能
    • データ分析に利用可能
    • ジャーナルさえあれば、そこからどんなデータでも再構築可能(理論上)
    • マイクロサービス間の非同期な連携に利用できる

Cons

  • 単純なCRUDシステムより複雑になる

CQRS/Event Sourcingシステムを実現するには、以下のようなアクターシステムを使った方法論がよく使われます。現状としては、アクターモデルを理解した上で、それぞれのアクターのシャーディングや永続化について学ぶ必要があり、CQRS/Event Sourcingの知識を獲得するまで道のりが遠いという課題があります。(具体的には、アクターのクラスターシステム、クラスターシャーディング、アクターが生成するイベントの永続化とリプレイの仕組みを理解しなければなりません。同時に運用負荷も相当に高くなります)

そこで、j5ik2o/event-store-adapterではアクターモデルではなく既存のオブジェクトモデルを使って、特別なクラスターシステムも不要とし、CQRS/Event Sourcingの知識を比較的容易に獲得できるように設計されています。

ただし、アクターシステムを使う方法と比べて以下のデメリットがあります。

  • リクエストの先頭で、毎回集約(ドメインオブジェクト)のリプレイを実行する必要がある
    • アクターシステムの場合は初回のリクエストのみ発生し、ワークロードがある間はアクターは常駐します。つまりリプレイコストは都度払う必要がない
  • イベントの追記時、ステートレスアプリケーションではDB側でトランザクション+楽観的ロックが必要になる
    • シャーディングされたアクターは常時起動するエンティティであり、そこに到達したコマンドリクエストはアクターのメールボックスに到着した潤に処理されるため、このような仕組みは不要

環境準備

ツールのインストール

リポジトリURLの確認

ビルド及びテストの方法

コードの前提

CQRS/Event Soucingを全体像を把握するためのコードベースなので、以下のような前提があります。

  • テストは網羅的に書かれていません
  • エラーハンドリングも割り切りで書かれている部分があります

とりあえず、IDEで開く

  • JetBrains系の人は
    • Goは、GoLand
    • Rustは、IntelliJ+Rustプラグイン or Rust Rover
    • TypeScriptは、WebStorm

動作確認する

ターミナルからDocker Composeを起動して動作確認してください。手順はそれぞれのGitリポジトリに記載があります。

システム構成

system-layout.png

ユースケース

コマンド側

  • グループチャットを作成する
    • 名前、管理者となるユーザアカウントIDを指定する必要がある
  • グループチャットの名前を変更する
    • グループチャットが削除されていないこと
    • 現在の名前と異なる名前であること
    • 実行者が管理者であること
  • グループチャットにメンバーを追加する
    • グループチャットが削除されていないこと
    • 追加対象のユーザアカウントIDがすでにメンバーでないこと
    • 実行者が管理者であること
  • グループチャットからメンバーを削除する
    • グループチャットが削除されていないこと
    • 追加対象のユーザアカウントIDがすでにメンバーであること
    • 実行者が管理者であること
  • グループチャットにメッセージを投稿する
    • グループチャットが削除されていないこと
    • メッセージの送信者がメンバーであること
  • 【対象課題】 グループチャットに投稿されたメッセージを編集する
    • グループチャットが削除されていないこと
    • 編集対象のメッセージがすでに削除されていないこと
    • メッセージの編集者が当該メッセージの投稿者と同じであること
  • グループチャットに投稿されたメッセージを削除する
    • グループチャットが削除されていないこと
    • 削除対象のメッセージがすでに削除されていないこと
  • グループチャットを削除する
    • グループチャットが削除されていないこと

クエリ側

  • 自分がメンバーとして参加している、グループチャット一覧を取得する
  • 自分がメンバーとして参加している、グループチャットを取得する
  • 自分がメンバーとして参加している、メンバー一覧を取得する
  • 自分がメンバーとして参加している、メンバーを取得する
  • 自分がメンバーとして参加している、メッセージ一覧を取得する
  • 自分がメンバーとして参加している、メッセージを取得する

コンポーネント

アプリケーション

Write API Server
  • 書き込み命令を受け付けるAPIサーバ。ドメインオブジェクト(集約)を使ってドメインロジックを実行することが目的
    • Create GroupChat, Post Messageなど
  • GraphQLサーバとしてMutationを実装
  • ドメインオブジェクトはコマンドを受理すると、ドメインロジックを実行し、状態遷移します。その際にドメインイベントも生成します
  • そのドメインイベントはDynamoDBに追加書き込みされます。ドメインイベントとは別に集約のスナップショットも定期的にDynamoDBに保存されます
Read Model Updater
  • Write API Serverが生成したドメインイベントをDynamoDBから読み出し、リードモデルをMySQL上に生成するコンポーネント
  • MessagePostedイベントが発生すると、Read DB上のMessagesテーブルにMessageリードモデルを構築します。
  • 本番ではAWS Lambdaを想定しているが、ローカルではスタブ上から関数呼び出しする実装
Read API Server
  • 読み込み命令を受け付けるAPIサーバ。MySQL上にあるリードモデルを読み込んでレスポンスを返すことが目的
    • Get GroupChat, Get Messageなど
  • GraphQLサーバとしてQueryを実装

実行環境

  • アプリケーションの配布形式はDockerイメージ
  • AWS環境では、Write API Server, Read API ServerはEKS(Fargate)を想定。Read Model UpdaterはAWS Lambdaを想定。Rust版以外は未対応
  • ローカル環境では、Docker Composeを利用。(Read Model Updaterはローカル用のスタブからLambda用の関数をコールバックしてエミュレーションする)

データベース

  • コマンド側のデータベースはDynamoDBを想定。ローカルではLocal Stackを利用する
  • クエリ側のデータベースはAurora Serverlessを想定。ローカルではMySQLを利用する

コードベースの概要

  • レイヤー構造はクリーンアーキテクチャっぽいものです。
  • 実行環境はDockerのみ

レイヤー構造

レイヤーの概要

  • インフラストラクチャ層
    • 今回の実装ではほぼ空です
    • オニオンアーキテクチャでは、この層がORMやリポジトリの配置場所となりますが、クリーンアーキテクチャではそれらはインターフェイスアダプタのGateway責務となる
    • ここではどのレイヤーからも利用可能でかつ、言語やランタイムを拡張するような責務が配置されます。ロギングやリソース解放ツールなど
    • 依存先のレイヤー: なし
  • コマンド側
    • ドメイン層(command/domain)
      • 依存先のレイヤー: infrastructure
    • コマンドプロセッサ(command/processor)
      • 依存先のレイヤー: command/domain, command/interface-adaptor-if, infrastructure
    • インターフェイスアダプタ
      • 契約(command/interface-adaptor-if)
        • 契約となる仕様部分(インターフェイスのシグニチャやモデルの型など)
        • 契約と実装を分けるのは内殻に位置するコマンドプロセッサにアウトプットポートを提供するため。この構造でなければ循環参照になる
        • 依存先のレイヤー: command/domain
      • 実装(command/interface-adaptor-impl)
        • 契約に対する実装部分
        • 依存先のレイヤー: command/domain, command/interface-adaptor-if, command/processor, infrastructure
  • クエリ側
    • インターフェイスアダプタ(query/interface-adaptor)
      • 依存先のレイヤー: infrastructure
  • 中間
    • Read Model Updaterのリードモデル更新用関数(rmu)
      • 依存先のレイヤー: command/domain, infrastructure

ドメイン層

  • ビジネスロジックを提供するGroupChat集約と、それに関連するドメインオブジェクトが内包されたレイヤーです

  • GroupChat集約はメンバー間でメッセージをやりとりするためのドメインオブジェクトです

  • Go:GroupChat

  • Rust:GroupChat

ユースケース層

  • GroupChat集約とGroupChatリポジトリ(I/F)を結びつけたGroupChatプロセッサによって、アプリケーションに必要なユースケースを提供します。

  • ビジネスロジックそのものではなく、機能の実現のために必要な一連のフロー制御を行います。

  • Go:GroupChatCommandProcessor

  • Rust:GroupChatCommandProcessor

インターフェイスアダプタ層

インフラストラクチャ層

  • 各レイヤーから利用可能な汎用的な技術基盤

ブートスラップ

  • アプリケーションの起動部分

実行環境

Docker

  • Dockerfile
  • Docker Compose構成

課題

課題はsanboxブランチで作業する。

$ git switch sandbox

課題1: メッセージ編集機能の追加

Goの場合

  • make testを実行して全てのテストがパスすることを確認する
  • インターフェイスアダプタ層:コマンド用SDLを編集する
    gqlgenはSDL駆動なのでまずSDLを編集します。コマンド側のSDLファイルに、EditMessageInputeditMessageメソッドを追加する。
input EditMessageInput {
  groupChatId: String!
  content: String!
  executorId: String!
}
// snip
type MutationRoot {
// snip
  postMessage(input: PostMessageInput!): MessageResult!
  editMessage(input: EditMessageInput!): MessageResult!
  deleteMessage(input: DeleteMessageInput!): GroupChatResult!
}
  • schema.resolvers.goを更新する
    以下のコマンドをプロジェクト直下で実行し schema.resolvers.go を更新してください。
$ make c-gql-gen
// snip
func (r *mutationRootResolver) EditMessage(ctx context.Context, input commandgraphql.EditMessageInput) (*commandgraphql.MessageResult, error) {
// snip

答えはこちら → https://github.com/j5ik2o/cqrs-es-example-go/pull/121

Rustの場合

// modules/command/interface-adaptor-impl/src/graphql/inputs.rs
#[derive(Debug, Clone, InputObject)]
pub struct EditMessageInput {
  pub group_chat_id: String,
  pub content: String,
  pub executor_id: String,
}
  • インターフェイスアダプタ層:GraphQL用リゾルバにedit_messageメソッドを追加・実装する
// modules/command/interface-adaptor-impl/src/graphql/resolvers.rs
#[Object]
impl MutationRoot {
// snip
  async fn edit_message<'ctx>(&self, ctx: &Context<'ctx>, input: EditMessageInput) -> FieldResult<MessageOut> {
// 実装する
  }
// snip
}

答えはこちら → https://github.com/j5ik2o/cqrs-es-example-rs/pull/355

課題2: GroupChat集約の最適化

  • GroupChat集約は投稿されたMessageの集合をMessagesとして保持しているが、投稿内容の文字列自体を保持しないようにすることで、集約の大きさを小さくする
  • ただし、イベントには投稿内容が含まれるようにこれまでの構造を維持すること