---
sidebar_position: 12
---

# Reduce/aggregate and distinct

## Distinct

Distinct removes duplicates from the stream based on a given key.

```csharp
contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            OutputId = i % 11,
            Label = $"{ctx}{i % 11}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Distinct("Distinct", i => i.OutputId)
    .Do("print to console", i => Console.WriteLine(i));
```

Input stream:

| OutputId | Label   | Description   |
|----------|---------|---------------|
| 1        | Label 1 | Description 1 |
| 2        |         | Description 2 |
| 3        |         | Description 3 |
| 4        | Label 4 | Description 4 |
| 5        | Label 5 |               |
| 6        | Label 6 | Description 6 |
| 0        | Label 0 | Description 0 |
| 1        | Label 1 |               |
| 2        | Label 2 | Description 2 |
| 3        | Label 3 | Description 3 |
| 4        |         | Description 4 |
| 5        |         | Description 5 |
| 6        | Label 6 | Description 6 |

The regular Distinct provides this result:

| OutputId | Label   | Description   |
|----------|---------|---------------|
| 1        | Label 1 | Description 1 |
| 2        |         | Description 2 |
| 3        |         | Description 3 |
| 4        | Label 4 | Description 4 |
| 5        | Label 5 |               |
| 6        | Label 6 | Description 6 |
| 0        | Label 0 | Description 0 |
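
This "first record per key wins" behaviour is the same idea as LINQ's `DistinctBy` (available since .NET 6). A minimal, self-contained sketch on an in-memory collection, unrelated to the stream API:

```csharp
using System;
using System.Linq;

// Illustration only: DistinctBy keeps the first element encountered for each key,
// just like the regular Distinct above keeps the first record per OutputId.
var rows = new[]
{
    new { OutputId = 5, Label = "Label 5", Description = (string)null },
    new { OutputId = 5, Label = (string)null, Description = "Description 5" },
};

foreach (var row in rows.DistinctBy(r => r.OutputId))
    Console.WriteLine($"{row.OutputId}: {row.Label} / {row.Description}");
// Prints only the first record for key 5: "5: Label 5 / "
```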

Sometimes, bad input files fill a value for a column only part of the time. In such a context, a common request is to keep, for each column, the first non-null value found instead of taking the first record as is. Here is the output stream we would like to reach:

| OutputId | Label   | Description   |
|----------|---------|---------------|
| 1        | Label 1 | Description 1 |
| 2        | Label 2 | Description 2 |
| 3        | Label 3 | Description 3 |
| 4        | Label 4 | Description 4 |
| 5        | Label 5 | Description 5 |
| 6        | Label 6 | Description 6 |
| 0        | Label 0 | Description 0 |

This result is obtained by setting the second optional parameter of the Distinct operator to true.

```csharp
contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            OutputId = i % 11,
            Label = $"{ctx}{i % 11}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Distinct("Distinct", i => i.OutputId, true)
    .Do("print to console", i => Console.WriteLine(i));
```

## Group By

In the *create several files* recipe, GroupBy was used to group payloads into a sub process per key. But the GroupBy operator can also be used to group payloads into a list per key: each output item then exposes the grouping Key and an Aggregation list containing the grouped payloads, as the example below shows.

```csharp
contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            OutputId = i % 11,
            Label = $"{ctx}{i}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .GroupBy("group by OutputId", i => i.OutputId)
    .Do("print to console", i => Console.WriteLine($"{i.Key}: {i.Aggregation.Count} items"));
```

## Aggregate

GroupBy simply groups payloads into lists or sub processes. Aggregate permits any custom accumulation per group of payloads. It can do more than GroupBy, but it is less straightforward to use: it takes a key selector, a function that creates the initial aggregation, and an accumulator applied for each payload.

```csharp
contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            Id = i,
            OutputId = i % 11,
            Label = $"{ctx}{i}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Aggregate("aggregate by OutputId",
        i => i.OutputId,
        i => new { Key = i.OutputId, Ids = new List<int>() },
        (a, v) =>
        {
            a.Ids.Add(v.Id);
            return a;
        })
    .Do("print to console", i => Console.WriteLine($"{i.Key}: {i.Aggregation.Ids.Count} items"));
```

## Pivot

Pivot computes several aggregations of values on a single output occurrence, like the PIVOT operator in SQL or a pivot table in Excel.

```csharp
contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new
        {
            Id = i,
            OutputId = i % 3,
            Label = $"{ctx}{i}",
            Description = (i % 5 == 0) ? null : $"Description {i}"
        }))
    .Pivot("pivot values", i => i.OutputId, i => new
    {
        Count = AggregationOperators.Count(),
        Count0 = AggregationOperators.Count().For(i.OutputId == 0),
        Count1 = AggregationOperators.Count().For(i.OutputId == 1),
        Count2 = AggregationOperators.Count().For(i.OutputId == 2)
    })
    .Do("print to console", i => Console.WriteLine($"{i.Key}: Count={i.Aggregation.Count}, Count0={i.Aggregation.Count0}, Count1={i.Aggregation.Count1}, Count2={i.Aggregation.Count2}"));
```

## ToList

ToList aggregates the payloads of every event into a single payload that is the list of all payloads.

```csharp
contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100))
    .ToList("aggregate everything")
    .Do("print everything in one go", i => Console.WriteLine(string.Join("-", i)));
```