In [None]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
run('pip install --quiet apache-beam')
run('pip install --quiet apache-beam[interactive]')

>> pip install --quiet apache-beam

>> pip install --quiet apache-beam[interactive]



In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import functools
import shutil
import glob
import os
import logging
import argparse

In [None]:
# Remove report shards left by a previous run so the output only holds files
# from this run. The glob must match the `file_path_prefix` ("report_beam")
# given to WriteToText in the pipeline cell.
file_list = glob.glob("report_beam*")
for path in file_list:
  # Only delete regular files; os.remove raises on a matching directory.
  if os.path.isfile(path):
    os.remove(path)

In [None]:
class InputRecord:
  """One dataset1 row combined with the tier looked up for its counter party.

  Args:
    legal_entity: legal entity of the row.
    counter_party: counter party of the row (the join key with dataset2).
    rating: rating of the row.
    status: status string; the report sums values for 'ARAP' and 'ACCR'.
    value: numeric value of the row.
    tier: tier taken from the matching dataset2 row.
  """

  def __init__(self, legal_entity:str, counter_party: str, rating:int, status: str, value: int, tier: int):
    self.legal_entity = legal_entity
    self.counter_party = counter_party
    self.rating = rating
    self.status = status
    self.value = value
    self.tier = tier

  # Must be a method (not nested inside __init__) so the pipeline and the
  # CombineFn can call record.get_key().
  def get_key(self):
    """Return the grouping key ``"legal_entity:counter_party:tier"``."""
    return "{}:{}:{}".format(self.legal_entity, self.counter_party, self.tier)

class OutputRecord:
  """Aggregated report row for one ``legal_entity:counter_party:tier`` key.

  Args:
    legal_entity: legal entity of the grouped rows.
    counter_party: counter party of the grouped rows.
    rating: accepted for signature compatibility but not stored;
      ``max_rating`` carries the running maximum.
    tier: tier of the grouped rows.
    max_rating: initial maximum rating.
    sum_accr: initial sum of values whose status is 'ACCR'.
    sum_arap: initial sum of values whose status is 'ARAP'.
  """

  def __init__(self, legal_entity:str, counter_party: str, rating:int, tier: int,
               max_rating:int, sum_accr: int, sum_arap: int):
    self.legal_entity = legal_entity
    self.counter_party = counter_party
    self.max_rating = max_rating
    self.tier = tier
    self.sum_accr = sum_accr
    self.sum_arap = sum_arap
    # Number of rows folded into this record; starts empty.
    self.unique_cnt = 0

  # The methods below must sit at class level (not nested in __init__) to be
  # callable on instances.
  def get_key(self):
    """Return the grouping key ``"legal_entity:counter_party:tier"``."""
    return "{}:{}:{}".format(self.legal_entity, self.counter_party, self.tier)

  def update_record_with_status(self, rating, status, value):
    """Fold one input row into this record.

    Raises the max rating, counts the row, and adds `value` to the ARAP or
    ACCR sum depending on `status`; any other status only affects the max
    rating and the count.
    """
    self.max_rating = max(self.max_rating, rating)
    self.unique_cnt += 1
    if status == 'ARAP':
      self.sum_arap += value
    elif status == 'ACCR':
      self.sum_accr += value

  def update_record(self, rating, sum_accr, sum_arap, unique_cnt=1):
    """Fold pre-aggregated values into this record.

    Args:
      rating: candidate maximum rating.
      sum_accr: ACCR sum to add.
      sum_arap: ARAP sum to add.
      unique_cnt: number of rows those sums represent. Defaults to 1, the
        original behavior; pass the partial count when merging aggregates.
    """
    self.max_rating = max(self.max_rating, rating)
    self.unique_cnt += unique_cnt
    self.sum_arap += sum_arap
    self.sum_accr += sum_accr




In [None]:
class GetReportMetrics(beam.CombineFn):
  """Combine InputRecords into formatted report lines.

  The accumulator is a dict mapping ``InputRecord.get_key()`` to an
  OutputRecord holding the running max rating, the ARAP/ACCR sums and the
  row count. When used with CombinePerKey keyed by that same ``get_key()``
  (as in the pipeline cell), each accumulator holds a single entry.
  """

  def create_accumulator(self):
    return {}

  @staticmethod
  def _copy_record(record):
    """Return a new OutputRecord carrying the same aggregate state as `record`."""
    copied = OutputRecord(record.legal_entity, record.counter_party,
                          record.max_rating, record.tier, record.max_rating,
                          record.sum_accr, record.sum_arap)
    copied.unique_cnt = record.unique_cnt
    return copied

  def merge_accumulators(self, accumulators):
    """Merge partial accumulators into a fresh one without mutating the inputs."""
    merged_accumulators = self.create_accumulator()
    for accumulator in accumulators:
      for key, acc_record in accumulator.items():
        report_record = merged_accumulators.get(key)
        if report_record is None:
          # Copy so later merges never modify a record owned by an input.
          merged_accumulators[key] = self._copy_record(acc_record)
        else:
          report_record.max_rating = max(report_record.max_rating,
                                         acc_record.max_rating)
          report_record.sum_accr += acc_record.sum_accr
          report_record.sum_arap += acc_record.sum_arap
          # Add the partial count, not 1: a partial may cover many rows.
          report_record.unique_cnt += acc_record.unique_cnt
    return merged_accumulators

  def add_input(self, accumulator, input):
    """Fold one InputRecord into `accumulator` and return the accumulator."""
    key = input.get_key()
    report_record = accumulator.get(key)
    if report_record is None:
      # Start an empty aggregate; the first row is applied below like any other.
      report_record = OutputRecord(input.legal_entity, input.counter_party,
                                   input.rating, input.tier, input.rating, 0, 0)
      accumulator[key] = report_record
    report_record.max_rating = max(report_record.max_rating, input.rating)
    report_record.unique_cnt += 1
    if input.status == 'ARAP':
      report_record.sum_arap += input.value
    elif input.status == 'ACCR':
      report_record.sum_accr += input.value
    # CombineFn.add_input must return the accumulator.
    return accumulator

  def extract_output(self, acc):
    """Format each aggregated record as one comma-separated line.

    Column order: legal_entity, counter_party, tier, max_rating, ARAP sum,
    ACCR sum, row count. Multiple records are joined with newlines; an empty
    accumulator yields ''.
    """
    return "\n".join(
        f'{r.legal_entity}, {r.counter_party}, {r.tier}, '
        f'{r.max_rating}, {r.sum_arap}, {r.sum_accr}, {r.unique_cnt}'
        for r in acc.values())

