Skip to content

Commit

Permalink
Added exposed State-concept
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Apr 28, 2024
1 parent 2aa4b01 commit f60914c
Show file tree
Hide file tree
Showing 12 changed files with 342 additions and 58 deletions.
1 change: 1 addition & 0 deletions Cleipnir.Flows.Tests/Cleipnir.Flows.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

<ItemGroup>
<ProjectReference Include="..\Cleipnir.Flows\Cleipnir.Flows.csproj" />
<ProjectReference Include="..\SourceGeneration\Cleipnir.Flows.SourceGenerator\Cleipnir.Flows.SourceGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
</ItemGroup>

</Project>
16 changes: 8 additions & 8 deletions Cleipnir.Flows.Tests/Flows/FlowsWithResultTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@ public class FlowsWithResultTests
public async Task SimpleFlowCompletesSuccessfully()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<SimpleFlow>();
serviceCollection.AddTransient<SimpleFuncFlow>();

var flowStore = new InMemoryFunctionStore();
var flowsContainer = new FlowsContainer(
flowStore,
serviceCollection.BuildServiceProvider()
);

var flows = flowsContainer.CreateFlows<SimpleFlow, string, int>(nameof(SimpleFlow));
var flows = flowsContainer.CreateFlows<SimpleFuncFlow, string, int>(nameof(SimpleFuncFlow));
var result = await flows.Run("someInstanceId", "someParameter");
result.ShouldBe(1);

SimpleFlow.InstanceId.ShouldBe("someInstanceId");
SimpleFlow.ExecutedWithParameter.ShouldBe("someParameter");
SimpleFuncFlow.InstanceId.ShouldBe("someInstanceId");
SimpleFuncFlow.ExecutedWithParameter.ShouldBe("someParameter");

var controlPanel = await flows.ControlPanel(instanceId: "someInstanceId");
controlPanel.ShouldNotBeNull();
controlPanel.Status.ShouldBe(Status.Succeeded);
controlPanel.Result.ShouldBe(1);
}

private class SimpleFlow : Flow<string, int>
public class SimpleFuncFlow : Flow<string, int>
{
public static string? ExecutedWithParameter { get; set; }
public static string? InstanceId { get; set; }
Expand All @@ -53,15 +53,15 @@ public override async Task<int> Run(string param)
public async Task EventDrivenFlowCompletesSuccessfully()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<MessageDrivenFlow>();
serviceCollection.AddTransient<MessageDrivenFuncFlow>();

var flowStore = new InMemoryFunctionStore();
var flowsContainer = new FlowsContainer(
flowStore,
serviceCollection.BuildServiceProvider()
);

var flows = flowsContainer.CreateFlows<MessageDrivenFlow, string, int>(nameof(MessageDrivenFlow));
var flows = flowsContainer.CreateFlows<MessageDrivenFuncFlow, string, int>(nameof(MessageDrivenFuncFlow));

await flows.Schedule("someInstanceId", "someParameter");

Expand All @@ -81,7 +81,7 @@ public async Task EventDrivenFlowCompletesSuccessfully()
controlPanel.Result.ShouldBe(2);
}

