Yes is a framework for building event-sourced systems, originally developed to power Switzerland's leading apprenticeship platform yousty.ch and its younger sibling professional.ch. It is designed to be used within Rails applications and relies on PgEventstore for event storage, which provides a robust PostgreSQL-based event store implementation.
- Quick Start
- Naming Conventions
- Aggregate DSL
- Authorization
- Command API
- Read API
- Event Processing
- Configuration Reference
- Development
- Contributing
Add this line to your application's Gemfile:
gem 'yes-core'
Then execute:
bundle install
At the core of Yes is the Yes::Core::Aggregate class, which provides a DSL for defining event-sourced aggregates:
module Users
  module User
    class Aggregate < Yes::Core::Aggregate
      # Define attributes with types
      attribute :name, :string, command: true # this will generate a change_name command
      attribute :email, :email, command: true # this will generate a change_email command
      attribute :company_id, :uuid # this will not generate a command, just an accessor
      # Define custom commands
      command :assign_to_company do
        payload company_id: :uuid # Payload keys need to match attributes
        guard :company_exists do
          CompanyReadModel.exists?(payload.company_id)
        end
      end
    end
  end
end
# Usage
user = Users::User::Aggregate.new
user.change_name("John Doe")
user.assign_to_company(company_id: "123e4567-e89b-12d3-a456-426614174000")
When defining an aggregate, use the following namespacing pattern:
<Context>::<AggregateName>::Aggregate
For example: Users::User::Aggregate or Companies::Company::Aggregate
The attribute method defines properties of your aggregate:
module Users
module User
class Aggregate < Yes::Core::Aggregate
# Basic attributes without commands
attribute :name, :string
attribute :email, :email
# Attributes with change commands
attribute :age, :integer, command: true
attribute :bio, :string, command: true
end
end
end
command: true - Generates change commands for the attribute (not generated by default). It is not allowed for the :aggregate attribute type.
When command: true is specified, Yes automatically creates:
- Accessor methods (user.name, user.email)
- Change commands (user.change_age(30))
- Validation methods (user.can_change_bio?("New bio"))
- Event classes for recording changes
Without command: true, only the accessor is created.
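To make the generated methods more concrete, here is a minimal plain-Ruby sketch of how a class-level attribute macro could define an accessor plus change_<name> and can_change_<name>? methods. MiniAggregate and its in-memory @state hash are hypothetical stand-ins; Yes itself records events and uses read models, which this sketch does not attempt to model.

```ruby
# Illustrative only: a tiny stand-in for an attribute DSL.
# MiniAggregate is hypothetical and not part of Yes.
class MiniAggregate
  def self.attribute(name, command: false)
    # The accessor is always defined.
    define_method(name) { @state[name] }
    return unless command

    # can_change_<name>? passes only when the new value differs.
    define_method("can_change_#{name}?") { |value| @state[name] != value }

    # change_<name> applies the change only when the check passes.
    define_method("change_#{name}") do |value|
      return false unless public_send("can_change_#{name}?", value)

      @state[name] = value
      true
    end
  end

  def initialize
    @state = {}
  end

  attribute :name
  attribute :age, command: true
end

user = MiniAggregate.new
user.change_age(30)
user.age                       # the stored value after the change
user.respond_to?(:change_name) # false, because :name was declared without a command
```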
The command method defines custom operations on your aggregate:
module Companies
module Company
class Aggregate < Yes::Core::Aggregate
# Define attributes that will be updated by the command
attribute :user_ids, :uuids
command :assign_user do
# Define payload attributes
payload user_id: :uuid
guard :user_not_already_assigned do
!user_ids.include?(payload.user_id)
end
# Custom state update logic
update_state do
user_ids { (user_ids || []) + [payload.user_id] }
end
end
end
end
end
Attributes are the core properties of your aggregates.
The attribute system supports various types:
- :string - Text values
- :email - Email addresses with validation
- :uuid - UUID values
- :integer - Numeric values
- :boolean - True/false values
- :date - Date values
- :uuids - Arrays of UUIDs
For the complete list, see yes-core/lib/yes/core/type_lookup.rb
You can register application-specific types using the type registry:
# config/initializers/yes_types.rb
Yes::Core::Types.register(:subscription_type, Yes::Core::Types::String.enum('premium', 'basic'))
Yes::Core::Types.register(:team_role, Yes::Core::Types::String.enum('lead', 'member'))
Yes::Core::Types.register(:training_year, Yes::Core::Types::Coercible::Integer.constrained(gteq: 1, lteq: 4))
Registered types can then be used in aggregate definitions:
attribute :role, :team_role, command: true
If you specify command: true when defining an attribute, Yes generates:
Changes the attribute's value through an event:
user.change_age(30)
user.change_bio("Software developer")
You can also pass parameters as a hash:
user.change_age(age: 30)
Validates a potential change without applying it:
# Valid change
if user.can_change_email?("user@example.com")
user.change_email("user@example.com")
end
# Invalid change
user.can_change_email?("invalid-email") # => false
user.email_change_error # Contains the error message
Commands define operations that can be performed on your aggregate.
Define the input data for your command:
command :register_apprenticeship do
payload title: :string,
start_date: :date,
location_id: :uuid
end
If you don't supply an update_state block, make sure every payload key is defined as an attribute on the aggregate.
Optional and Nullable Attributes
You can mark payload attributes as optional (key can be omitted) or nullable (value can be nil) using hash syntax:
command :update_profile do
# Optional key - attribute can be omitted from payload
payload phone: { type: :string, optional: true },
# Nullable value - attribute must be present but can be nil
max_travel_time: { type: :integer, nullable: true },
# Both optional key and nullable value
email: { type: :email, optional: true, nullable: true }
end
- optional: true - The key can be omitted from the command payload (for commands) or event data (for events)
- nullable: true - The value can be nil (wraps the type with .maybe for commands, uses .maybe() for events)
Note: For commands, nullable attributes are automatically unwrapped from Dry::Monads::Maybe::Some/None when accessing command.payload to ensure compatibility with event creation.
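The difference between an optional key and a nullable value can be sketched in plain Ruby as follows. The payload_errors helper and its spec format are hypothetical and only mirror the semantics described above; Yes itself builds on its own type system rather than on a helper like this.

```ruby
# Hypothetical helper: checks a payload hash against a spec of
# { key => { optional: bool, nullable: bool } }. Not part of Yes.
def payload_errors(spec, payload)
  spec.each_with_object([]) do |(key, opts), errors|
    if !payload.key?(key)
      # A missing key is only acceptable when the key is optional.
      errors << "#{key} is missing" unless opts[:optional]
    elsif payload[key].nil?
      # A nil value is only acceptable when the value is nullable.
      errors << "#{key} must not be nil" unless opts[:nullable]
    end
  end
end

spec = {
  phone: { optional: true },
  max_travel_time: { nullable: true },
  email: { optional: true, nullable: true }
}

payload_errors(spec, { max_travel_time: nil })           # no errors
payload_errors(spec, {})                                 # max_travel_time is missing
payload_errors(spec, { phone: nil, max_travel_time: 5 }) # phone must not be nil
```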
Add validation rules with guards:
command :publish do
guard :all_required_fields_present do
title.present? && description.present?
end
guard :not_already_published do
!published
end
end
Customize the generated event name:
command :publish do
event :apprenticeship_published
end
When no custom event name is provided, Yes automatically generates an event name based on the command name. Currently, only standard command prefixes are supported. If you use a command that doesn't start with a supported prefix, you must specify the event name explicitly. For a list of supported prefixes, see lib/yes/core/utils/event_name_resolver.rb.
Yes supports encrypting sensitive data in events. You can mark payload attributes for encryption using three approaches:
1. Inline Encryption Declaration (Recommended for mixed payloads)
command :update_contact_info do
payload email: { type: :email, encrypt: true },
phone: { type: :phone, encrypt: true },
address: :string # not encrypted
end
2. Separate encrypt Method (Recommended for multiple encrypted fields)
command :update_sensitive_data do
payload ssn: :string, email: :email, phone: :phone
encrypt :ssn, :email, :phone
end
3. Command Shortcut with encrypt Option
# For simple attribute commands
command :change, :ssn, :string, encrypt: true
Important Notes:
- Encryption applies to the event payload stored in the event store, not to the aggregate state or read models
- Encrypted attributes are tracked in the generated event class via an encryption_schema class method
- You can combine inline and separate encryption declarations in the same command
- The encryption key is automatically derived from the aggregate ID
Define exactly how state should change:
command :add_tag do
payload tag: :string
update_state do
tags { (tags || []) + [payload.tag] }
end
end
You can also use the update_state method to update multiple attributes at once:
update_state do
name { payload.name }
email { payload.email }
end
Make sure the attributes updated in the update_state block are all defined on the aggregate.
Commands update the aggregate state in one of two ways:
If you don't define an update_state block, the command will automatically update the aggregate's attributes based on the payload:
module Companies
module Company
class Aggregate < Yes::Core::Aggregate
# Define attributes that match the payload keys
attribute :name, :string
attribute :description, :string
command :update_details do
# Payload keys must match attribute names
payload name: :string,
description: :string
# No update_state block needed - automatic update
end
end
end
end
company = Companies::Company::Aggregate.new
company.update_details(name: "Acme Inc", description: "Manufacturing company")
# Both name and description attributes will be updated automatically
Important: When not using an update_state block:
- All payload keys must be defined as attributes on the aggregate
- The system will validate this and raise an error if there's a mismatch
- The attribute values will be updated directly from the payload values
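The rules above can be illustrated with a small plain-Ruby sketch: payload keys are checked against the declared attributes, and matching values are copied over. The apply_payload helper is hypothetical, and at which point Yes performs this check (definition time or execution time) is not stated here.

```ruby
# Hypothetical sketch of automatic payload-to-attribute updates.
# `attributes` is the list of attribute names declared on the aggregate.
def apply_payload(state, attributes, payload)
  unknown = payload.keys - attributes
  unless unknown.empty?
    raise ArgumentError, "payload keys without attributes: #{unknown.join(', ')}"
  end

  # Copy each payload value onto the attribute of the same name.
  state.merge(payload)
end

attributes = %i[name description]
state = apply_payload({}, attributes, { name: 'Acme Inc', description: 'Manufacturing company' })
```

Passing a key such as :title that is not in the attribute list makes the helper raise an ArgumentError, which corresponds to the mismatch error mentioned above.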
When you define an update_state block, you have complete control over how attributes are updated:
module Articles
module Article
class Aggregate < Yes::Core::Aggregate
attribute :title, :string
attribute :tags, :array
attribute :status, :string
command :publish do
payload title: :string
update_state do
# You can reference payload values
title { payload.title }
# Or set static values
status { "published" }
# Or combine existing data with payload
tags { (tags || []) + ["published"] }
end
end
end
end
end
Important: When using an update_state block:
- Payload keys don't need to match attribute names
- However, all attributes updated in the block must be defined on the aggregate
- The system will validate this and raise an error if an undefined attribute is updated
- You have full control over transformation logic
For each command, Yes generates:
Executes the command:
company.assign_user(user_id: "123e4567-e89b-12d3-a456-426614174000")
Checks whether the command would succeed:
if company.can_assign_user?(user_id: "123e4567-e89b-12d3-a456-426614174000")
company.assign_user(user_id: "123e4567-e89b-12d3-a456-426614174000")
else
puts company.assign_user_error
end
For the most frequently used cases, the Yes DSL provides shortcuts in command definitions.
command :change, :age, :integer, localized: true
is expanded to
attribute :age, :integer, localized: true
command :change_age do
payload age: :integer, locale: :locale
guard(:no_change) { value_changed?(send(attribute_name), payload.send(attribute_name)) }
end
You can override the default no-change guard by providing a custom one:
command :change, :age, :integer do
payload fantastic_new_age: :integer
guard(:no_change) { age != payload.fantastic_new_age }
end
The :enable and :activate command names trigger this shortcut.
command :activate, :dropout, attribute: :dropout_enabled
is expanded to
attribute :dropout_enabled, :boolean
command :activate_dropout do
guard(:no_change) { !dropout_enabled }
update_state { dropout_enabled { true } }
end
command [:enable, :disable], :dropout
is expanded to
attribute :dropout, :boolean
command :enable_dropout do
guard(:no_change) { !dropout }
update_state { dropout { true } }
end
command :disable_dropout do
guard(:no_change) { dropout }
update_state { dropout { false } }
end
command :publish
is expanded to
attribute :published, :boolean
command :publish do
guard(:no_change) { !published }
update_state { published { true } }
end
Guards are powerful validation mechanisms that enforce business rules by controlling when commands and attribute changes are permitted to execute. They act as gatekeepers that ensure all operations maintain the integrity of your domain logic.
Both commands and attributes automatically include a :no_change guard that ensures the aggregate's state would actually change when applying the command. For commands, this default guard is only active when there is no update_state block present in the command definition.
When defining an attribute with a command, you can add guards to implement validation:
attribute :email, :email, command: true do
guard :check_email_domain do
payload.email.end_with?('@example.com')
end
end
Similarly, you can add guards to commands to control when they can execute:
command :publish do
guard :all_required_fields_present do
title.present? && description.present?
end
guard :not_already_published do
!published
end
end
Inside any guard block you can access:
- payload - The command payload with access to both data and metadata
- Any aggregate attribute directly by name
The payload object in guards provides access to command metadata alongside the regular payload data. This metadata can contain useful contextual information such as user information or tracking data.
You can access metadata in two ways:
command :update_status do
payload status: :string
guard :valid_response do
# Method-style access
payload.metadata.response_id.present?
# Hash-style access
payload.metadata[:response_id].present?
end
guard :authorized_user do
# If a metadata key doesn't exist, nil is returned
payload.metadata.user_role == 'admin' # evaluates to false when user_role is missing, because the lookup returns nil
end
end
This allows guards to make decisions based on both the command's data payload and any additional contextual metadata that was provided when the command was issued.
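As an illustration of the two access styles, the following plain-Ruby sketch shows one way an object can offer both method-style and hash-style lookups while returning nil for unknown keys. The Metadata class here is a hypothetical stand-in, not the class Yes uses.

```ruby
# Hypothetical stand-in for a metadata object; not the Yes implementation.
class Metadata
  def initialize(data)
    @data = data.transform_keys(&:to_sym)
  end

  # Hash-style access: metadata[:response_id]
  def [](key)
    @data[key.to_sym]
  end

  # Method-style access: metadata.response_id (nil for unknown keys)
  def method_missing(name, *args)
    return super unless args.empty?

    @data[name]
  end

  def respond_to_missing?(_name, _include_private = false)
    true
  end
end

metadata = Metadata.new('response_id' => 'r-1')
metadata.response_id          # same value as metadata[:response_id]
metadata.user_role == 'admin' # false, because user_role is nil
```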
Guards have two distinct behaviors based on their name:
- Guards named :no_change trigger a no-change transition error when they fail. This indicates that the operation would not modify the aggregate's state.
- All other guard names trigger an invalid transition error when they fail. This indicates that the operation is not allowed in the current state.
command :update_profile do
payload bio: :string
# Will trigger a no-change transition error if bio hasn't changed
guard :no_change do
payload.bio != bio
end
# Will trigger an invalid transition error if bio contains prohibited words
guard :appropriate_content do
!payload.bio.include?("prohibited content")
end
end
You can provide custom localized error messages for guards using I18n translation files:
# config/locales/en.yml
en:
  aggregates:
    test: # context
      apprenticeship: # aggregate
        commands:
          change_location: # command
            guards:
              location_published: # guard
                error: "Location is not published"
              company_matches:
                error: "Location company does not match apprenticeship company"
This allows you to define human-readable error messages that can be easily translated to different languages. These messages will be used instead of the default error messages when a guard fails.
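The nesting of the translation file implies a lookup key made of context, aggregate, command, and guard name. The following sketch builds such a key; the guard_error_key helper is hypothetical, and only the key layout is taken from the YAML example.

```ruby
# Hypothetical helper that builds the lookup key for a guard error message.
# Only the key layout comes from the YAML example; the helper is assumed.
def guard_error_key(context:, aggregate:, command:, guard:)
  ['aggregates', context, aggregate, 'commands', command, 'guards', guard, 'error'].join('.')
end

key = guard_error_key(
  context: :test,
  aggregate: :apprenticeship,
  command: :change_location,
  guard: :location_published
)
# In a Rails application, the key could then be passed to I18n.t(key).
```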
Each aggregate automatically gets a corresponding read model (ActiveRecord model) that persists its current state. This is how you access attribute values from an aggregate.
user = Users::User::Aggregate.new
user.change_name("Jane Doe")
user.name # => "Jane Doe" (reads from the read model)
By default, the read model's name is derived from the aggregate's context and name:
# For Users::User::Aggregate
# The read model class will be UsersUser
# And the database table will be users_users
You can customize the read model name and visibility using the read_model method:
module Users
module User
class Aggregate < Yes::Core::Aggregate
# Use a custom read model name
read_model 'custom_user', public: false
attribute :email, :email, command: true
attribute :name, :string
end
end
end
In this example:
- The read model class will be CustomUser instead of UsersUser
- The database table will be custom_users
- public: false means this read model won't be accessible via the read API
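The naming rules can be summarised in a short plain-Ruby sketch. Both helpers are hypothetical, and the pluralisation simply appends "s", whereas a Rails application would rely on ActiveSupport inflections.

```ruby
# Hypothetical helpers mirroring the naming rules described above.
def read_model_class_name(context, aggregate, custom: nil)
  # A custom snake_case name is camelized; otherwise context and name are joined.
  return custom.split('_').map(&:capitalize).join if custom

  "#{context}#{aggregate}"
end

def read_model_table_name(class_name)
  # Naive underscore plus "s"; real pluralisation rules are more involved.
  underscored = class_name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
  "#{underscored}s"
end

read_model_class_name('Users', 'User')                        # "UsersUser"
read_model_table_name('UsersUser')                            # "users_users"
read_model_class_name('Users', 'User', custom: 'custom_user') # "CustomUser"
read_model_table_name('CustomUser')                           # "custom_users"
```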
When you add or remove aggregates or attributes, you need to update your database schema. Yes provides a Rails generator for this:
rails generate yes:core:read_models:update
This will:
- Find all aggregates in your application
- Create migration files that update read model tables to match your aggregate definitions
- Add, modify, or remove columns as needed
Example generated migration:
class UpdateReadModels < ActiveRecord::Migration[7.1]
def change
create_table :users do |t|
t.string :name
t.string :email
t.integer :age
t.integer :revision, null: false, default: -1
t.timestamps
end
add_column :companies, :name, :string
remove_column :companies, :old_field
end
end
Attribute types are mapped to database column types as follows:
- :string, :email, :url → :string
- :integer → :integer
- :uuid → :uuid
- :boolean → :boolean
- :hash → :jsonb
- :aggregate → :uuid (stored as <attribute_name>_id)
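Expressed as a lookup table, the mapping above could look like the following sketch. COLUMN_TYPES covers only the types listed here, and column_for is a hypothetical helper rather than the generator's actual code.

```ruby
# Lookup table transcribed from the list above; helper names are hypothetical.
COLUMN_TYPES = {
  string: :string,
  email: :string,
  url: :string,
  integer: :integer,
  uuid: :uuid,
  boolean: :boolean,
  hash: :jsonb,
  aggregate: :uuid
}.freeze

# Returns [column_name, column_type] for an attribute definition.
def column_for(attribute_name, type)
  column_type = COLUMN_TYPES.fetch(type) { raise ArgumentError, "unmapped type: #{type}" }
  # Aggregate references are stored under <attribute_name>_id.
  column_name = type == :aggregate ? :"#{attribute_name}_id" : attribute_name
  [column_name, column_type]
end

column_for(:settings, :hash)     # [:settings, :jsonb]
column_for(:company, :aggregate) # [:company_id, :uuid]
```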
To ensure read model consistency and enable recovery from failures during event processing, Yes provides a generator that adds pending update tracking to your read models:
rails generate yes:core:read_models:add_pending_update_tracking
This generator creates a migration that:
- Adds a pending_update_since column to all read model tables
- Creates indexes to efficiently track and recover stale pending updates
- Automatically handles PostgreSQL's 63-character index name limit by truncating long names
The pending update tracking system helps prevent read models from getting stuck in an inconsistent state by:
- Marking read models as "pending" before event publication
- Clearing the pending state after successful updates
- Allowing automatic recovery of stale pending states (default timeout: 5 minutes)
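The lifecycle described in this list (mark as pending, clear on success, treat old markers as stale) can be modelled in memory as follows. ReadModelRow and the helper methods are hypothetical; the real mechanism works on the pending_update_since column of the read model tables.

```ruby
# Hypothetical in-memory model of pending update tracking.
ReadModelRow = Struct.new(:id, :pending_update_since)

STALE_AFTER_SECONDS = 5 * 60 # default timeout mentioned above

# Called before the event is published.
def mark_pending(row, now)
  row.pending_update_since = now
end

# Called after the read model was updated successfully.
def clear_pending(row)
  row.pending_update_since = nil
end

# A row is stale when it has been pending for longer than the timeout.
def stale?(row, now)
  !row.pending_update_since.nil? && (now - row.pending_update_since) > STALE_AFTER_SECONDS
end

row = ReadModelRow.new('user-1', nil)
mark_pending(row, Time.at(0))
stale?(row, Time.at(60))  # false, still within the timeout
stale?(row, Time.at(301)) # true, pending for more than five minutes
clear_pending(row)
```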
class AddPendingUpdateTrackingToReadModels < ActiveRecord::Migration[7.1]
def up
read_model_tables = Yes::Core.configuration.all_read_model_table_names
read_model_tables.each do |table_name|
next unless ActiveRecord::Base.connection.table_exists?(table_name)
add_column table_name, :pending_update_since, :datetime
# Unique index to prevent concurrent updates to same aggregate
add_index table_name, :id,
unique: true,
where: 'pending_update_since IS NOT NULL',
name: truncate_index_name("idx_#{table_name}_one_pending_per_aggregate")
# Index for efficient recovery queries
add_index table_name, :pending_update_since,
where: 'pending_update_since IS NOT NULL',
name: truncate_index_name("idx_#{table_name}_pending_recovery")
end
end
end
You can schedule a background job to automatically recover stale pending updates:
# app/jobs/read_model_recovery_job.rb
class ReadModelRecoveryJob < ApplicationJob
def perform
Yes::Core::Jobs::ReadModelRecoveryJob.new.perform
end
end
# Schedule it to run periodically (e.g., every 5 minutes)
# In your scheduler (whenever, sidekiq-cron, etc.):
ReadModelRecoveryJob.perform_later
You can also manually trigger recovery for specific read models:
# Recover a specific read model instance
read_model = UserReadModel.find(id)
Yes::Core::CommandHandling::ReadModelRecoveryService.recover(read_model)
# Recover all stale pending updates (older than 5 minutes by default)
Yes::Core::CommandHandling::ReadModelRecoveryService.recover_all_stale
Link aggregates in a hierarchy:
module Companies
module Location
class Aggregate < Yes::Core::Aggregate
parent :company
attribute :name, :string, command: true
attribute :address, :string, command: true
end
end
end
By default, the parent method defines an assign command together with its attribute.
In the example above, this is the assign_company command with the company_id attribute.
To skip defining the assign command, set the command option to false:
parent :company, command: false
Specify the main context:
module Users
module User
class Aggregate < Yes::Core::Aggregate
primary_context :users
attribute :name, :string, command: true
end
end
end
Define a default removal behavior for an aggregate:
module Users
module User
class Aggregate < Yes::Core::Aggregate
removable
end
end
end
It defines a remove command that works with the removed_at attribute by default and applies a default removal behavior.
The removable method accepts a custom attribute name, which is then also used for the removal behavior:
module Users
module User
class Aggregate < Yes::Core::Aggregate
removable(attr_name: :deleted_at)
end
end
end
You can also define additional guards or custom behavior:
module Users
module User
class Aggregate < Yes::Core::Aggregate
removable do
guard(:published) { published? }
end
end
end
end
The draftable feature allows aggregates to be created and modified in a draft state before being published. This is useful when you want to prepare changes without immediately making them live.
module Articles
module Article
class Aggregate < Yes::Core::Aggregate
# Makes aggregate draftable by connecting it to a draft aggregate for managing the draft state.
# The draft aggregate has to exist already. The default draft aggregate is <CurrentAggregateContext>::<CurrentAggregateName>Draft.
# Also configures a changes read model (defaults to "<read_model>_change")
draftable
# Draftable with custom parameters
# draftable draft_aggregate: { context: 'ArticleDrafts', aggregate: 'ArticleDraft' }, changes_read_model: :article_change
attribute :title, :string, command: true
attribute :content, :string, command: true
end
end
end
The draftable method accepts two optional parameters:
- draft_aggregate: A hash containing the draft aggregate configuration
  - context: The context name for the draft version (defaults to the same context as the main aggregate)
  - aggregate: The aggregate name for the draft version (defaults to the main aggregate name with "Draft" suffix)
- changes_read_model: The name for the changes read model (defaults to the main read model name with "_change" appended)
# Use all defaults
draftable
# Custom context only
draftable draft_aggregate: { context: 'DraftContext' }
# Custom aggregate name only
draftable draft_aggregate: { aggregate: 'MyDraft' }
# Both context and aggregate
draftable draft_aggregate: { context: 'DraftContext', aggregate: 'MyDraft' }
# Custom changes read model only
draftable changes_read_model: :custom_changes
# All custom parameters
draftable draft_aggregate: { context: 'DraftContext', aggregate: 'MyDraft' }, changes_read_model: :my_changes
When changes_read_model is not specified, it defaults to using the main read model name with "_change" appended (e.g., if the read model is "article", the changes read model becomes "article_change").
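The default resolution described above can be sketched as a small function. draftable_defaults is a hypothetical helper that only mirrors the documented defaults; it is not how Yes stores this configuration.

```ruby
# Hypothetical helper resolving draftable options to their effective values.
def draftable_defaults(context:, aggregate:, read_model:, draft_aggregate: {}, changes_read_model: nil)
  {
    # Draft context defaults to the main aggregate's context.
    draft_context: draft_aggregate.fetch(:context, context),
    # Draft aggregate name defaults to "<AggregateName>Draft".
    draft_aggregate: draft_aggregate.fetch(:aggregate, "#{aggregate}Draft"),
    # Changes read model defaults to "<read_model>_change".
    changes_read_model: (changes_read_model || "#{read_model}_change").to_s
  }
end

draftable_defaults(context: 'Articles', aggregate: 'Article', read_model: 'article')
draftable_defaults(context: 'Articles', aggregate: 'Article', read_model: 'article',
                   draft_aggregate: { context: 'DraftContext' })
```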
Both the Command API and Read API delegate authentication to a configurable adapter. Configure it in an initializer:
# config/initializers/yes.rb
Yes::Core.configure do |config|
config.auth_adapter = MyAuthAdapter.new
end
The adapter must implement three methods:
| Method | Purpose | Called by |
|---|---|---|
| authenticate(request) | Verify the JWT token and return an auth data hash. Raise a Yes::Core::AuthenticationError subclass on failure. | Both API controllers (before every request) |
| verify_token(token) | Decode a raw JWT token string. Return an object responding to .token that returns [decoded_payload_hash]. | MessageBus user identification |
| error_classes | Return an array of exception classes that represent authentication failures. | Command API controller (to rescue and render 401) |
- On every request, the controller calls adapter.authenticate(request).
- The returned hash is stored as auth_data and passed to command authorizers, read request authorizers, and read model authorizers throughout the request lifecycle.
- The hash must include at minimum an :identity_id key, which is used for command metadata, MessageBus channel defaults, and authorization.
class MyAuthAdapter
AuthError = Class.new(Yes::Core::AuthenticationError)
# @param request [ActionDispatch::Request]
# @raise [AuthError] if the token is missing or invalid
# @return [Hash] auth data passed to authorizers as auth_data
def authenticate(request)
token = request.headers['Authorization']&.delete_prefix('Bearer ')
raise AuthError, 'Token missing' unless token
payload = JWT.decode(token, public_key, true, algorithm: 'RS256').first
{ identity_id: payload['sub'], host: request.host }.merge(payload.symbolize_keys)
end
# @param token [String] raw JWT token (extracted from Authorization header)
# @return [OpenStruct] object with .token returning [decoded_payload_hash]
def verify_token(token)
decoded = JWT.decode(token, public_key, true, algorithm: 'RS256')
OpenStruct.new(token: decoded)
end
# @return [Array<Class>] exception classes the controller rescues as 401
def error_classes
[AuthError, JWT::DecodeError]
end
private
def public_key
OpenSSL::PKey::RSA.new(ENV.fetch('JWT_PUBLIC_KEY'))
end
end
To make aggregates available via the command API, you must define an authorization scheme at the aggregate level. This controls who can execute commands on the aggregate.
The simplest authorization scheme allows all commands to be executed:
module Users
module User
class Aggregate < Yes::Core::Aggregate
# Allow all commands
authorize do
true
end
attribute :name, :string, command: true
end
end
end
Inside the authorize block, you can access:
- command - The command being executed
- auth_data - The decoded data from the JWT authentication token
This allows for custom authorization logic:
authorize do
# Only allow commands if the authenticated identity matches the user
command.user_id == auth_data[:identity_id]
end
Commands can define per-command authorization that extends or overrides the aggregate-level authorizer.
# First define an aggregate level authorizer
class Aggregate < Yes::Core::Aggregate
authorize do
# Base level authorization logic
auth_data[:identity_id].present?
end
# Then add command-specific refinements
command :publish do
payload user_id: :uuid
# Command-specific authorization logic
authorize do
# Has access to the command and auth_data
command.user_id == auth_data[:user_id]
end
end
end
When an aggregate declares authorize at the class level, commands can define their own
authorization logic on top of it. Each command with an authorize block automatically
receives its own Authorizer subclass that inherits from the aggregate-level authorizer.
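The inheritance relationship can be pictured with two plain Ruby classes. AggregateAuthorizer and PublishAuthorizer are hypothetical names, commands are represented as plain hashes, and the sketch returns booleans and shows only the "extend" variant, in which the command-level check runs in addition to the base check.

```ruby
# Hypothetical classes showing the inheritance relationship only.
class AggregateAuthorizer
  # Base rule: any authenticated identity may proceed.
  def self.call(_command, auth_data)
    !auth_data[:identity_id].nil?
  end
end

# Per-command authorizer: runs the base check first, then its own rule.
class PublishAuthorizer < AggregateAuthorizer
  def self.call(command, auth_data)
    super && command[:user_id] == auth_data[:user_id]
  end
end

PublishAuthorizer.call({ user_id: 'u1' }, { identity_id: 'i1', user_id: 'u1' }) # both checks pass
PublishAuthorizer.call({ user_id: 'u1' }, { user_id: 'u1' })                    # base check fails
```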
Command authorizers are registered in the configuration and can be retrieved with:
Yes::Core.configuration.aggregate_class('Context', 'Aggregate', :publish, :authorizer)
For more complex authorization needs, Yes integrates with Cerbos, a powerful authorization engine:
module Users
module User
class Aggregate < Yes::Core::Aggregate
authorize cerbos: true
attribute :name, :string, command: true
end
end
end
When using Cerbos, you can specify additional parameters:
- read_model_class - The class used to load the read model for authorization checks (defaults to the aggregate's read model)
- resource_name - The resource name used in Cerbos policies (defaults to the underscored aggregate name)
module Companies
module CompanySettings
class Aggregate < Yes::Core::Aggregate
# Custom read model and resource name
authorize cerbos: true,
read_model_class: CustomCompanySettings,
resource_name: 'company_settings'
attribute :name, :string, command: true
end
end
end
When using custom read models with Cerbos, the model must implement an auth_attributes method that returns a hash of attributes for authorization:
class CustomCompanySettings < ApplicationRecord
def auth_attributes
{ company_id: company_id || '' }
end
end
These attributes are passed to Cerbos for making authorization decisions based on your policies.
For advanced use cases, you can customize how Yes interacts with Cerbos by overriding the resource_attributes and cerbos_payload methods in your authorization block. Currently, this customization is only available within command-level authorization blocks, not at the aggregate level:
module Universe
module Star
class Aggregate < Yes::Core::Aggregate
# Base aggregate-level Cerbos authorization
authorize cerbos: true
attribute :name, :string, command: true
# Command with customized Cerbos integration
command :update_details do
payload details: :string
# Command-level authorization with custom Cerbos integration
authorize do
# Override resource attributes sent to Cerbos
resource_attributes { { owner_id: 'test-user-id' } }
# Override the entire Cerbos payload
cerbos_payload { { principal: auth_data, resource_id: 'test-id' } }
end
end
end
end
end
Inside the resource_attributes block, you can access:
- command - The command being executed
- resource - The read model instance for the aggregate
Inside the cerbos_payload block, you can access:
- command - The command being executed
- resource - The read model instance for the aggregate
- auth_data - The decoded data from the JWT authentication token
These blocks allow you to precisely control what data is sent to Cerbos for authorization decisions on a per-command basis.
The Command API (yes-command-api) provides an HTTP endpoint for executing commands as JSON batches. It is a standalone Rails engine that does not depend on the aggregate DSL — it works with any command class that follows one of the supported naming conventions.
Add the gem and mount the engine:
# Gemfile
gem 'yes-command-api'
# config/routes.rb
mount Yes::Command::Api::Engine => '/v1/commands'
Send a POST request with a JSON body containing a commands array:
{
"commands": [
{
"context": "Users",
"subject": "User",
"command": "ChangeName",
"data": {
"user_id": "47330036-7246-40b4-a3c7-7038df508774",
"name": "Jane Doe"
},
"metadata": {}
}
],
"channel": "my-notifications"
}
Each command requires context, subject, command, and data. The optional channel parameter controls which MessageBus channel receives notifications (defaults to the authenticated user's identity_id).
Set async=true or async=false as a query parameter to override the default processing mode (Yes::Core.configuration.process_commands_inline).
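A client could assemble such a request body in Ruby as sketched below. command_batch is a hypothetical helper that only builds the JSON string; sending it (including the Authorization header and the async query parameter) is left to whichever HTTP client you use.

```ruby
require 'json'

# Hypothetical helper for assembling a command batch request body.
def command_batch(commands, channel: nil)
  body = { commands: commands }
  # channel is optional; the API falls back to the identity_id when omitted.
  body[:channel] = channel if channel
  JSON.generate(body)
end

body = command_batch(
  [
    {
      context: 'Users',
      subject: 'User',
      command: 'ChangeName',
      data: { user_id: '47330036-7246-40b4-a3c7-7038df508774', name: 'Jane Doe' },
      metadata: {}
    }
  ],
  channel: 'my-notifications'
)
```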
The deserializer resolves command classes by trying three naming conventions in order:
| Priority | Convention | Class pattern | Typical use |
|---|---|---|---|
| 1 | Command Group | CommandGroups::<Command>::Command | Composed commands |
| 2 | V2 | <Context>::<Subject>::Commands::<Command>::Command | DSL-generated commands |
| 3 | V1 | <Context>::Commands::<Subject>::<Command> | Manually created commands |
The first matching constant wins. This means you can use the API with DSL-generated commands, manually created commands, or both.
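A "first matching constant wins" lookup can be sketched with Object.const_get. The two helper methods are hypothetical, and only the three class patterns are taken from the table above; the actual deserializer may resolve constants differently.

```ruby
# Hypothetical resolver; only the three class patterns come from the table.
def candidate_class_names(context, subject, command)
  [
    "CommandGroups::#{command}::Command",
    "#{context}::#{subject}::Commands::#{command}::Command",
    "#{context}::Commands::#{subject}::#{command}"
  ]
end

# Returns the first constant that exists, or nil when none matches.
def resolve_command_class(context, subject, command)
  candidate_class_names(context, subject, command).each do |name|
    begin
      return Object.const_get(name)
    rescue NameError
      next
    end
  end
  nil
end

# Example V2-style constant so the lookup has something to find.
module Users
  module User
    module Commands
      module ChangeName
        class Command; end
      end
    end
  end
end

resolve_command_class('Users', 'User', 'ChangeName') # finds the V2-style class
```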
When a request arrives, it passes through these stages:
- Authentication: the auth adapter verifies the JWT token
- Params validation: checks that each command hash contains context, subject, command, and data
- Deserialization: resolves class names and instantiates command objects
- Expansion: flattens command groups into individual commands
- Authorization: each command's authorizer is looked up and called with auth_data
- Validation: optional per-command validators are called
- Command bus: commands are dispatched (inline or via ActiveJob)
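The params validation stage can be illustrated with a short check for the four required keys. missing_command_keys is a hypothetical helper and says nothing about how the engine reports such errors to the client.

```ruby
REQUIRED_KEYS = %w[context subject command data].freeze

# Hypothetical check: maps the index of each invalid command to its missing keys.
def missing_command_keys(commands)
  commands.each_with_index.each_with_object({}) do |(cmd, index), errors|
    missing = REQUIRED_KEYS - cmd.keys.map(&:to_s)
    errors[index] = missing unless missing.empty?
  end
end

commands = [
  { 'context' => 'Users', 'subject' => 'User', 'command' => 'ChangeName', 'data' => {} },
  { 'context' => 'Users' }
]
missing_command_keys(commands) # reports subject, command, and data for the second entry
```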
You can create command classes manually and use them with the Command API. A complete command requires four parts: a command, a handler, an event, and an authorizer. The file structure follows a convention:
app/contexts/
billing/
invoice/
commands/
authorizer.rb # shared base authorizer (optional)
create/
command.rb # command definition
handler.rb # command handler
authorizer.rb # per-command authorizer
events/
created.rb # event definition
Defines the payload attributes and identifies the aggregate:
# app/contexts/billing/invoice/commands/create/command.rb
module Billing
module Invoice
module Commands
module Create
class Command < Yes::Core::Command
attribute :invoice_id, Yes::Core::Types::UUID
attribute :amount, Yes::Core::Types::Integer
attribute :currency, Yes::Core::Types::String
alias aggregate_id invoice_id
end
end
end
end
end
Processes the command and publishes the event. The handler inherits from Yes::Core::Commands::Stateless::Handler and declares which event to emit:
# app/contexts/billing/invoice/commands/create/handler.rb
module Billing
module Invoice
module Commands
module Create
class Handler < Yes::Core::Commands::Stateless::Handler
self.event_name = 'Created'
def call
# Add guard logic here, e.g.:
# no_change_transition('Already exists') if already_exists?
super # publishes the event
end
end
end
end
end
end
Defines the event schema for validation when writing to the event store:
# app/contexts/billing/invoice/events/created.rb
module Billing
module Invoice
module Events
class Created < Yes::Core::Event
def schema
Dry::Schema.Params do
required(:invoice_id).value(Yes::Core::Types::UUID)
required(:amount).value(:integer)
required(:currency).value(:string)
end
end
end
end
end
end
Controls who can execute the command. You can define a shared base authorizer for the aggregate and inherit from it:
# app/contexts/billing/invoice/commands/authorizer.rb
module Billing
  module Invoice
    module Commands
      class Authorizer < Yes::Core::Authorization::CommandAuthorizer
        def self.call(_command, auth_data)
          raise CommandNotAuthorized, 'Not allowed' unless auth_data[:identity_id].present?
        end
      end
    end
  end
end
# app/contexts/billing/invoice/commands/create/authorizer.rb
module Billing
  module Invoice
    module Commands
      module Create
        class Authorizer < Billing::Invoice::Commands::Authorizer
          # Inherits base authorization; add command-specific checks here
        end
      end
    end
  end
end
This command can then be executed via the API:
{
  "context": "Billing",
  "subject": "Invoice",
  "command": "Create",
  "data": {
    "invoice_id": "550e8400-e29b-41d4-a716-446655440000",
    "amount": 10000,
    "currency": "CHF"
  }
}
For performance and reliability, WebSocket-based notifications are the preferred way to inform frontends about command execution status. The Command API ships with two built-in notifiers and supports custom implementations.
Notifiers are configured globally and broadcast three event types per command batch:
| Event | When | Payload includes |
|---|---|---|
| batch_started | Before processing begins | batch_id, commands list |
| Per-command response | After each command completes | Command result or error |
| batch_finished | After all commands complete | batch_id, failed commands (if any) |
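On the receiving side, a client can fold these three event types into a single batch status. The sketch below is a hypothetical consumer in plain Ruby and not part of Yes. It assumes each message is a JSON object with a `type` field, as in the `batch_started` and `batch_finished` examples in this document. The exact shape of per-command responses is not specified here, so any message with another type (or none) is simply collected as a command response.

```ruby
require 'json'

# Hypothetical client-side tracker for one command batch; not part of Yes.
class BatchTracker
  attr_reader :status, :responses, :failed_commands

  def initialize
    @status = :pending
    @responses = []
    @failed_commands = []
  end

  # Accepts one raw JSON message and updates the tracked state.
  def handle(raw_message)
    message = JSON.parse(raw_message)

    case message['type']
    when 'batch_started'
      @status = :running
    when 'batch_finished'
      @failed_commands = message.fetch('failed_commands', [])
      @status = @failed_commands.empty? ? :succeeded : :failed
    else
      # Assumption: everything else is a per-command response.
      @responses << message
    end

    self
  end
end

tracker = BatchTracker.new
tracker.handle('{"type":"batch_started","batch_id":"abc-123","commands":[]}')
tracker.handle('{"batch_id":"abc-123","command":"Create"}')
tracker.handle('{"type":"batch_finished","batch_id":"abc-123","failed_commands":[]}')
puts tracker.status
```

Keeping the per-command branch as the fallback means the tracker does not break if the response payload carries fields this sketch does not know about.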
Register one or more notifier classes in the initializer:
Yes::Core.configure do |config|
  config.command_notifier_classes = [
    Yes::Command::Api::Commands::Notifiers::ActionCable,
    Yes::Command::Api::Commands::Notifiers::MessageBus
  ]
end
The channel parameter from the API request (or the authenticated user's identity_id as fallback) is passed to each notifier, so clients only receive notifications for their own commands.
The ActionCable notifier broadcasts notifications via ActionCable.server.broadcast. It is well suited for use with a dedicated WebSocket gateway service that connects to the same Redis backend:
config.command_notifier_classes = [Yes::Command::Api::Commands::Notifiers::ActionCable]
The frontend subscribes to the channel and receives JSON messages:
{ "type": "batch_started", "batch_id": "abc-123", "published_at": 1711540800, "commands": [...] }
{ "type": "batch_finished", "batch_id": "abc-123", "published_at": 1711540801, "failed_commands": [] }
The MessageBus notifier uses the MessageBus gem for long-polling or WebSocket delivery. Messages are scoped to the authenticated user via user_ids:
config.command_notifier_classes = [Yes::Command::Api::Commands::Notifiers::MessageBus]
The auth adapter's verify_token method is used by MessageBus to identify subscribers by their identity_id.
You can implement your own notifier by subclassing Yes::Core::Commands::Notifier:
class SlackNotifier < Yes::Core::Commands::Notifier
  def notify_batch_started(batch_id, transaction = nil, commands = nil)
    # ...
  end
  def notify_batch_finished(batch_id, transaction = nil, responses = nil)
    # ...
  end
  def notify_command_response(cmd_response)
    # ...
  end
end
The Read API (yes-read-api) provides an HTTP endpoint for querying read models with filtering, pagination, and authorization. Like the Command API, it is a standalone Rails engine that does not depend on the aggregate DSL; it works with any ActiveRecord model that has a matching serializer.
Add the gem and mount the engine:
# Gemfile
gem 'yes-read-api'
# config/routes.rb
mount Yes::Read::Api::Engine => '/queries'
Send a GET request with the read model name as the path and optional query parameters:
GET /queries/users?filters[ids]=1,2,3&order[name]=asc&page[number]=1&page[size]=20&include=company
- filters[<key>]: filter by attribute (handled by the model's filter class)
- order[<key>]: sort direction (asc or desc)
- page[number] and page[size]: pagination
- include: comma-separated list of associations to include in the response
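A client can assemble such a query string with Ruby's standard library. The sketch below is illustrative only: `read_api_query` is a hypothetical helper, not part of yes-read-api, and it only builds the bracketed keys listed above. `URI.encode_www_form` percent-encodes the brackets and commas, which standard query parsers decode back to the same keys and values.

```ruby
require 'uri'

# Hypothetical helper; builds the query string for a Read API GET request.
def read_api_query(filters: {}, order: {}, page: {}, includes: [])
  pairs = []
  # Array values become comma-separated lists, e.g. ids: [1, 2, 3] -> "1,2,3".
  filters.each { |key, value| pairs << ["filters[#{key}]", Array(value).join(',')] }
  order.each { |key, direction| pairs << ["order[#{key}]", direction.to_s] }
  page.each { |key, value| pairs << ["page[#{key}]", value.to_s] }
  pairs << ['include', includes.join(',')] unless includes.empty?
  URI.encode_www_form(pairs)
end

query = read_api_query(
  filters: { ids: [1, 2, 3] },
  order: { name: :asc },
  page: { number: 1, size: 20 },
  includes: %w[company]
)
puts "/queries/users?#{query}"
```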
Send a POST request for complex filtering with AND/OR logic:
{
  "model": "users",
  "filter_definition": {
    "type": "filter_set",
    "logical_operator": "and",
    "filters": [
      {
        "type": "filter",
        "attribute": "name",
        "operator": "is",
        "value": "Jane"
      },
      {
        "type": "filter",
        "attribute": "status",
        "operator": "is_not",
        "value": "archived"
      }
    ]
  },
  "order": { "name": "asc" },
  "page": { "number": 1, "size": 20 }
}
Filters are optional per-model classes that define available filter scopes. If no custom filter exists, the base Yes::Core::ReadModel::Filter is used.
module ReadModels
  module User
    class Filter < Yes::Core::ReadModel::Filter
      has_scope :name do |_controller, scope, value|
        scope.where(name: value)
      end
      has_scope :ids do |_controller, scope, value|
        scope.where(id: value.split(','))
      end
      private
      def read_model_class
        ::UserReadModel
      end
    end
  end
end
The Read API enforces two levels of authorization:
- Request authorizer: controls whether a user can query a given model at all. Looked up as ReadModels::<Model>::RequestAuthorizer.
module ReadModels
  module User
    class RequestAuthorizer
      def self.call(filter_options, auth_data)
        unless auth_data[:identity_id].present?
          raise Yes::Core::Authorization::ReadRequestAuthorizer::NotAuthorized, 'Not allowed'
        end
      end
    end
  end
end
- Read model authorizer: filters returned records based on what the user can access. Configured via Yes::Core::Authorization::ReadModelsAuthorizer.
Each read model requires a serializer class following the convention ReadModels::<Model>::Serializers::<Model>. The serializer receives auth_data and filter options, allowing it to customize the response based on the authenticated user.
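To make the naming conventions concrete, here is a minimal sketch of how a model name from the request path could be mapped to its serializer and request authorizer constants. This is not the Read API's actual lookup code: `read_model_constants` is a hypothetical helper, the empty classes are stand-ins so the example runs on its own, and the naive singularization (dropping a trailing "s") is an assumption that will not handle irregular plurals.

```ruby
# Stand-in classes following the documented naming convention.
module ReadModels
  module User
    class RequestAuthorizer; end

    module Serializers
      class User; end
    end
  end
end

# Hypothetical resolver; illustrates the convention only.
def read_model_constants(model_param)
  # "users" -> "User", "test_users" -> "TestUser" (naive singularization).
  model = model_param.sub(/s\z/, '').split('_').map(&:capitalize).join
  namespace = Object.const_get("ReadModels::#{model}")

  {
    serializer: namespace.const_get("Serializers::#{model}"),
    request_authorizer: namespace.const_get(:RequestAuthorizer)
  }
end

constants = read_model_constants('users')
puts constants[:serializer]
puts constants[:request_authorizer]
```

`const_get` raises `NameError` when a constant is missing, which makes a misnamed serializer fail loudly in this sketch instead of silently falling back to something else.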
Yes wraps PgEventstore subscriptions for processing events in real time.
# lib/tasks/eventstore.rb
subscriptions = Yes::Core::Subscriptions.new
subscriptions.subscribe_to_all(
  MyReadModel::Builder.new,
  { event_types: ['MyContext::SomethingHappened', 'MyContext::SomethingElseHappened'] }
)
subscriptions.start
Start subscriptions via the PgEventstore CLI:
bundle exec pg-eventstore subscriptions start -r ./lib/tasks/eventstore.rb
Configure a heartbeat URL for monitoring subscription health:
Yes::Core.configure do |config|
  config.subscriptions_heartbeat_url = ENV['SUBSCRIPTIONS_HEARTBEAT_URL']
  config.subscriptions_heartbeat_interval = 30 # seconds
end
Process managers coordinate commands across services via HTTP.
ServiceClient sends commands to another service's command API:
client = Yes::Core::ProcessManagers::ServiceClient.new('media')
# Resolves to MEDIA_SERVICE_URL env var or http://media-cluster-ip-service:3000
client.call(access_token: token, commands_data: [...], channel: '/notifications')
CommandRunner is the base class for process managers that publish commands to external services:
class MyProcessManager < Yes::Core::ProcessManagers::CommandRunner
  def call(event)
    publish(
      client_id: ENV['MY_CLIENT_ID'],
      client_secret: ENV['MY_CLIENT_SECRET'],
      commands_data: build_commands(event)
    )
  end
end
State reconstructs entity state from events for use in process managers:
class UserState < Yes::Core::ProcessManagers::State
  RELEVANT_EVENTS = ['Auth::UserCreated', 'Auth::UserNameChanged'].freeze
  attr_reader :name
  private
  def stream
    PgEventstore::Stream.new(context: 'Auth', stream_name: 'User', stream_id: @id)
  end
  def required_attributes
    [:name]
  end
  def apply_user_name_changed(event)
    @name = event.data['name']
  end
end
state = UserState.load(user_id)
state.valid? # true if all required_attributes are present
Yes::Core.configure do |config|
  # Command processing
  config.process_commands_inline = true # Process commands synchronously (default: true)
  config.command_notifier_classes = [] # Array of notifier classes for command batch notifications
  # Authentication
  config.auth_adapter = nil # Auth adapter instance (required for command/read APIs)
  # Cerbos Authorization
  config.cerbos_url = ENV['CERBOS_URL'] # Cerbos server URL (default from env var)
  config.cerbos_principal_data_builder = -> {} # Lambda to build Cerbos principal data for commands
  config.cerbos_read_principal_data_builder = nil # Lambda for read requests (falls back to above)
  config.cerbos_commands_authorizer_include_metadata = false
  config.cerbos_read_authorizer_include_metadata = false
  config.cerbos_read_authorizer_actions = %w[read]
  config.cerbos_read_authorizer_resource_id_prefix = 'read-'
  config.cerbos_read_authorizer_principal_anonymous_id = 'anonymous'
  config.super_admin_check = ->(_auth_data) { false }
  # Subscriptions
  config.subscriptions_heartbeat_url = nil # URL to ping for subscription health monitoring
  config.subscriptions_heartbeat_interval = 30 # Heartbeat interval in seconds
  # Observability
  config.otl_tracer = nil # OpenTelemetry tracer instance
  config.logger = Rails.logger # Logger instance
  # Error reporting
  config.error_reporter = nil # Callable for error reporting (e.g. Sentry)
end
After checking out the repo, run bin/setup to install dependencies.
Start PgEventstore using Docker:
docker compose up
Set up databases:
./bin/setup_db
Enter a development console (from a gem's spec/dummy directory):
bundle exec rails c
user = Test::User::Aggregate.new
user.change_name(name: "John Doe")
user.name # => "John Doe"
TestUser.last.name # => "John Doe"
The dummy app includes mounted command and read APIs for testing. Start the server from one of the gem dummy apps:
cd yes-core/spec/dummy
bundle exec rails s
The dummy app uses a simple Base64-encoded auth adapter for development. Generate a token:
require 'base64'
require 'json'
user_id = "47330036-7246-40b4-a3c7-7038df508774"
token = Base64.strict_encode64({ identity_id: user_id, user_id: user_id }.to_json)
Or from the command line (tr strips the line wraps that some base64 implementations insert into long output, so the token matches strict_encode64):
TOKEN=$(printf '%s' '{"identity_id":"47330036-7246-40b4-a3c7-7038df508774","user_id":"47330036-7246-40b4-a3c7-7038df508774"}' | base64 | tr -d '\n')
Execute a command with curl:
curl --location 'http://127.0.0.1:3000/commands' \
  --header 'Content-Type: application/json' \
  --header "Authorization: Bearer $TOKEN" \
  --data '{
    "commands": [{
      "subject": "User",
      "context": "Test",
      "command": "ChangeName",
      "data": {
        "user_id": "47330036-7246-40b4-a3c7-7038df508774",
        "name": "Judydoody Doodle"
      }
    }],
    "channel": "test-notifications"
  }'
Query the read models:
curl --location 'http://127.0.0.1:3000/queries/test_users' \
  --header 'Content-Type: application/json' \
  --header "Authorization: Bearer $TOKEN"
Each gem has its own test suite that runs in isolation with its own bundle context.
Run specs for a single gem:
rake yes_core:spec
rake yes_command_api:spec
rake yes_read_api:spec
Run specs for all gems:
rake spec
You can also run specs directly from within a gem directory:
cd yes-core && bundle exec rspec spec
Install the gem locally:
bundle exec rake install
Release a new version:
- Update the version in version.rb
- Run:
bundle exec rake release
This creates a git tag, pushes commits and tags, and pushes the gem to rubygems.org.
Bug reports and pull requests are welcome on GitHub at https://github.com/yousty/yes. See CONTRIBUTING.md for development setup and guidelines.
See CHANGELOG.md for a list of changes.
The gem is available as open source under the terms of the MIT License.