Navigation Menu

Skip to content

Commit

Permalink
Merged stashed changes: Fixed: Increasing number of Tick messages whe…
Browse files Browse the repository at this point in the history
…n waiting in an idle projection
  • Loading branch information
ysw committed Sep 18, 2012
1 parent 2b7e5f1 commit b96b277
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 38 deletions.
Expand Up @@ -33,6 +33,7 @@
using EventStore.Core.Tests.Bus.QueuedHandler.Helpers;
using EventStore.Core.TransactionLog.Checkpoint;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services;
using EventStore.Projections.Core.Services.Processing;
using NUnit.Framework;

Expand Down Expand Up @@ -78,7 +79,7 @@ protected CheckpointStrategy CreateCheckpointStrategy()
var result = new CheckpointStrategy.Builder();
result.FromAll();
result.AllEvents();
return result.Build();
return result.Build(ProjectionMode.Persistent);
}

protected static Event CreateEvent()
Expand Down
Expand Up @@ -27,6 +27,7 @@
//

using System;
using EventStore.Projections.Core.Services;
using EventStore.Projections.Core.Services.Processing;
using NUnit.Framework;

Expand Down Expand Up @@ -55,7 +56,7 @@ protected virtual void When()
_ef = null;
try
{
_ef = _builder.Build().EventFilter;
_ef = _builder.Build(ProjectionMode.Persistent).EventFilter;
}
catch (Exception ex)
{
Expand Down
Expand Up @@ -30,6 +30,7 @@
using EventStore.Core.Tests.Bus.Helpers;
using EventStore.Core.Tests.Fakes;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services;
using EventStore.Projections.Core.Services.Processing;
using NUnit.Framework;

Expand Down Expand Up @@ -81,7 +82,7 @@ private CheckpointStrategy CreateCheckpointStrategy()
result.FromAll();
result.AllEvents();
}
return result.Build();
return result.Build(ProjectionMode.Persistent);
}
}
}
Expand Up @@ -29,6 +29,7 @@
using System;
using EventStore.Core.Tests.Bus.Helpers;
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Services;
using EventStore.Projections.Core.Services.Processing;
using NUnit.Framework;

Expand Down Expand Up @@ -79,7 +80,7 @@ private CheckpointStrategy CreateCheckpointStrategy()
var result = new CheckpointStrategy.Builder();
result.FromAll();
result.AllEvents();
return result.Build();
return result.Build(ProjectionMode.Persistent);
}
}
}
Expand Up @@ -376,7 +376,7 @@ private void Start(IPublisher coreOutput, ProjectionStateHandlerFactory handlerF
stateHandler = handlerFactory.Create(HandlerType, Query, Console.WriteLine);
var checkpointStrategyBuilder = new CheckpointStrategy.Builder();
stateHandler.ConfigureSourceProcessingStrategy(checkpointStrategyBuilder);
checkpointStrategyBuilder.Validate(); // avoid future exceptions in coreprojection
checkpointStrategyBuilder.Validate(this.Mode); // avoid future exceptions in coreprojection
// constructor can fail if wrong source defintion
//TODO: revise it
_coreProjection = new CoreProjection(_name, _id, coreOutput, stateHandler, config, _logger);
Expand Down
Expand Up @@ -47,9 +47,9 @@ public class CheckpointStrategy

public class Builder : QuerySourceProcessingStrategyBuilder
{
public CheckpointStrategy Build()
public CheckpointStrategy Build(ProjectionMode mode)
{
base.Validate();
base.Validate(mode);
return new CheckpointStrategy(
_allStreams, ToSet(_categories), ToSet(_streams), _allEvents, ToSet(_events), _byStream);
}
Expand Down
Expand Up @@ -144,7 +144,7 @@ private readonly
_projectionCheckpointStreamId = ProjectionsStreamPrefix + _name + ProjectionCheckpointStreamSuffix;
var builder = new CheckpointStrategy.Builder();
_projectionStateHandler.ConfigureSourceProcessingStrategy(builder);
_checkpointStrategy = builder.Build();
_checkpointStrategy = builder.Build(_projectionConfig.Mode);
_eventFilter = _checkpointStrategy.EventFilter;
_lastProcessedEventPosition = new PositionTracker(_checkpointStrategy.PositionTagger);
_partitionStateCache = new PartitionStateCache();
Expand Down
Expand Up @@ -82,7 +82,7 @@ protected HashSet<string> ToSet(IEnumerable<string> list)
return new HashSet<string>(list);
}

public void Validate()
public void Validate(ProjectionMode mode)
{
if (!_allStreams && _categories == null && _streams == null)
throw new InvalidOperationException("None of streams and categories are included");
Expand All @@ -94,6 +94,8 @@ public void Validate()
throw new InvalidOperationException("Both FromAll and specific categories/streams cannot be set");
if (_allEvents && _events != null)
throw new InvalidOperationException("Both AllEvents and specific event filters cannot be set");
if (_byStream && mode < ProjectionMode.Persistent)
throw new InvalidOperationException("Partitioned (foreachStream) projections require Persistent mode");
}
}
}
Expand Up @@ -78,21 +78,7 @@ public override void Resume()
}
_paused = false;
_pauseRequested = false;
RequestEvents();
}

