Skip to content

Commit

Permalink
Merge pull request #86 from nsulikowski/FullOuterJoin
Browse files Browse the repository at this point in the history
FullOuterJoinMacro
  • Loading branch information
James Terwilliger committed May 14, 2019
2 parents 2976234 + baca53e commit 3f60d2a
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 0 deletions.
Expand Up @@ -969,6 +969,43 @@ public static partial class Streamable
});
}

/// <summary>
/// Macro to perform a left-outer-join operation.
/// </summary>
/// <typeparam name="TKey">Type of (mapping) key in the stream</typeparam>
/// <typeparam name="TLeft">Type of left input payload in the stream</typeparam>
/// <typeparam name="TRight">Type of right input payload in the stream</typeparam>
/// <typeparam name="TJoinKey">Type of join key for the join</typeparam>
/// <typeparam name="TResult">Type of result payload in the stream</typeparam>
/// <param name="left">Left input stream</param>
/// <param name="right">Right input stream</param>
/// <param name="leftKeySelector">Selector for the left-side join key</param>
/// <param name="rightKeySelector">Selector for the right-side join key</param>
/// <param name="leftResultSelector">Selector for the result for non-joining tuples</param>
/// <param name="rightResultSelector">Selector for the result for non-joining tuples</param>
/// <param name="innerResultSelector">Selector for the result for joining tuples</param>
/// <returns>Result (output) stream</returns>
public static IStreamable<TKey, TResult> FullOuterJoin<TKey, TLeft, TRight, TJoinKey, TResult>(
this IStreamable<TKey, TLeft> left,
IStreamable<TKey, TRight> right,
Expression<Func<TLeft, TJoinKey>> leftKeySelector,
Expression<Func<TRight, TJoinKey>> rightKeySelector,
Expression<Func<TLeft, TResult>> leftResultSelector,
Expression<Func<TRight, TResult>> rightResultSelector,
Expression<Func<TLeft, TRight, TResult>> innerResultSelector)
{
Invariant.IsNotNull(left, nameof(left));
Invariant.IsNotNull(right, nameof(right));

return left.Multicast(right, (l_mc, r_mc) =>
{
var outerLeft = l_mc.WhereNotExists(r_mc, leftKeySelector, rightKeySelector).Select(leftResultSelector);
var outerRight = r_mc.WhereNotExists(l_mc, rightKeySelector, leftKeySelector).Select(rightResultSelector);
var innerJoin = l_mc.Join(r_mc, leftKeySelector, rightKeySelector, innerResultSelector);
return outerLeft.Union(innerJoin).Union(outerRight);
});
}

/// <summary>
///
/// </summary>
Expand Down
162 changes: 162 additions & 0 deletions Sources/Test/SimpleTesting/AdHocTests.cs
Expand Up @@ -1102,6 +1102,168 @@ public void LeftOuterJoinScenario()

}

[TestClass]
public class FullOuterJoinMacro : TestWithConfigSettingsAndMemoryLeakDetection
{
public FullOuterJoinMacro() : base(new ConfigModifier()
.ForceRowBasedExecution(false)
.DontFallBackToRowBasedExecution(true))
{ }

[DataContract]
public struct InputEvent0
{
[DataMember]
public DateTime Ts;
[DataMember]
public string VmId;
[DataMember]
public int Status;
[DataMember]
public string IncId;
}

[DataContract]
public struct InputEvent1
{
[DataMember]
public DateTime? Ts1;
[DataMember]
public string VmId2;
[DataMember]
public int? Status3;
[DataMember]
public string IncId4;
}

[DataContract]
public struct OutputEvent0
{
// Left
[DataMember]
public DateTime? Ts;
[DataMember]
public string VmId;
[DataMember]
public int? Status;
[DataMember]
public string IncId;
// Right
[DataMember]
public DateTime? Ts1;
[DataMember]
public string VmId2;
[DataMember]
public int? Status3;
[DataMember]
public string IncId4;

public override string ToString()
{
return new
{
Ts,
VmId,
Status,
IncId,
Ts1,
VmId2,
Status3,
IncId4,
}.ToString();
}
}
[TestMethod, TestCategory("Gated")]
public void FullOuterJoinScenario()
{
var savedForceRowBasedExecution = Config.ForceRowBasedExecution;
try
{
Config.ForceRowBasedExecution = true;

var input1Enum = new StreamEvent<InputEvent0>[]
{
StreamEvent.CreateInterval(10, 20, new InputEvent0()
{
Ts = DateTime.UtcNow,
VmId = "vm1",
Status = 0,
IncId = "000"
}),
};
var input2Enum = new StreamEvent<InputEvent1>[]
{
StreamEvent.CreateInterval(15, 25, new InputEvent1()
{
Ts1 = DateTime.UtcNow,
VmId2 = "vm1",
Status3 = 1,
IncId4 = "100"
}),
};

var input1 = input1Enum.ToObservable().ToStreamable();
var input2 = input2Enum.ToObservable().ToStreamable();

var result = input1
.FullOuterJoin(
right: input2,
leftKeySelector: s => s.VmId,
rightKeySelector: s => s.VmId2,
leftResultSelector: l => new OutputEvent0()
{
Ts = l.Ts,
VmId = l.VmId,
Status = l.Status,
IncId = l.IncId,
Ts1 = null,
VmId2 = null,
Status3 = null,
IncId4 = null,
},
rightResultSelector: r => new OutputEvent0()
{
Ts = null,
VmId = null,
Status = null,
IncId = null,
Ts1 = r.Ts1,
VmId2 = r.VmId2,
Status3 = r.Status3,
IncId4 = r.IncId4,
},
innerResultSelector: (l, r) => new OutputEvent0()
{
Ts = l.Ts,
VmId = l.VmId,
Status = l.Status,
IncId = l.IncId,
Ts1 = r.Ts1,
VmId2 = r.VmId2,
Status3 = r.Status3,
IncId4 = r.IncId4,
});

var resultObs = result.ToStreamEventObservable(ReshapingPolicy.CoalesceEndEdges);

foreach (var x in resultObs.ToEnumerable())
{
System.Diagnostics.Debug.Print("{0}, {1}, {2}, {3}", x.Kind, x.StartTime, x.EndTime, x.Payload);
}
/*
Interval, 10, 15, { Ts = 5/14/2019 1:39:20 AM, VmId = vm1, Status = 0, IncId = 000, Ts1 = , VmId2 = , Status3 = , IncId4 = } LEFT
Interval, 15, 20, { Ts = 5/14/2019 1:39:20 AM, VmId = vm1, Status = 0, IncId = 000, Ts1 = 5/14/2019 1:39:20 AM, VmId2 = vm1, Status3 = 1, IncId4 = 100 } INNER
Interval, 20, 25, { Ts = , VmId = , Status = , IncId = , Ts1 = 5/14/2019 1:39:20 AM, VmId2 = vm1, Status3 = 1, IncId4 = 100 } RIGHT
Punctuation, 3155378975999999999, -9223372036854775808, { Ts = , VmId = , Status = , IncId = , Ts1 = , VmId2 = , Status3 = , IncId4 = }
*/
}
finally
{
Config.ForceRowBasedExecution = savedForceRowBasedExecution;
}
}
}

[TestClass]
public class LeftOuterJoinMacroButWithoutMemoryLeakDetection : TestWithConfigSettingsWithoutMemoryLeakDetection
{
Expand Down

0 comments on commit 3f60d2a

Please sign in to comment.