# Apache Beam Exercise

In [1]:
import apache_beam as beam

# Transformation in Apache Beam

## Sample Pipeline

In [30]:
p1 = beam.Pipeline()
attendance_count = (
    p1
    | beam.io.ReadFromText('./testdata/beam_data/dept_data.txt')
    | beam.Map(lambda record: record.split(','))
    | beam.Filter(lambda record: record[3] == 'Accounts')
    | beam.Map(lambda record: (record[1], 1))
    | beam.CombinePerKey(sum)
    | beam.Map(lambda record: str(record))
    | beam.io.WriteToText('./beamoutput/out')
)
p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2344dff7548>

Checking output since I am running on Windows env cannot execute such unix command like bash/head/ls

In [80]:
!powershell -Command "Get-Content -TotalCount 5 './beamoutput/out-00000-of-00001'"

('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)


## Example P2

In [37]:
p2 = beam.Pipeline()
lines = (
    p2
    | beam.Create(['Line1',
                  'Line2',
                  'Line3',
                  'Line4'])
    | beam.io.WriteToText('./beamoutput/outCreate')
)

p2.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2344e08f308>

In [81]:
!powershell -Command "Get-Content -TotalCount 5 './beamoutput/outCreate-00000-of-00001'"

Line1
Line2
Line3
Line4


## Example P3

In [40]:
p3 = beam.Pipeline()
lines = (
    p3
    | beam.Create([('Line1',52),
                  ('Line2',75),
                  ('Line3',82),
                  ('Line4',65)])
    | beam.io.WriteToText('./beamoutput/outCreate1')
)

p3.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2344e0a7b48>

In [82]:
!powershell -Command "Get-Content -TotalCount 5 './beamoutput/outCreate1-00000-of-00001'"

('Line1', 52)
('Line2', 75)
('Line3', 82)
('Line4', 65)


## Map Function and User Defined Function

In Apache Beam, the Map transform is used to apply a user-defined function to each element in a PCollection (a collection of data) and produce a new PCollection with the results. It is a fundamental transform that allows you to perform element-wise transformations on your data.

In [49]:
p4 = beam.Pipeline()
lines = (
    p4
    | beam.Create([('Line1', 52),
                   ('Line2', 75),
                   ('Line3', 82),
                   ('Line4', 65)])
    | beam.Map(print)
)
p4.run()

('Line1', 52)
('Line2', 75)
('Line3', 82)
('Line4', 65)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2344fbb4208>

Now we can use it to output without doing bash function.

In [50]:
def multiply_by_two(x):
    return x * 2

In [51]:
p5 = beam.Pipeline()
numbers = (
    p5
    | beam.Create([1, 2, 3, 4, 5])
    | beam.Map(multiply_by_two)
    | beam.Map(print)
)
p5.run()

2
4
6
8
10


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2344f9e89c8>

in Here udf or user defined function received from Map and apply for each rows it has, then pipeline printing its output.

In [52]:
def splitRow(element):
    return element.split(',')

In [56]:
def filterRow(element):
    return element[3] == 'Accounts'

In [60]:
def pairKey(element):
    return (element[1],1)

In [62]:
p6 = beam.Pipeline()
splitCols = (
    p6
    | beam.io.ReadFromText('./testdata/beam_data/dept_data.txt')
    | beam.Map(splitRow)
    | beam.Filter(filterRow)
    | beam.Map(pairKey)
    | beam.CombinePerKey(sum)
    | beam.Map(print)
)
p6.run()

('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2344d5f6f08>

## Using with to initiate Pipeline

In [64]:
with beam.Pipeline() as p7:
    splitCols = (
        p7
        | beam.io.ReadFromText('./testdata/beam_data/dept_data.txt')
        | beam.Map(splitRow)
        | beam.Filter(filterRow)
        | beam.Map(pairKey)
        | beam.CombinePerKey(sum)
        | beam.Map(print)
    )

('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


Using with we don't have to run pipeline with method *run()*

## Brancing Pipeline

In [73]:
with beam.Pipeline() as p8:
    read_input = (
        p8
        | "Read Input" >> beam.io.ReadFromText('./testdata/beam_data/dept_data.txt')
        | "Split Row"  >> beam.Map(lambda record: record.split(','))
    )
    accounts_transform = (
        read_input
        | "Get Dept Accounts" >> beam.Filter(lambda record: record[3] == 'Accounts')
        | "Pair Acc Emp with 1" >> beam.Map(lambda record: ("ACC, " + record[1], 1))
        | "Group and Sum" >> beam.CombinePerKey(sum)
        | "Print acc result" >> beam.Map(print)
    )
    hr_transform = (
        read_input
        | "Get Dept HR" >> beam.Filter(lambda record: record[3] == 'HR')
        | "Pair HR Emp with 1" >> beam.Map(lambda record: ("HR, " + record[1], 1))
        | "Group and Sum2" >> beam.CombinePerKey(sum)
        | "Print HR result" >> beam.Map(print)
    )

('ACC, Marco', 31)
('ACC, Rebekah', 31)
('ACC, Itoe', 31)
('ACC, Edouard', 31)
('ACC, Kyle', 62)
('ACC, Kumiko', 31)
('ACC, Gaston', 31)
('ACC, Ayumi', 30)
('HR, Beryl', 62)
('HR, Olga', 31)
('HR, Leslie', 31)
('HR, Mindy', 31)
('HR, Vicky', 31)
('HR, Richard', 31)
('HR, Kirk', 31)
('HR, Kaori', 31)
('HR, Oscar', 31)


## ParDo

ParDo is a fundamental transform in Apache Beam that allows you to apply a user-defined function (a DoFn) to each element of a PCollection. The name "ParDo" stands for "parallel Do" and signifies that the function can be executed in parallel across multiple workers or processing units.

In [78]:
import re

class CountWordOccurrences(beam.DoFn):
    def process(self, element):
        # Extract words using regex
        words = re.findall(r'\b\w+\b', element)

        # Emit word-count pairs
        for word in words:
            yield word.lower(), 1

with beam.Pipeline() as p9:
    Readlines = (
        p9
        | beam.io.ReadFromText("./testdata/beam_data/data.txt")
    )
    word_counts = (
        Readlines
        | beam.ParDo(CountWordOccurrences())
        | beam.combiners.Count.PerKey()
        | beam.io.WriteToText('./beamoutput/wordCount')
    )

In [79]:
!powershell -Command "Get-Content -TotalCount 5 './beamoutput/wordCount-00000-of-00001'"

('king', 311)
('lear', 257)
('dramatis', 1)
('personae', 1)
('of', 483)


## ParDo #2

for classes inside the function should be named exactly process in order to override the method from the beam.DoFn class.

In [91]:
class splitRowClass(beam.DoFn):
    def process(self, element):
        return [element.split(',')]

class accountFilterClass(beam.DoFn):
    def process(self, element):
        if element[3] == 'Accounts':
            return [element]

class pairEmployeeClass(beam.DoFn):
    def process(self, element):
            return [(element[1]+","+element[3],1)]

class countingClass(beam.DoFn):
    def process(self, element):
            (key, values) = element
            return [(key, sum(values))]

with beam.Pipeline() as p10:
    readlines = (
        p10
        | beam.io.ReadFromText('./testdata/beam_data/dept_data.txt')
        | beam.ParDo(splitRowClass())
        | beam.ParDo(accountFilterClass())
        | beam.ParDo(pairEmployeeClass())
        | beam.GroupByKey()
        | beam.ParDo(countingClass())
        | beam.Map(print)
    )

('Marco,Accounts', 31)
('Rebekah,Accounts', 31)
('Itoe,Accounts', 31)
('Edouard,Accounts', 31)
('Kyle,Accounts', 62)
('Kumiko,Accounts', 31)
('Gaston,Accounts', 31)
('Ayumi,Accounts', 30)


## Combiner (Mini Reducer)

The CombineGlobally transform can be useful in scenarios where you need to compute a summary or aggregate value for the entire dataset, such as calculating the total sum, average, maximum, or any other aggregation that requires considering all elements.

In [99]:
class AverageCombiner(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0) #(sum, count)

    def add_input(self, accumulator, element):
        sum_, count = accumulator
        sum_of_element = sum_ + element
        count_of_element = count + 1
        return sum_of_element, count_of_element

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, accumulator):
        sum_, count = accumulator
        return sum_ / count if count != 0 else float('NaN')

with beam.Pipeline() as p11:
    average = (
        p11
        | beam.Create([15, 5, 7, 7, 9, 23, 13, 5])
        | beam.CombineGlobally(AverageCombiner())
        | beam.Map(print)
    )
    

10.5


create_accumulator initializes the accumulator as (0.0, 0).
** add_input is called for each element:

First element 15: Accumulator becomes (15.0, 1).
Second element 5: Accumulator becomes (20.0, 2).
Third element 7: Accumulator becomes (27.0, 3).
Fourth element 7: Accumulator becomes (34.0, 4).
Fifth element 9: Accumulator becomes (43.0, 5).
Sixth element 23: Accumulator becomes (66.0, 6).
Seventh element 13: Accumulator becomes (79.0, 7).
Eighth element 5: Accumulator becomes (84.0, 8).

merge_accumulators is not required in this example, as we are not merging accumulators from different workers.

extract_output is called to compute the average:
The sum in the accumulator is 84.0, and the count is 8.
The average is 84.0 / 8 = 10.5.
The resulting average, 10.5, is printed as the output.

in Apache Beam, the combiner functions are designed to work efficiently in distributed and parallel processing scenarios. The structure of the combiner functions allows for efficient parallel aggregation across multiple workers while handling large datasets.

The CombineFn interface in Apache Beam provides a flexible framework for defining the logic of combiner functions. It breaks down the aggregation process into several methods:

** create_accumulator: Initializes the accumulator, which can include any necessary state for the aggregation.

** add_input: Combines an element with the current accumulator.

** merge_accumulators: Combines multiple accumulators together.

** extract_output: Extracts the final result from the accumulator.

By following this structure, Apache Beam can distribute and parallelize the aggregation process effectively, even for large-scale data processing.

If you choose not to implement a combiner and instead use a normal function for aggregation in Apache Beam, it can have several implications can lead to increased network overhead, reduced parallelism, higher memory usage, longer execution times, and scalability challenges in Apache Beam. Implementing a combiner helps mitigate these issues by enabling efficient partial aggregations, reduced network traffic, and better resource utilization, resulting in improved performance and scalability.

# Composite Transform

In Apache Beam, a composite transform is a higher-level transform that combines multiple primitive transforms into a single, reusable transform.

In [112]:
class myTransform(beam.PTransform):
    def expand(self, input_col):
        a = (
            input_col
            | beam.CombinePerKey(sum)
            | beam.Filter(filter_on_count)
            | beam.Map(format_output)
            | beam.Map(print)
        )
        return a #must have return incase in main pipeline has additional processing

def filter_on_count(element):
    name, count = element
    if count > 30:
        return element
    
def format_output(element):
    name, count = element
    return ((name.encode('ascii'),str(count),'Regular employee'))

with beam.Pipeline() as p12:
    read_input = (
        p12
        | "Read Input" >> beam.io.ReadFromText('./testdata/beam_data/dept_data.txt')
        | "Split Row"  >> beam.Map(lambda record: record.split(','))
    )
    accounts_transform = (
        read_input
        | "Get Dept Accounts" >> beam.Filter(lambda record: record[3] == 'Accounts')
        | "Pair Acc Emp with 1" >> beam.Map(lambda record: ("ACC, " + record[1], 1))
        | "composite Transform acc" >> myTransform()
    )
    hr_transform = (
        read_input
        | "Get Dept HR" >> beam.Filter(lambda record: record[3] == 'HR')
        | "Pair HR Emp with 1" >> beam.Map(lambda record: ("HR, " + record[1], 1))
        | "composite Transform HR" >> myTransform()
    )

(b'ACC, Marco', '31', 'Regular employee')
(b'ACC, Rebekah', '31', 'Regular employee')
(b'ACC, Itoe', '31', 'Regular employee')
(b'ACC, Edouard', '31', 'Regular employee')
(b'ACC, Kyle', '62', 'Regular employee')
(b'ACC, Kumiko', '31', 'Regular employee')
(b'ACC, Gaston', '31', 'Regular employee')
(b'HR, Beryl', '62', 'Regular employee')
(b'HR, Olga', '31', 'Regular employee')
(b'HR, Leslie', '31', 'Regular employee')
(b'HR, Mindy', '31', 'Regular employee')
(b'HR, Vicky', '31', 'Regular employee')
(b'HR, Richard', '31', 'Regular employee')
(b'HR, Kirk', '31', 'Regular employee')
(b'HR, Kaori', '31', 'Regular employee')
(b'HR, Oscar', '31', 'Regular employee')


## CoGroupByKey

In [116]:
def retTuple(element):
    temp_tuple = element.split(',')
    return (temp_tuple[0], temp_tuple[1:]) #pair key, rest of the field
with beam.Pipeline() as p13:
    dept_rows = (
        p13
        | "Read table 1" >> beam.io.ReadFromText('./testdata/beam_data/cogroupBy/dept_data.txt')
        | "pair key, with rest emp table" >> beam.Map(retTuple)
    )
    loc_rows = (
        p13
        | "Read table 2" >> beam.io.ReadFromText('./testdata/beam_data/cogroupBy/location.txt')
        | "pair key, with rest loc table" >> beam.Map(retTuple)
    )
    results = ({'dept_data' : dept_rows, 'loc_data' : loc_rows}
        | beam.CoGroupByKey()
        | beam.io.WriteToText('./testdata/beam_data/cogroupBy/result')
    )

In [118]:
!powershell -Command "Get-Content -TotalCount 2 './testdata/beam_data/cogroupBy/result-00000-of-00001'"

('149633CM', {'dept_data': [['Marco', '10', 'Accounts', '1-01-2019'], ['Marco', '10', 'Accounts', '2-01-2019'], ['Marco', '10', 'Accounts', '3-01-2019'], ['Marco', '10', 'Accounts', '4-01-2019'], ['Marco', '10', 'Accounts', '5-01-2019'], ['Marco', '10', 'Accounts', '6-01-2019'], ['Marco', '10', 'Accounts', '7-01-2019'], ['Marco', '10', 'Accounts', '8-01-2019'], ['Marco', '10', 'Accounts', '9-01-2019'], ['Marco', '10', 'Accounts', '10-01-2019'], ['Marco', '10', 'Accounts', '11-01-2019'], ['Marco', '10', 'Accounts', '12-01-2019'], ['Marco', '10', 'Accounts', '13-01-2019'], ['Marco', '10', 'Accounts', '14-01-2019'], ['Marco', '10', 'Accounts', '15-01-2019'], ['Marco', '10', 'Accounts', '16-01-2019'], ['Marco', '10', 'Accounts', '17-01-2019'], ['Marco', '10', 'Accounts', '18-01-2019'], ['Marco', '10', 'Accounts', '19-01-2019'], ['Marco', '10', 'Accounts', '20-01-2019'], ['Marco', '10', 'Accounts', '21-01-2019'], ['Marco', '10', 'Accounts', '22-01-2019'], ['Marco', '10', 'Accounts', '23-01-