-
Notifications
You must be signed in to change notification settings - Fork 94
/
Fuse.cs
195 lines (174 loc) · 9.04 KB
/
Fuse.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
namespace Microsoft.Psi.Components
{
using System;
using System.Collections.Generic;
using System.Linq;
/// <summary>
/// Component that fuses multiple streams based on a specified interpolator.
/// </summary>
/// <remarks>
/// Messages received on the primary stream are queued. For the oldest queued primary message, each selected
/// secondary stream is interpolated at the primary message's originating time. Processing pauses while any
/// selected secondary reports insufficient data. Once every selected secondary has reported a definitive
/// result, an output is posted if all of them produced a value; otherwise the primary message is dropped.
/// </remarks>
/// <typeparam name="TPrimary">The type of the messages on the primary stream.</typeparam>
/// <typeparam name="TSecondary">The type of the messages on the secondary streams.</typeparam>
/// <typeparam name="TInterpolation">The type of the interpolation result on the secondary stream.</typeparam>
/// <typeparam name="TOut">The type of output message.</typeparam>
public class Fuse<TPrimary, TSecondary, TInterpolation, TOut> : IProducer<TOut>
{
    private readonly Pipeline pipeline;
    private readonly string name;
    private readonly Queue<Message<TPrimary>> primaryQueue = new (); // to be paired
    private readonly Interpolator<TSecondary, TInterpolation> interpolator;
    private readonly Func<TPrimary, TInterpolation[], TOut> outputCreator;
    private readonly Func<TPrimary, IEnumerable<int>> secondarySelector;

    // The arrays below are indexed by secondary stream index and are grown together by AddInput.
    private (Queue<Message<TSecondary>> Queue, DateTime? ClosedOriginatingTime)[] secondaryQueues;
    private Receiver<TSecondary>[] inSecondaries;
    private IEnumerable<int> defaultSecondarySet;

    // temp buffers (scratch space for the primary message currently being processed)
    private TInterpolation[] lastValues;
    private InterpolationResult<TInterpolation>[] lastResults;

    /// <summary>
    /// Initializes a new instance of the <see cref="Fuse{TPrimary, TSecondary, TInterpolation, TOut}"/> class.
    /// </summary>
    /// <param name="pipeline">The pipeline to add the component to.</param>
    /// <param name="interpolator">Interpolator to use when joining the streams.</param>
    /// <param name="outputCreator">Mapping function from messages to output.</param>
    /// <param name="secondaryCount">Number of secondary streams.</param>
    /// <param name="secondarySelector">Selector function mapping primary messages to a set of secondary stream indices. When null, all secondary streams are used.</param>
    /// <param name="name">An optional name for the component.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pipeline"/>, <paramref name="interpolator"/> or <paramref name="outputCreator"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="secondaryCount"/> is negative.</exception>
    public Fuse(
        Pipeline pipeline,
        Interpolator<TSecondary, TInterpolation> interpolator,
        Func<TPrimary, TInterpolation[], TOut> outputCreator,
        int secondaryCount = 1,
        Func<TPrimary, IEnumerable<int>> secondarySelector = null,
        string name = null)
    {
        if (secondaryCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(secondaryCount), secondaryCount, "The number of secondary streams cannot be negative.");
        }

        // fail at construction time rather than with a NullReferenceException on the first message
        this.pipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
        this.interpolator = interpolator ?? throw new ArgumentNullException(nameof(interpolator));
        this.outputCreator = outputCreator ?? throw new ArgumentNullException(nameof(outputCreator));
        this.secondarySelector = secondarySelector;

        // the name is assigned before any emitter or receiver is created, as in the original ordering
        this.name = name ?? $"Fuse({interpolator})";
        this.Out = pipeline.CreateEmitter<TOut>(this, nameof(this.Out));
        this.InPrimary = pipeline.CreateReceiver<TPrimary>(this, this.ReceivePrimary, nameof(this.InPrimary));