private class MessageDrivenFlow : Flow<string, int>
public class MessageDrivenFuncFlow : Flow<string, int>
{
public override async Task<int> Run(string param)
{
Expand Down
91 changes: 91 additions & 0 deletions Cleipnir.Flows.Tests/Flows/FlowsWithStateTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Storage;
using Microsoft.Extensions.DependencyInjection;
using Shouldly;

namespace Cleipnir.Flows.Tests.Flows;

[TestClass]
public class FlowsWithStateTests
{
[TestMethod]
public async Task ActionFlowWithStateCanBeFetchedAfterExecution()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<ActionFlowWithState>();

var flowStore = new InMemoryFunctionStore();
var flowsContainer = new FlowsContainer(
flowStore,
serviceCollection.BuildServiceProvider()
);

var flows = new ActionFlowWithStates(flowsContainer);
await flows.Run("someInstanceId", "someParameter");

var state = await flows.GetState("someInstanceId");
state.ShouldNotBeNull();
state.Value.ShouldBe("someParameter");

var controlPanel = await flows.ControlPanel(instanceId: "someInstanceId");
controlPanel.ShouldNotBeNull();
controlPanel.Status.ShouldBe(Status.Succeeded);
}

public class ActionFlowWithState : Flow<string>, IHaveState<ActionFlowWithState.FlowState>
{
public required FlowState State { get; init; }

public override Task Run(string param)
{
State.Value = param;
return Task.CompletedTask;
}

public class FlowState : WorkflowState
{
public string Value { get; set; } = "";
}
}

[TestMethod]
public async Task FuncFlowWithStateCanBeFetchedAfterExecution()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<FuncFlowWithState>();

var flowStore = new InMemoryFunctionStore();
var flowsContainer = new FlowsContainer(
flowStore,
serviceCollection.BuildServiceProvider()
);

var flows = new FuncFlowWithStates(flowsContainer);
await flows.Run("someInstanceId", "someParameter");

var state = await flows.GetState("someInstanceId");
state.ShouldNotBeNull();
state.Value.ShouldBe("someParameter");

var controlPanel = await flows.ControlPanel(instanceId: "someInstanceId");
controlPanel.ShouldNotBeNull();
controlPanel.Result.ShouldBe("someParameter");
controlPanel.Status.ShouldBe(Status.Succeeded);
}

public class FuncFlowWithState : Flow<string, string>, IHaveState<FuncFlowWithState.FlowState>
{
public required FlowState State { get; init; }

public override Task<string> Run(string param)
{
State.Value = param;
return Task.FromResult(param);
}

public class FlowState : WorkflowState
{
public string Value { get; set; } = "";
}
}
}
16 changes: 8 additions & 8 deletions Cleipnir.Flows.Tests/Flows/UnitFlowsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,26 @@ public class UnitFlowsTests
public async Task SimpleFlowCompletesSuccessfully()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<SimpleFlow>();
serviceCollection.AddTransient<SimpleUnitFlow>();

var flowStore = new InMemoryFunctionStore();
var flowsContainer = new FlowsContainer(
flowStore,
serviceCollection.BuildServiceProvider()
);

var flows = flowsContainer.CreateFlows<SimpleFlow, string>(nameof(SimpleFlow));
var flows = flowsContainer.CreateFlows<SimpleUnitFlow, string>(nameof(SimpleUnitFlow));
await flows.Run("someInstanceId", "someParameter");

SimpleFlow.InstanceId.ShouldBe("someInstanceId");
SimpleFlow.ExecutedWithParameter.ShouldBe("someParameter");
SimpleUnitFlow.InstanceId.ShouldBe("someInstanceId");
SimpleUnitFlow.ExecutedWithParameter.ShouldBe("someParameter");

var controlPanel = await flows.ControlPanel(instanceId: "someInstanceId");
controlPanel.ShouldNotBeNull();
controlPanel.Status.ShouldBe(Status.Succeeded);
}

private class SimpleFlow : Flow<string>
public class SimpleUnitFlow : Flow<string>
{
public static string? ExecutedWithParameter { get; set; }
public static string? InstanceId { get; set; }
Expand All @@ -49,15 +49,15 @@ public override async Task Run(string param)
public async Task EventDrivenFlowCompletesSuccessfully()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<EventDrivenFlow>();
serviceCollection.AddTransient<EventDrivenUnitFlow>();

var flowStore = new InMemoryFunctionStore();
var flowsContainer = new FlowsContainer(
flowStore,
serviceCollection.BuildServiceProvider()
);

var flows = flowsContainer.CreateFlows<EventDrivenFlow, string>(nameof(EventDrivenFlow));
var flows = flowsContainer.CreateFlows<EventDrivenUnitFlow, string>(nameof(EventDrivenUnitFlow));

await flows.Schedule("someInstanceId", "someParameter");

Expand All @@ -76,7 +76,7 @@ public async Task EventDrivenFlowCompletesSuccessfully()
controlPanel.Status.ShouldBe(Status.Succeeded);
}

