diff --git a/src/rill/wheel.clj b/src/rill/wheel.clj index bee654b..63fa387 100644 --- a/src/rill/wheel.clj +++ b/src/rill/wheel.clj @@ -331,6 +331,7 @@ " (:refer-clojure :exclude [empty empty? type]) (:require [rill.event-store :refer [retrieve-events append-events]] + [rill.message :as message] [rill.wheel.repository :as repo] [clojure.string :as string] [clojure.spec :as s] @@ -377,6 +378,12 @@ [aggregate] (::new-events aggregate)) +(defn aggregate? + "Test that `obj` is an aggregate." + [obj] + (boolean (and (::id obj) + (::type obj)))) + (defn empty "Create a new aggregate with id `aggregate-id` and no events. Aggregate version will be -1. Note that empty aggregates @@ -586,7 +593,12 @@ [aggregate] (let [events (::new-events aggregate)] {::status :ok - ::events events + ::events (->> events + (map-indexed (fn [index event] + (-> event + (assoc ::message/stream-id (::id aggregate)) + (assoc ::message/number (+ (::version aggregate) (inc index)))))) + vec) ::aggregate (-> aggregate (update ::version + (count events)) (assoc ::new-events []))})) diff --git a/src/rill/wheel/repository.clj b/src/rill/wheel/repository.clj index 412f9a8..36f9517 100644 --- a/src/rill/wheel/repository.clj +++ b/src/rill/wheel/repository.clj @@ -24,4 +24,3 @@ "Test if `repo` is a repository" [repo] (satisfies? Repository repo)) - diff --git a/test/rill/wheel_test.clj b/test/rill/wheel_test.clj index 477ff65..d2d08e6 100644 --- a/test/rill/wheel_test.clj +++ b/test/rill/wheel_test.clj @@ -151,31 +151,38 @@ commit!))) (is (sub? {::aggregate/status :ok - ::aggregate/events [{::message/type ::installed}]} + ::aggregate/events [{::message/type ::installed + ::message/stream-id {:turnstile-id id + ::aggregate/type ::turnstile} + ::message/number 0}]} (-> (get-turnstile repo id) (install-turnstile) commit!))) (is (sub? {::aggregate/status :ok - ::aggregate/events [{::message/type ::arm-pushed-ineffectively}]} + ::aggregate/events [{::message/type ::arm-pushed-ineffectively + ::message/number 1}]} (-> (get-turnstile repo id) (push-arm) commit!))) (is (sub? {::aggregate/status :ok - ::aggregate/events [{::message/type ::coin-inserted}]} + ::aggregate/events [{::message/type ::coin-inserted + ::message/number 2}]} (-> (get-turnstile repo id) (insert-coin) commit!))) (is (sub? {::aggregate/status :ok - ::aggregate/events [{::message/type ::arm-turned}]} + ::aggregate/events [{::message/type ::arm-turned + ::message/number 3}]} (-> (get-turnstile repo id) (push-arm) commit!))) (is (sub? {::aggregate/status :ok - ::aggregate/events [{::message/type ::arm-pushed-ineffectively}]} + ::aggregate/events [{::message/type ::arm-pushed-ineffectively + ::message/number 4}]} (-> (get-turnstile repo id) (push-arm) commit!)))))