        this.inSecondaries = new Receiver<TSecondary>[secondaryCount];
        this.secondaryQueues = new ValueTuple<Queue<Message<TSecondary>>, DateTime?>[secondaryCount];
        this.lastValues = new TInterpolation[secondaryCount];
        this.lastResults = new InterpolationResult<TInterpolation>[secondaryCount];
        this.defaultSecondarySet = Enumerable.Range(0, secondaryCount);

        for (int i = 0; i < secondaryCount; i++)
        {
            this.secondaryQueues[i] = (new Queue<Message<TSecondary>>(), null);
            this.inSecondaries[i] = this.CreateSecondaryReceiver(i);
        }
    }

    /// <inheritdoc />
    public Emitter<TOut> Out { get; }

    /// <summary>
    /// Gets primary input receiver.
    /// </summary>
    public Receiver<TPrimary> InPrimary { get; }

    /// <summary>
    /// Gets collection of secondary receivers.
    /// </summary>
    /// <remarks>
    /// The returned list is the current backing array. <see cref="AddInput"/> replaces that array, so a list
    /// obtained before a call to <see cref="AddInput"/> does not contain the receiver added by it.
    /// </remarks>
    public IList<Receiver<TSecondary>> InSecondaries => this.inSecondaries;

    /// <inheritdoc/>
    public override string ToString() => this.name;

    /// <summary>
    /// Add input receiver.
    /// </summary>
    /// <returns>The newly created secondary receiver, whose index is the previous number of secondary receivers.</returns>
    public Receiver<TSecondary> AddInput()
    {
        // use the sync context to protect the queues from concurrent access
        var syncContext = this.Out.SyncContext;
        syncContext.Lock();
        try
        {
            var index = this.inSecondaries.Length;
            var count = index + 1;

            Array.Resize(ref this.inSecondaries, count);
            var newInput = this.inSecondaries[index] = this.CreateSecondaryReceiver(index);

            // grow the per-secondary state so that every array stays indexable by the new index
            Array.Resize(ref this.secondaryQueues, count);
            this.secondaryQueues[index] = (new Queue<Message<TSecondary>>(), null);
            Array.Resize(ref this.lastResults, count);
            Array.Resize(ref this.lastValues, count);
            this.defaultSecondarySet = Enumerable.Range(0, count);
            return newInput;
        }
        finally
        {
            syncContext.Release();
        }
    }

    /// <summary>
    /// Creates the receiver for the secondary stream with the specified index and hooks up its unsubscribe notification.
    /// </summary>
    /// <param name="index">The index of the secondary stream; also used to build the receiver name.</param>
    /// <returns>The new receiver. The caller is responsible for storing it in <see cref="inSecondaries"/>.</returns>
    private Receiver<TSecondary> CreateSecondaryReceiver(int index)
    {
        // 'index' is a by-value parameter, so each lambda captures its own copy
        var receiver = this.pipeline.CreateReceiver<TSecondary>(this, (d, e) => this.ReceiveSecondary(index, d, e), "InSecondary" + index);
        receiver.Unsubscribed += closedOriginatingTime => this.SecondaryClosed(index, closedOriginatingTime);
        return receiver;
    }

    /// <summary>
    /// Queues a clone of a primary message and attempts to publish.
    /// </summary>
    /// <param name="message">The primary message data.</param>
    /// <param name="e">The envelope of the primary message.</param>
    private void ReceivePrimary(TPrimary message, Envelope e)
    {
        var clone = message.DeepClone(this.InPrimary.Recycler);
        this.primaryQueue.Enqueue(Message.Create(clone, e));
        this.Publish();
    }

    /// <summary>
    /// Queues a clone of a secondary message on the queue of its stream and attempts to publish.
    /// </summary>
    /// <param name="id">The index of the secondary stream the message arrived on.</param>
    /// <param name="message">The secondary message data.</param>
    /// <param name="e">The envelope of the secondary message.</param>
    private void ReceiveSecondary(int id, TSecondary message, Envelope e)
    {
        var clone = message.DeepClone(this.inSecondaries[id].Recycler);
        this.secondaryQueues[id].Queue.Enqueue(Message.Create(clone, e));
        this.Publish();
    }