private void RequestEvents()
{
if (_disposed) throw new InvalidOperationException("Disposed");
if (_eventsRequested)
throw new InvalidOperationException("Read operation is already in progress");
if (_pauseRequested || _paused)
throw new InvalidOperationException("Paused or pause requested");
_eventsRequested = true;
_publisher.Publish(
new ClientMessage.ReadEventsForward(
_distibutionPointCorrelationId, new SendToThisEnvelope(this), _streamName, _fromSequenceNumber,
_maxReadCount, _resolveLinkTos));
RequestEvents(delay: false);
}

public override void Pause()
Expand Down Expand Up @@ -121,20 +107,14 @@ public override void Handle(ClientMessage.ReadEventsForwardCompleted message)
{
case RangeReadResult.NoStream:
DeliverLastCommitPosition(message.LastCommitPosition.Value); // allow joining heading distribution
_publisher.Publish(
TimerMessage.Schedule.Create(
TimeSpan.FromMilliseconds(250), new PublishEnvelope(_publisher), CreateTickMessage()));
RequestEvents(delay: true);

break;
case RangeReadResult.Success:
if (message.Events.Length == 0)
{
// the end
DeliverLastCommitPosition(message.LastCommitPosition.Value);
// allow joining heading distribution
_publisher.Publish(
TimerMessage.Schedule.Create(
TimeSpan.FromMilliseconds(250), new PublishEnvelope(_publisher), CreateTickMessage()));
}
else
{
Expand All @@ -148,20 +128,17 @@ public override void Handle(ClientMessage.ReadEventsForwardCompleted message)
if (_pauseRequested)
_paused = true;
else
//TODO: we may publish this message somewhere 10 events before the end of the chunk
_publisher.Publish(CreateTickMessage());
if (message.Events.Length == 0)
RequestEvents(delay: true);
else
_publisher.Publish(CreateTickMessage());
break;
default:
throw new NotSupportedException(
string.Format("ReadEvents result code was not recognized. Code: {0}", message.Result));
}
}

private ProjectionMessage.CoreService.Tick CreateTickMessage()
{
return new ProjectionMessage.CoreService.Tick(() => { if (!_paused && !_disposed) RequestEvents(); });
}

public override void Handle(ClientMessage.ReadEventsFromTFCompleted message)
{
throw new NotImplementedException();
Expand All @@ -172,6 +149,32 @@ public override void Dispose()
_disposed = true;
}

private void RequestEvents(bool delay)
{
if (_disposed) throw new InvalidOperationException("Disposed");
if (_eventsRequested)
throw new InvalidOperationException("Read operation is already in progress");
if (_pauseRequested || _paused)
throw new InvalidOperationException("Paused or pause requested");
_eventsRequested = true;


var readEventsForward = new ClientMessage.ReadEventsForward(
_distibutionPointCorrelationId, new SendToThisEnvelope(this), _streamName, _fromSequenceNumber,
_maxReadCount, _resolveLinkTos);
if (delay)
_publisher.Publish(
TimerMessage.Schedule.Create(
TimeSpan.FromMilliseconds(250), new PublishEnvelope(_publisher), readEventsForward));
else
_publisher.Publish(readEventsForward);
}

private ProjectionMessage.CoreService.Tick CreateTickMessage()
{
return new ProjectionMessage.CoreService.Tick(() => { if (!_paused && !_disposed) RequestEvents(delay: false); });
}

private void DeliverLastCommitPosition(long lastCommitPosition)
{
_publisher.Publish(
Expand Down

0 comments on commit b96b277

Please sign in to comment.