In [None]:
def create_input_record(r, tier):
  """Build an InputRecord from one split dataset1 row and a tier value.

  Columns read from `r`: 1 = legal_entity, 2 = counter_party, 3 = rating,
  4 = status, 5 = value (column 0 is ignored). Rating, value and tier are
  converted with int().
  """
  legal_entity = r[1]
  counter_party = r[2]
  rating = int(r[3])
  status = r[4]
  value = int(r[5])
  return InputRecord(legal_entity, counter_party, rating, status, value,
                     int(tier))

def generate_input_records(record_tuple):
  """Expand one CoGroupByKey result into InputRecords tagged with their tier.

  Args:
    record_tuple: ``(key, {"ds1": rows, "ds2": rows})`` as produced by the
      CoGroupByKey step, where each row is a list of split CSV fields.

  Returns:
    A list with one InputRecord per ds1 row, all using the tier found at
    column 1 of the first ds2 row; an empty list (with a warning logged) when
    there is no ds2 row for the key.
  """
  key, record = record_tuple
  ds2_records = record["ds2"]
  if not ds2_records:
    # No tier for this key: skip its ds1 rows instead of failing the whole
    # pipeline with an IndexError.
    logging.warning("No ds2 row for key %r; skipping its ds1 rows", key)
    return []
  tier = ds2_records[0][1]
  return [create_input_record(row, tier) for row in record["ds1"]]

def split_data_to_key_value(row, delimiter, index):
  """Split a text row on `delimiter` and key it by one of its fields.

  Returns:
    A ``(fields[index], fields)`` tuple: the selected field and the full list
    of fields.
  """
  fields = row.split(delimiter)
  key = fields[index]
  return key, fields


In [None]:
# Raise the root logger to INFO so INFO-level messages show in the output.
logging.root.setLevel(logging.INFO)
# Pipeline built with no options, so Beam's default runner is used; the
# `with pipeline as p:` cell below executes it.
pipeline = beam.Pipeline()

def read_data_from_file(pipeline, input_file, delimiter, index, label):
  """Read a delimited text file into ``(key, fields)`` pairs.

  Skips the first (header) line, splits every remaining line on `delimiter`,
  keys it by the field at `index`, and logs each pair. `label` is appended to
  every step name so repeated calls on one pipeline get distinct steps.

  Returns:
    The PCollection produced by the logging step.
  """
  lines = pipeline | f"Read {label}" >> beam.io.ReadFromText(
      input_file, skip_header_lines=1)
  keyed_rows = lines | f"Split csv {label}" >> beam.Map(
      split_data_to_key_value, delimiter, index)
  return keyed_rows | f"log data {label}" >> beam.LogElements()

# Header line of the report; the column order must match the fields emitted
# by GetReportMetrics.extract_output ("Total" is the per-key row count).
headers = "legal_entity, counter_party, tier, max_rating, sum(values where status=ARAP), sum(values where status=ACCR), Total"

# Leaving the `with` block runs the pipeline and waits for it to finish, so
# no separate p.run() is needed; calling it again would re-execute every step
# and rewrite the report files.
with pipeline as p:
  # dataset1 is keyed by column 2 (counter_party), dataset2 by column 0.
  ds1 = read_data_from_file(p, "input/dataset1.csv", ",", 2, "ds1")
  ds2 = read_data_from_file(p, "input/dataset2.csv", ",", 0, "ds2")
  report_data = (
      {"ds1": ds1, "ds2": ds2}
      | "join two dataset by counter_party as separate dict" >> beam.CoGroupByKey()
      | "print joined dataset" >> beam.LogElements()
      | "generate input record" >> beam.FlatMap(generate_input_records)
      | "create key / value of records for processing" >> beam.Map(lambda record: (record.get_key(), record))
      | "Accumulate the data" >> beam.CombinePerKey(GetReportMetrics())
      | "Ignore the key" >> beam.Map(lambda key_value: key_value[1])
      | "print final data" >> beam.LogElements()
      | "Write report" >> beam.io.WriteToText(
          file_path_prefix="report_beam", header=headers)
  )