Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Simplified a couple of things. Made the event store a single table.
  • Loading branch information
jletroui committed Nov 6, 2011
1 parent 6a2664d commit 0568dc1
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 33 deletions.
6 changes: 5 additions & 1 deletion DDDPart6/Infrastructure.Impl/ContainerCommandBus.cs
Expand Up @@ -11,10 +11,12 @@ namespace Infrastructure.Impl
public class ContainerCommandBus : ICommandBus
{
private IContainer container;
private IPersistenceManager persistenceManager;

public ContainerCommandBus(IContainer container)
public ContainerCommandBus(IContainer container, IPersistenceManager persistenceManager)
{
this.container = container;
this.persistenceManager = persistenceManager;
}

public void Send<T>(T cmd) where T : ICommand
Expand All @@ -27,6 +29,8 @@ public ContainerCommandBus(IContainer container)
try
{
handler.Handle(cmd);
// Trigger persistence and concurrency checking.
persistenceManager.Commit();
handled = true;
}
catch (ConcurrencyException)
Expand Down
37 changes: 12 additions & 25 deletions DDDPart6/Infrastructure.Impl/SqlServerEventStore.cs
Expand Up @@ -20,40 +20,27 @@ public SqlServerEventStore(IPersistenceManager persistenceManager)
this.persistenceManager = persistenceManager;
}

private const int UniqueKeyViolation = 2627;

public void PersistUncommitedEvents(IAggregateRoot aggregate)
{
persistenceManager.ExecuteNonQuery(
"INSERT INTO [Events] (Id, aggregate_id, version, data) VALUES (@Id, @AggregateId, @Version, @Data)",
new
{
Id = Guid.NewGuid(),
Version = aggregate.Version + 1,
AggregateId = aggregate.Id,
Data = Serialize(aggregate.UncommitedEvents)
});

if (aggregate.Version == 0)
try
{
persistenceManager.ExecuteNonQuery(
"INSERT INTO [Aggregates] (aggregate_id, version) VALUES (@AggregateId, 1)",
"INSERT INTO [Events] (Id, aggregate_id, version, data) VALUES (@Id, @AggregateId, @Version, @Data)",
new
{
AggregateId = aggregate.Id
Id = Guid.NewGuid(),
Version = aggregate.Version + 1,
AggregateId = aggregate.Id,
Data = Serialize(aggregate.UncommitedEvents)
});
}
else
catch (SqlException se)
{
var rowCount = persistenceManager.ExecuteNonQuery(
"UPDATE [Aggregates] SET Version = @Version WHERE aggregate_id = @AggregateId AND Version = @Expected",
new
{
AggregateId = aggregate.Id,
Version = aggregate.Version + 1,
Expected = aggregate.Version
});

// The version evolved since we performed the command, so we have to retry the command.
if (rowCount != 1) throw new ConcurrencyException();
// Thanks Jonathan Oliver's CQRS Event Store
if (se.Number == UniqueKeyViolation) throw new ConcurrencyException();
throw;
}
}

Expand Down
Expand Up @@ -96,6 +96,7 @@ public void Commit()
catch (ConcurrencyException)
{
tx.Rollback();
context[TRANSACTION_KEY] = null;
throw;
}

Expand Down
2 changes: 1 addition & 1 deletion DDDPart6/Presentation/Views/Shared/Site.Master
Expand Up @@ -16,7 +16,7 @@

<div id="header">
<div id="title">
<h1>Migrating to Event Sourcing - Part 1</h1>
<h1>Migrating to Event Sourcing - Part 6-7-8</h1>
</div>

<div id="menucontainer">
Expand Down
2 changes: 1 addition & 1 deletion DDDPart6/Presentation/Web.config
Expand Up @@ -4,7 +4,7 @@
<add name="maindb" connectionString="Data Source=localhost\SQLEXPRESS;Database=DDDPart6;Integrated Security=SSPI;"/>
</connectionStrings>
<appSettings>
<add key="ReCreateSchemaAtStartup" value="false"/>
<add key="ReCreateSchemaAtStartup" value="true"/>
</appSettings>
<system.web>
<compilation debug="true" targetFramework="4.0">
Expand Down
12 changes: 7 additions & 5 deletions DDDPart6/Presentation/schema.sql
@@ -1,21 +1,23 @@
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[failedClasses_to_Student]') AND OBJECTPROPERTY(id, N'ISFOREIGNKEY') = 1) ALTER TABLE [dbo].[failedClasses] DROP CONSTRAINT failedClasses_to_Student
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[Events]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[Events]

IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[failedClasses_to_Student]') AND OBJECTPROPERTY(id, N'ISFOREIGNKEY') = 1) ALTER TABLE [dbo].[failedClasses] DROP CONSTRAINT failedClasses_to_Student
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[passedClasses_to_Student]') AND OBJECTPROPERTY(id, N'ISFOREIGNKEY') = 1) ALTER TABLE [dbo].[passedClasses] DROP CONSTRAINT passedClasses_to_Student
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[Registration_to_Student]') AND OBJECTPROPERTY(id, N'ISFOREIGNKEY') = 1) ALTER TABLE [dbo].[Registration] DROP CONSTRAINT Registration_to_Student
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[Events]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[Events]
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[Aggregates]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[Aggregates]

IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[Class]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[Class]
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[failedClasses]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[failedClasses]
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[passedClasses]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[passedClasses]
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[Student]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[Student]
IF EXISTS (SELECT * FROM dbo.sysobjects WHERE id = OBJECT_ID(N'[dbo].[Registration]') AND OBJECTPROPERTY(id, N'IsUserTable') = 1) drop table [dbo].[Registration]

CREATE TABLE [dbo].[Events] ( [Id] uniqueidentifier NOT NULL, aggregate_id uniqueidentifier NOT NULL, [version] int NOT NULL, [data] varbinary(4000) NOT NULL, PRIMARY KEY CLUSTERED ([Id]) ON [PRIMARY]) ON [PRIMARY]
CREATE TABLE [dbo].[Aggregates] ( [aggregate_id] uniqueidentifier NOT NULL, [version] int NOT NULL, PRIMARY KEY CLUSTERED ([aggregate_id]) ON [PRIMARY]) ON [PRIMARY]
CREATE TABLE [dbo].[Events] ( [Id] uniqueidentifier NOT NULL, aggregate_id uniqueidentifier NOT NULL, [version] int NOT NULL, [data] varbinary(4000) NOT NULL, PRIMARY KEY ([Id]) ON [PRIMARY], UNIQUE ([aggregate_id], [version]) ON [PRIMARY]) ON [PRIMARY]

CREATE TABLE [dbo].[Class]([Id] [uniqueidentifier] NOT NULL,[version] [datetime] NOT NULL,[name] [nvarchar](255) NULL,[credits] [int] NULL, PRIMARY KEY CLUSTERED ([Id]) ON [PRIMARY]) ON [PRIMARY]
CREATE TABLE [dbo].[failedClasses]([StudentFailed_Id] [uniqueidentifier] NOT NULL,[elt] [uniqueidentifier] NULL) ON [PRIMARY]
CREATE TABLE [dbo].[Student]([Id] [uniqueidentifier] NOT NULL,[version] [datetime] NOT NULL,[firstName] [nvarchar](255) NULL,[lastName] [nvarchar](255) NULL,[hasGraduated] [bit] NULL,[credits] [int] NULL,[registrationSequence] [int] NULL, PRIMARY KEY CLUSTERED ([Id] ASC) ON [PRIMARY]) ON [PRIMARY]
CREATE TABLE [dbo].[passedClasses]([StudentPassed_Id] [uniqueidentifier] NOT NULL, [elt] [uniqueidentifier] NULL) ON [PRIMARY]
CREATE TABLE [dbo].[Registration]([aggregateRoot] [uniqueidentifier] NOT NULL,[Id] [int] NOT NULL,[version] [datetime] NOT NULL,[classId] [uniqueidentifier] NULL,[classCredits] [int] NULL, PRIMARY KEY CLUSTERED ([aggregateRoot] ASC, [Id] ASC) ON [PRIMARY]) ON [PRIMARY]

ALTER TABLE [dbo].[failedClasses] ADD CONSTRAINT [failedClasses_to_Student] FOREIGN KEY([StudentFailed_Id]) REFERENCES [dbo].[Student] ([Id])
ALTER TABLE [dbo].[passedClasses] ADD CONSTRAINT [passedClasses_to_Student] FOREIGN KEY([StudentPassed_Id]) REFERENCES [dbo].[Student] ([Id])
ALTER TABLE [dbo].[Registration] WITH CHECK ADD CONSTRAINT [Registration_to_Student] FOREIGN KEY([aggregateRoot]) REFERENCES [dbo].[Student] ([Id])

0 comments on commit 0568dc1

Please sign in to comment.