## Requirement

Please do the same exercise using two different frameworks:

1. pandas
2. Apache Beam (Python SDK): https://beam.apache.org/documentation/sdks/python/


Use the two input files, dataset1 and dataset2.

Join dataset1 with dataset2 to look up each counterparty's tier.

Generate an output file with the following columns:

legal_entity, counterparty, tier, max(rating by counterparty), sum(value where status=ARAP), sum(value where status=ACCR)

Also add a total record for each legal entity, counterparty, and tier.

Sample output:

```
legal_entity, counterparty, tier, max(rating by counterparty), sum(value where status=ARAP), sum(value where status=ACCR)
L1, Total, Total, calculated_value, calculated_value, calculated_value
L1, C1, Total, calculated_value, calculated_value, calculated_value
Total, C1, Total, calculated_value, calculated_value, calculated_value
Total, Total, 1, calculated_value, calculated_value, calculated_value
L2, Total, Total, calculated_value, calculated_value, calculated_value
....
```

The remaining rows follow the same pattern. Each `calculated_value` is a placeholder that must be computed with the aggregations described above.
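For the pandas half of the exercise, a minimal sketch could look like the block below. It assumes dataset1 has an unused first column (called `id` here), then legal_entity, counter_party, rating, status, value, and that dataset2 has counter_party and tier; this layout is inferred from how the Beam pipeline below parses the files. The inline DataFrames are hypothetical stand-ins for the CSV files, and the list of total levels is inferred from the sample rows (no grand-total row is shown there, so none is produced).

```python
import pandas as pd

# Key columns of the report, in output order.
KEYS = ["legal_entity", "counter_party", "tier"]

# Aggregation levels, assumed from the sample output: detail rows plus one
# subtotal level per row pattern shown in the sample.
GROUPING_SETS = [
    ["legal_entity", "counter_party", "tier"],  # detail rows
    ["legal_entity"],                           # L1, Total, Total
    ["legal_entity", "counter_party"],          # L1, C1, Total
    ["counter_party"],                          # Total, C1, Total
    ["tier"],                                   # Total, Total, 1
]


def build_report(dataset1: pd.DataFrame, dataset2: pd.DataFrame) -> pd.DataFrame:
    # Inner join: attach each counterparty's tier to the dataset1 rows.
    joined = dataset1.merge(dataset2, on="counter_party", how="inner")
    # Zero out values whose status does not match, so a plain sum acts as a conditional sum.
    joined["arap"] = joined["value"].where(joined["status"] == "ARAP", 0.0)
    joined["accr"] = joined["value"].where(joined["status"] == "ACCR", 0.0)

    frames = []
    for keys in GROUPING_SETS:
        level = (
            joined.groupby(keys)
            .agg(
                max_rating=("rating", "max"),
                sum_arap=("arap", "sum"),
                sum_accr=("accr", "sum"),
            )
            .reset_index()
        )
        # Key columns that are not part of this level are reported as "Total".
        for column in KEYS:
            if column not in keys:
                level[column] = "Total"
        frames.append(level[KEYS + ["max_rating", "sum_arap", "sum_accr"]])
    return pd.concat(frames, ignore_index=True)


# Hypothetical inline data standing in for data/dataset1.csv and data/dataset2.csv.
dataset1 = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "legal_entity": ["L1", "L1", "L2", "L2"],
    "counter_party": ["C1", "C1", "C2", "C1"],
    "rating": [3, 1, 2, 5],
    "status": ["ARAP", "ACCR", "ACCR", "ARAP"],
    "value": [10.0, 20.0, 40.0, 5.0],
})
dataset2 = pd.DataFrame({"counter_party": ["C1", "C2"], "tier": [1, 2]})

report = build_report(dataset1, dataset2)
print(report.to_string(index=False))
```

Building each level with its own `groupby` and concatenating keeps every subtotal explicit; writing the file is then `report.to_csv(path, index=False)`.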

## Initialize

```python
import apache_beam as beam
```

## Helper Classes and Methods

1. Class *CustomSumAndMaxFn*, a `beam.CombineFn` applied to each group of rows sharing the same ("legal_entity", "counter_party", "tier"), which:
    - calculates the maximum rating;
    - calculates the sum of values where status is "ARAP";
    - calculates the sum of values where status is "ACCR";
    - counts the rows in the group.
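As an alternative sketch (not the notebook's implementation), the four aggregates above fit in a plain tuple. The group key does not need to live in the accumulator, because `CombinePerKey` keeps it alongside the result. `SumAndMaxAggregates` is a hypothetical name; it is written as a plain class with the four `CombineFn` method names so the merge logic can be checked without running a pipeline. To use it in Beam, it would subclass `beam.CombineFn` instead.

```python
class SumAndMaxAggregates:
    """Tuple accumulator: (max rating, sum of ARAP values, sum of ACCR values, row count)."""

    def create_accumulator(self):
        # -inf is the identity for max, so an empty accumulator never wins a comparison.
        return (float("-inf"), 0.0, 0.0, 0)

    def add_input(self, acc, row):
        # row is (legal_entity, counter_party, tier, rating, status, value).
        _, _, _, rating, status, value = row
        max_rating, arap, accr, count = acc
        if status == "ARAP":
            arap += value
        elif status == "ACCR":
            accr += value
        return (max(max_rating, rating), arap, accr, count + 1)

    def merge_accumulators(self, accs):
        max_rating, arap, accr, count = self.create_accumulator()
        for m, a, c, n in accs:
            max_rating = max(max_rating, m)
            arap += a
            accr += c
            count += n
        return (max_rating, arap, accr, count)

    def extract_output(self, acc):
        return acc


fn = SumAndMaxAggregates()
rows = [
    ("L1", "C1", 1, 3, "ARAP", 10.0),
    ("L1", "C1", 1, 1, "ACCR", 20.0),
    ("L1", "C1", 1, 5, "ARAP", 5.0),
]
# Two partial accumulators plus an empty one, as a runner might produce them.
part1 = fn.add_input(fn.add_input(fn.create_accumulator(), rows[0]), rows[1])
part2 = fn.add_input(fn.create_accumulator(), rows[2])
result = fn.extract_output(fn.merge_accumulators([part1, part2, fn.create_accumulator()]))
print(result)
```

After `beam.CombinePerKey(...)`, a step such as `beam.MapTuple(lambda key, agg: (*key, *agg))` would reattach the key columns to each aggregate tuple.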

```python
class CustomSumAndMaxFn(beam.CombineFn):
    """Aggregates joined rows that share the key (legal_entity, counter_party, tier)."""

    def create_accumulator(self):
        return {
            "legal_entity": None,
            "counter_party": None,
            "tier": 0,
            "max(rating by counterparty)": 0,
            "sum(value where status=ARAP)": 0.0,
            "sum(value where status=ACCR)": 0.0,
            "Total(count with same legal_entity, counter_party,tier)": 0,
        }

    def add_input(self, accumulator_dict, input_value):
        legal_entity, counter_party, tier, rating, status, value = input_value

        # Every input for a given key carries the same three key fields.
        accumulator_dict["legal_entity"] = legal_entity
        accumulator_dict["counter_party"] = counter_party
        accumulator_dict["tier"] = tier
        accumulator_dict["Total(count with same legal_entity, counter_party,tier)"] += 1

        # Only ARAP and ACCR values are summed; other statuses are counted but not summed.
        if status in ("ARAP", "ACCR"):
            accumulator_dict[f"sum(value where status={status})"] += value

        accumulator_dict["max(rating by counterparty)"] = max(
            accumulator_dict["max(rating by counterparty)"], rating
        )
        return accumulator_dict

    def merge_accumulators(self, accumulator_dicts):
        merged_accumulator_dict = self.create_accumulator()
        for accumulator_dict in accumulator_dicts:
            # An accumulator that received no input still has None key fields;
            # don't let it overwrite the key copied from a non-empty accumulator.
            if accumulator_dict["legal_entity"] is not None:
                for key_field in ("legal_entity", "counter_party", "tier"):
                    merged_accumulator_dict[key_field] = accumulator_dict[key_field]
            merged_accumulator_dict["max(rating by counterparty)"] = max(
                merged_accumulator_dict["max(rating by counterparty)"],
                accumulator_dict["max(rating by counterparty)"],
            )
            for sum_field in (
                "sum(value where status=ARAP)",
                "sum(value where status=ACCR)",
                "Total(count with same legal_entity, counter_party,tier)",
            ):
                merged_accumulator_dict[sum_field] += accumulator_dict[sum_field]
        return merged_accumulator_dict

    def extract_output(self, accumulator_dict):
        # One comma-separated line, in the column order of the output file.
        columns = (
            "legal_entity",
            "counter_party",
            "tier",
            "max(rating by counterparty)",
            "sum(value where status=ARAP)",
            "sum(value where status=ACCR)",
            "Total(count with same legal_entity, counter_party,tier)",
        )
        return ", ".join(str(accumulator_dict[column]) for column in columns)
```

## Main pipeline

This pipeline:
1. Reads dataset1 from CSV into a PCollection.
2. Reads dataset2 from CSV into a PCollection.
3. Joins the two datasets on counter_party (the 3rd column in dataset1 and the 1st column in dataset2) using CoGroupByKey.
4. Flattens each joined group into (legal_entity, counter_party, tier, rating, status, value) rows with *combine_data_for_datasets*.
5. Applies the custom CombineFn per (legal_entity, counter_party, tier) to compute four aggregates: max rating, sum of ARAP values, sum of ACCR values, and row count.
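Step 3 emits one element per counterparty, shaped `(key, {"dataset1": rows, "dataset2": rows})`. The hand-written element below uses hypothetical values to walk through the same nested loop as *combine_data_for_datasets* and show that it behaves as an inner join; `join_rows` is a hypothetical stand-alone copy of that function, runnable without Beam.

```python
def join_rows(element):
    """Pair every dataset1 row with every dataset2 row that shares the counterparty key."""
    counter_party, groups = element
    for _, legal_entity, _, rating, status, value in groups["dataset1"]:
        for _, tier in groups["dataset2"]:
            yield (legal_entity, counter_party, int(tier), int(rating), status, float(value))


# Hypothetical CoGroupByKey output for key "C1": parsed CSV rows are lists of strings.
element = (
    "C1",
    {
        "dataset1": [
            ["1", "L1", "C1", "3", "ARAP", "10"],
            ["2", "L2", "C1", "5", "ACCR", "7.5"],
        ],
        "dataset2": [["C1", "1"]],
    },
)
joined = list(join_rows(element))

# A counterparty with no dataset2 row produces nothing, as in an inner join.
unmatched = list(join_rows(("C9", {"dataset1": [["3", "L1", "C9", "2", "ARAP", "1"]], "dataset2": []})))
print(joined, unmatched)
```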



```python
with beam.Pipeline() as pipeline:
    dataset1 = (
        pipeline
        | "Read Dataset1"
        >> beam.io.ReadFromText("data/dataset1.csv", skip_header_lines=1)
        | "Parse Dataset1" >> beam.Map(lambda line: line.split(","))
    )

    dataset2 = (
        pipeline
        | "Read Dataset2"
        >> beam.io.ReadFromText("data/dataset2.csv", skip_header_lines=1)
        | "Parse Dataset2" >> beam.Map(lambda line: line.split(","))
    )

    # counter_party is the 3rd column of dataset1 and the 1st column of dataset2.
    dataset1_keyed = dataset1 | "Key by Counterparty dataset 1" >> beam.Map(
        lambda row: (row[2], row)
    )

    dataset2_keyed = dataset2 | "Key by Counterparty dataset 2" >> beam.Map(
        lambda row: (row[0], row)
    )

    # Group and join dataset1 and dataset2 on 'counter_party'
    grouped_data = {
        "dataset1": dataset1_keyed,
        "dataset2": dataset2_keyed,
    } | "CoGroup Join" >> beam.CoGroupByKey()

    def combine_data_for_datasets(element):
        # element is (counter_party, {"dataset1": rows, "dataset2": rows}).
        counter_party, data = element
        # Pair every dataset1 row with every dataset2 row for this counterparty
        # (an inner join: keys missing from either side yield nothing).
        for _, legal_entity, _, rating, status, value in data["dataset1"]:
            for _, tier in data["dataset2"]:
                yield (legal_entity, counter_party, int(tier), int(rating), status, float(value))

    joined_data = grouped_data | "Combine Data" >> beam.FlatMap(
        combine_data_for_datasets
    )

    # Key by (legal_entity, counter_party, tier), then compute the max of 'rating',
    # the status-conditional sums of 'value', and the row count for each key.
    aggregated = (
        joined_data
        | "Key by Entity Counterparty Tier" >> beam.Map(lambda x: ((x[0], x[1], x[2]), x))
        | "Group and Combine" >> beam.CombinePerKey(CustomSumAndMaxFn())
        | "Only Value" >> beam.Values()
    )

    # Printed while the pipeline is being built, i.e. before the rows are logged.
    print("legal_entity, counter_party, tier, max(rating by counterparty), sum(value where status=ARAP), sum(value where status=ACCR), Total(count with same legal_entity, counter_party,tier)")
    aggregated | "Log Results" >> beam.LogElements()
```

```
legal_entity, counter_party, tier, max(rating by counterparty), sum(value where status=ARAP), sum(value where status=ACCR), Total(count with same legal_entity, counter_party,tier)
L1, C1, 1, 3, 40.0, 0.0, 3
L2, C2, 2, 3, 20.0, 40.0, 2
L3, C3, 3, 4, 0.0, 145.0, 3
L2, C3, 3, 2, 0.0, 52.0, 1
L1, C3, 3, 6, 5.0, 0.0, 1
L1, C4, 4, 6, 40.0, 100.0, 2
L2, C5, 5, 6, 1000.0, 115.0, 3
L3, C6, 6, 6, 145.0, 60.0, 3
```
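The output above contains only detail rows, while the requirement also asks for total records. One possible approach, sketched here as an assumption rather than the notebook's method, is to fan each joined row out to one key per aggregation level before `CombinePerKey`, writing "Total" into the key positions that a level rolls up. The levels are inferred from the sample rows (no grand-total row appears there, so none is emitted), and `with_total_keys` is a hypothetical helper. It is plain Python, so `beam.FlatMap(with_total_keys)` could replace the `beam.Map` step that keys rows by `(x[0], x[1], x[2])`; because `CustomSumAndMaxFn.add_input` copies the key fields from each value, the "Total" labels carry through to the output rows.

```python
TOTAL = "Total"


def with_total_keys(row):
    """Yield (key, row) pairs for the detail level and each assumed subtotal level.

    row is (legal_entity, counter_party, tier, rating, status, value), as produced
    by combine_data_for_datasets. Rolled-up key fields are replaced by "Total".
    """
    legal_entity, counter_party, tier, rating, status, value = row
    levels = [
        (legal_entity, counter_party, tier),    # detail row
        (legal_entity, TOTAL, TOTAL),           # per legal entity
        (legal_entity, counter_party, TOTAL),   # per legal entity and counterparty
        (TOTAL, counter_party, TOTAL),          # per counterparty
        (TOTAL, TOTAL, tier),                   # per tier
    ]
    for key in levels:
        # The value repeats the key so the CombineFn reports the "Total" labels.
        yield key, (*key, rating, status, value)


# Hypothetical joined rows; in the pipeline this fan-out runs inside beam.FlatMap.
rows = [
    ("L1", "C1", 1, 3, "ARAP", 10.0),
    ("L1", "C3", 3, 6, "ACCR", 5.0),
    ("L2", "C1", 1, 2, "ARAP", 1.0),
]
pairs = [pair for row in rows for pair in with_total_keys(row)]
keys = {key for key, _ in pairs}

# Rows that CombinePerKey would see for the per-counterparty total of C1.
c1_total = [value for key, value in pairs if key == (TOTAL, "C1", TOTAL)]
print(sorted(keys, key=str))
```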