private class EventDrivenFlow : Flow<string>
public class EventDrivenUnitFlow : Flow<string>
{
public override async Task Run(string param)
{
Expand Down
7 changes: 7 additions & 0 deletions Cleipnir.Flows.sln
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "SourceGeneration", "SourceG
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.SourceGenerator", "SourceGeneration\Cleipnir.Flows.SourceGenerator\Cleipnir.Flows.SourceGenerator.csproj", "{D2D7CD1B-DADE-4A6F-A7A3-BFD5F4806A63}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.SourceGeneratorDebugger", "SourceGeneration\Cleipnir.Flows.SourceGeneratorDebugger\Cleipnir.Flows.SourceGeneratorDebugger.csproj", "{19BAF8B6-41E4-45F5-B2E2-97B60A9B6372}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -94,6 +96,10 @@ Global
{D2D7CD1B-DADE-4A6F-A7A3-BFD5F4806A63}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D2D7CD1B-DADE-4A6F-A7A3-BFD5F4806A63}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D2D7CD1B-DADE-4A6F-A7A3-BFD5F4806A63}.Release|Any CPU.Build.0 = Release|Any CPU
{19BAF8B6-41E4-45F5-B2E2-97B60A9B6372}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{19BAF8B6-41E4-45F5-B2E2-97B60A9B6372}.Debug|Any CPU.Build.0 = Debug|Any CPU
{19BAF8B6-41E4-45F5-B2E2-97B60A9B6372}.Release|Any CPU.ActiveCfg = Release|Any CPU
{19BAF8B6-41E4-45F5-B2E2-97B60A9B6372}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{BC470FDE-839E-4B2C-8C56-8B20DEEF2CCD} = {EAD350F1-8EF2-48AC-98EE-A5E739F15787}
Expand All @@ -106,5 +112,6 @@ Global
{A00DF355-94EB-4B10-8607-8FED4D4D5DFA} = {EAD350F1-8EF2-48AC-98EE-A5E739F15787}
{35ED2C34-C0EE-4FA0-9EF8-33A622BC0F46} = {7204001F-410C-41DB-9892-696E06D0703D}
{D2D7CD1B-DADE-4A6F-A7A3-BFD5F4806A63} = {5E8A4831-5FF4-4BEC-9334-847943DA9517}
{19BAF8B6-41E4-45F5-B2E2-97B60A9B6372} = {5E8A4831-5FF4-4BEC-9334-847943DA9517}
EndGlobalSection
EndGlobal
66 changes: 66 additions & 0 deletions Cleipnir.Flows/Flows.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Cleipnir.Flows.CrossCutting;
Expand All @@ -20,11 +21,14 @@ public class Flows<TFlow, TParam>

private readonly Next<TFlow, TParam, Unit> _next;
private readonly Action<TFlow, Workflow> _workflowSetter;
private readonly Action<TFlow, States> _stateSetter;

public Flows(string flowName, FlowsContainer flowsContainer)
{
_flowsContainer = flowsContainer;
_workflowSetter = CreateWorkflowSetter();
_stateSetter = CreateStateSetter();

_next = CreateCallChain(flowsContainer.ServiceProvider);

_registration = flowsContainer.FunctionRegistry.RegisterFunc<TParam, Unit>(
Expand All @@ -43,6 +47,7 @@ public Flows(string flowName, FlowsContainer flowsContainer)
var flow = scope.ServiceProvider.GetRequiredService<TFlow>();
_workflowSetter(flow, workflow);
_stateSetter(flow, workflow.States);
await flow.Run(param);
Expand All @@ -57,6 +62,9 @@ public Flows(string flowName, FlowsContainer flowsContainer)
return controlPanel;
}

protected Task<TState?> GetState<TState>(string functionInstanceId) where TState : WorkflowState, new()
=> _registration.GetState<TState>(functionInstanceId);

public MessageWriter MessageWriter(string instanceId)
=> _registration.MessageWriters.For(instanceId);

Expand All @@ -80,6 +88,32 @@ TimeSpan delay
private async Task<Result<Unit>> PrepareAndRunFlow(TParam param, Workflow workflow)
=> await _next(param, workflow);

private Action<TFlow, States> CreateStateSetter()
{
var iHaveStateType = typeof(TFlow)
.GetInterfaces()
.SingleOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHaveState<>));

