Skip to content

Commit

Permalink
Lock event store per stream (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomas Lycken committed Oct 25, 2017
1 parent 78cdcb0 commit 50aedad
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ public EventStoreFixture()
EventRegistry = new AssemblyEventRegistry(typeof(TEvent), type => type.Name, type => !type.Name.StartsWith("<>"));
EventSerializer = new DefaultEventSerializer<TStreamId, TEvent, TPersistedEvent>(EventRegistry);
EventFactory = new DefaultEventFactory<TStreamId, TEvent>();
WriteLock = new WriteLock();
WriteLock = new WriteLock<TStreamId>();
}

public IEventRegistry EventRegistry { get; protected set; }
public IEventSerializer<TEvent, TPersistedEvent> EventSerializer { get; protected set; }
public IEventFactory<TStreamId, TEvent> EventFactory { get; protected set; }
public IWriteLock WriteLock { get; protected set; }
public IWriteLock<TStreamId> WriteLock { get; protected set; }

public EntityFrameworkEventStore<TId, TStreamId, TEventStoreContext, TEvent, TEventMetadata, TPersistedEvent> BuildEventStore<TEventStoreContext>(TEventStoreContext dbContext)
where TEventStoreContext : DbContext, IEventDbContext<TPersistedEvent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ public class EntityFrameworkEventStore<TId, TStreamId, TContext, TEvent, TEventM
{
private readonly TContext context;
private readonly IEventFactory<TStreamId, TEvent> _eventFactory;
private readonly IWriteLock _writeLock;
private readonly IWriteLock<TStreamId> _writeLock;
private readonly IEventSerializer<TEvent, TPersistedEvent> _serializer;

public EntityFrameworkEventStore(TContext context, IEventFactory<TStreamId, TEvent> eventFactory, IWriteLock writeLock, IEventSerializer<TEvent, TPersistedEvent> serializer)
public EntityFrameworkEventStore(TContext context, IEventFactory<TStreamId, TEvent> eventFactory, IWriteLock<TStreamId> writeLock, IEventSerializer<TEvent, TPersistedEvent> serializer)
{
this.context = context;
_eventFactory = eventFactory;
Expand Down Expand Up @@ -49,7 +49,7 @@ public Task Append(TStreamId streamId, long versionBefore, object payload)

public async Task Append(TStreamId streamId, long versionBefore, IEnumerable<object> payloads)
{
using (await _writeLock.Aquire())
using (await _writeLock.Aquire(streamId))
{
var highestVersionNumber = await context.Events
.Where(e => e.StreamId.Equals(streamId))
Expand Down
4 changes: 2 additions & 2 deletions src/RdbmsEventStore/IWriteLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

namespace RdbmsEventStore
{
public interface IWriteLock
public interface IWriteLock<in TStreamId>
{
AwaitableDisposable<IDisposable> Aquire();
AwaitableDisposable<IDisposable> Aquire(TStreamId streamId);
}
}
7 changes: 4 additions & 3 deletions src/RdbmsEventStore/WriteLock.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Collections.Concurrent;
using Nito.AsyncEx;

namespace RdbmsEventStore
{
public class WriteLock : IWriteLock
public class WriteLock<TStreamId> : IWriteLock<TStreamId>
{
private readonly AsyncLock _mutex = new AsyncLock();
private readonly ConcurrentDictionary<TStreamId, AsyncLock> _mutexes = new ConcurrentDictionary<TStreamId, AsyncLock>();

public AwaitableDisposable<IDisposable> Aquire() => _mutex.LockAsync();
public AwaitableDisposable<IDisposable> Aquire(TStreamId streamId) => _mutexes.GetOrAdd(streamId, id => new AsyncLock()).LockAsync();
}
}

0 comments on commit 50aedad

Please sign in to comment.