    /// <summary>
    /// Records the closing time of a secondary stream and attempts to publish.
    /// </summary>
    /// <param name="index">The index of the secondary stream that was unsubscribed.</param>
    /// <param name="closedOriginatingTime">The closing originating time reported by the receiver; it is passed to the interpolator on subsequent interpolations.</param>
    private void SecondaryClosed(int index, DateTime closedOriginatingTime)
    {
        this.secondaryQueues[index].ClosedOriginatingTime = closedOriginatingTime;
        this.Publish();
    }

    /// <summary>
    /// Processes queued primary messages in order until the queue is empty or a selected secondary stream
    /// does not yet have enough data to interpolate at the oldest primary message's originating time.
    /// </summary>
    private void Publish()
    {
        while (this.primaryQueue.Count > 0)
        {
            var primary = this.primaryQueue.Peek();
            var secondarySet = this.GetSecondarySet(primary.Data);
            bool ready = true;

            foreach (var secondary in secondarySet)
            {
                var (queue, closedOriginatingTime) = this.secondaryQueues[secondary];
                var interpolationResult = this.interpolator.Interpolate(primary.OriginatingTime, queue, closedOriginatingTime);
                if (interpolationResult.Type == InterpolationResultType.InsufficientData)
                {
                    // we need to wait longer; the primary stays queued and is re-evaluated on the next call,
                    // so discard the partial results gathered so far instead of leaving them in the buffers
                    this.ClearInterpolationBuffers();
                    return;
                }

                this.lastResults[secondary] = interpolationResult;
                this.lastValues[secondary] = interpolationResult.Value;
                ready = ready && interpolationResult.Type == InterpolationResultType.Created;
            }

            // if all secondaries have an interpolated value, publish the resulting set
            if (ready)
            {
                var result = this.outputCreator(primary.Data, this.lastValues);
                this.Out.Post(result, primary.OriginatingTime);
            }

            // if we got here, all secondaries either successfully interpolated a value, or we have confirmation that they will never be able to interpolate
            foreach (var secondary in secondarySet)
            {
                var queue = this.secondaryQueues[secondary].Queue;
                var obsoleteTime = this.lastResults[secondary].ObsoleteTime;

                // clear the secondary queue as needed
                while (queue.Count != 0 && queue.Peek().OriginatingTime < obsoleteTime)
                {
                    this.inSecondaries[secondary].Recycle(queue.Dequeue());
                }
            }

            // always reset the scratch buffers, whether or not an output was posted, so that values interpolated
            // for this primary are never visible to the output creator when a later primary is processed
            this.ClearInterpolationBuffers();
            this.InPrimary.Recycle(primary);
            this.primaryQueue.Dequeue();
        }
    }

    /// <summary>
    /// Gets the secondary stream indices to fuse with the specified primary message.
    /// </summary>
    /// <param name="primary">The primary message data.</param>
    /// <returns>
    /// All secondary indices when no selector was supplied; otherwise the selector's result, snapshotted into an
    /// array unless it is already a materialized collection.
    /// </returns>
    private IEnumerable<int> GetSecondarySet(TPrimary primary)
    {
        if (this.secondarySelector == null)
        {
            return this.defaultSecondarySet;
        }

        // the set is enumerated twice by Publish (interpolation, then queue trimming); snapshot lazy sequences
        // so that both passes are guaranteed to visit the same indices
        var selected = this.secondarySelector(primary);
        return selected as ICollection<int> ?? selected.ToArray();
    }

    /// <summary>
    /// Resets the per-primary scratch buffers to their default values.
    /// </summary>
    private void ClearInterpolationBuffers()
    {
        Array.Clear(this.lastValues, 0, this.lastValues.Length);
        Array.Clear(this.lastResults, 0, this.lastResults.Length);
    }
}
}