if (iHaveStateType == null)
return (_, _) => { };

var stateType = iHaveStateType.GenericTypeArguments[0];

//fetch state
var methodInfo = typeof(States)
.GetMethods()
.Single(m => m.Name == nameof(States.CreateOrGet) && !m.GetParameters().Any());

var genericMethodInfo = methodInfo.MakeGenericMethod(stateType);
var statePropertyInfo = iHaveStateType.GetProperty("State");

return (flow, states) =>
{
var state = genericMethodInfo.Invoke(states, parameters: null);
statePropertyInfo!.SetValue(flow, state);
};
}

private Action<TFlow, Workflow> CreateWorkflowSetter()
{
ParameterExpression flowParam = Expression.Parameter(typeof(TFlow), "flow");
Expand Down Expand Up @@ -111,11 +145,13 @@ public class Flows<TFlow, TParam, TResult>
private readonly Next<TFlow, TParam, TResult> _next;

private readonly Action<TFlow, Workflow> _workflowSetter;
private readonly Action<TFlow, States> _stateSetter;

public Flows(string flowName, FlowsContainer flowsContainer)
{
_flowsContainer = flowsContainer;
_workflowSetter = CreateWorkflowSetter();
_stateSetter = CreateStateSetter();
_next = CreateCallChain(flowsContainer.ServiceProvider);

_registration = flowsContainer.FunctionRegistry.RegisterFunc<TParam, TResult>(
Expand All @@ -134,6 +170,7 @@ public Flows(string flowName, FlowsContainer flowsContainer)
var flow = scope.ServiceProvider.GetRequiredService<TFlow>();
_workflowSetter(flow, workflow);
_stateSetter(flow, workflow.States);
var result = await flow.Run(param);
return result;
Expand Down Expand Up @@ -163,6 +200,9 @@ DateTime delayUntil
TParam param,
TimeSpan delay
) => _registration.ScheduleIn(functionInstanceId, param, delay);

protected Task<TState?> GetState<TState>(string functionInstanceId) where TState : WorkflowState, new()
=> _registration.GetState<TState>(functionInstanceId);

private async Task<Result<TResult>> PrepareAndRunFlow(TParam param, Workflow workflow)
=> await _next(param, workflow);
Expand All @@ -186,4 +226,30 @@ private async Task<Result<TResult>> PrepareAndRunFlow(TParam param, Workflow wor
var setter = lambdaExpr.Compile();
return setter;
}

private Action<TFlow, States> CreateStateSetter()
{
var iHaveStateType = typeof(TFlow)
.GetInterfaces()
.SingleOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHaveState<>));

if (iHaveStateType == null)
return (_, _) => { };

var stateType = iHaveStateType.GenericTypeArguments[0];

//fetch state
var methodInfo = typeof(States)
.GetMethods()
.Single(m => m.Name == nameof(States.CreateOrGet) && !m.GetParameters().Any());

var genericMethodInfo = methodInfo.MakeGenericMethod(stateType);
var statePropertyInfo = iHaveStateType.GetProperty("State");

return (flow, states) =>
{
var state = genericMethodInfo.Invoke(states, parameters: null);
statePropertyInfo!.SetValue(flow, state);
};
}
}
8 changes: 8 additions & 0 deletions Cleipnir.Flows/IHaveState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Cleipnir.ResilientFunctions.Domain;

namespace Cleipnir.Flows;

public interface IHaveState<TState> where TState : WorkflowState, new()
{
public TState State { get; init; }
}

0 comments on commit f60914c

Please sign in to comment.