In [1]:
# import required modules
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

import numpy as np

In [2]:
# Build the pipeline on the InteractiveRunner so that it can be developed
# iteratively from within the notebook.
p = beam.Pipeline(runner=InteractiveRunner())

In [3]:
# Load the invoice CSV into the pipeline as a Beam dataframe.
invoice_df = (
    p
    | beam.dataframe.io.read_csv('dataset1.csv')
)
invoice_df.dtypes  # display the column types of the invoice data



invoice_id        int64
legal_entity     object
counter_party    object
rating            int64
status           object
value             int64
dtype: object

In [4]:
# Load the counterparty CSV into the pipeline as a Beam dataframe.
counter_party_df = (
    p
    | beam.dataframe.io.read_csv('dataset2.csv')
)
counter_party_df.dtypes  # display the column types of the counterparty data

counter_party    object
tier              int64
dtype: object

In [5]:
# Enrich each invoice row with the columns of its counterparty. The
# counterparty frame is indexed by 'counter_party' so the join can match the
# invoice column of the same name against that index.
joined_df = invoice_df.join(
    counter_party_df.set_index('counter_party'),
    on='counter_party',
)
# To preview (materialises the beam dataframe as a pandas dataframe):
# ib.collect(joined_df).sort_index()

In [6]:
# calculate max rating per counterparty
# Group the rating column by counterparty, keep the maximum of each group and
# name the resulting series so it can later be joined back as a column.
max_rating_counterparty = (
    joined_df.rating
    .groupby(joined_df.counter_party)
    .max()
    .rename('max_rating_counter_party')
)
# To preview: ib.collect(max_rating_counterparty).sort_index()

In [7]:
# Attach each counterparty's maximum rating to every joined row.
# NOTE: joined_df is rebound from itself here, so re-running this cell without
# first re-running the join cell above operates on the already-augmented frame.
joined_df = joined_df.join(
    max_rating_counterparty,
    on='counter_party',
)
# Constant-valued column; grouping by it is how the grand totals are calculated.
joined_df['total'] = 'Total'
# To preview: ib.collect(joined_df).sort_index()

In [8]:
# Aggregates per (legal_entity, counter_party, tier) combination:
#   - the maximum of max_rating_counter_party
#   - the sum of value over rows whose status is 'ARAP'
#   - the sum of value over rows whose status is 'ACCR'
all_max_rating = (
    joined_df
    .groupby(['legal_entity','counter_party','tier'])['max_rating_counter_party']
    .max()
)
all_sum_arap = (
    joined_df.loc[joined_df.status=='ARAP']
    .groupby(['legal_entity','counter_party','tier'])['value']
    .sum()
    .rename('total_sum_arap')
)
all_sum_accr = (
    joined_df.loc[joined_df.status=='ACCR']
    .groupby(['legal_entity','counter_party','tier'])['value']
    .sum()
    .rename('total_sum_accr')
)
# To preview: ib.collect(all_sum_arap).sort_index()

In [9]:
# Start from the distinct (legal_entity, counter_party, tier) combinations,
# join the three aggregates onto them by the same key, then turn the NaNs left
# by combinations without a matching aggregate row into 0.
all_agg_df = (
    joined_df[['legal_entity','counter_party','tier']]
    .drop_duplicates(keep='any')
    .join(all_max_rating, on=['legal_entity','counter_party','tier'])
    .join(all_sum_arap, on=['legal_entity','counter_party','tier'])
    .join(all_sum_accr, on=['legal_entity','counter_party','tier'])
    .replace(np.nan, 0)
)
# To preview: ib.collect(all_agg_df).sort_index()

In [10]:
# function to produce calculated values for each specific attribute
def agg_grp(grp_name, df=None):
    """Aggregate rating and invoice value for each distinct value of a grouping key.

    Parameters
    ----------
    grp_name : str or sequence of str
        Column name, or list of column names, to group by.
    df : dataframe, optional
        Frame to aggregate. It must contain the grouping column(s) together
        with the 'status', 'value' and 'max_rating_counter_party' columns.
        When omitted, the notebook-level ``joined_df`` is used, which is what
        the function always read before this parameter existed.

    Returns
    -------
    dataframe
        One row per distinct key with the grouping column(s) plus
        'max_rating_counter_party' (group maximum), 'total_sum_arap' and
        'total_sum_accr' (sum of 'value' over rows whose status is 'ARAP' /
        'ACCR'). NaNs are replaced with 0.
    """
    if df is None:
        # Looked up at call time, so the current notebook-level frame is used.
        df = joined_df

    # Normalise the key: a single name selects one column, a sequence selects
    # several; the same key is used for grouping and for joining back.
    if isinstance(grp_name, str):
        key_cols = [grp_name]
        group_key = grp_name
    else:
        key_cols = list(grp_name)
        group_key = key_cols

    # Distinct key combinations form the spine of the result. keep='any'
    # mirrors the other drop_duplicates call in this notebook.
    grp_agg_df = df[key_cols].drop_duplicates(keep='any')

    # Maximum rating within each group.
    grp_max_rating = df.groupby(group_key)['max_rating_counter_party'].max()
    grp_agg_df = grp_agg_df.join(grp_max_rating, on=group_key)

    # Sum of value per group, one output column per status of interest.
    for status, column in (('ARAP', 'total_sum_arap'), ('ACCR', 'total_sum_accr')):
        status_sum = (
            df.loc[df.status == status]
            .groupby(group_key)['value']
            .sum()
            .rename(column)
        )
        grp_agg_df = grp_agg_df.join(status_sum, on=group_key)

    # Groups with no rows for a given status come out of the join as NaN.
    return grp_agg_df.replace(np.nan, 0)

In [11]:
# One aggregate frame per single attribute; grouping by the constant 'total'
# column gives the grand total.
legal_entity_agg_df = agg_grp(grp_name='legal_entity')
counter_party_agg_df = agg_grp(grp_name='counter_party')
tier_agg_df = agg_grp(grp_name='tier')
total_agg_df = agg_grp(grp_name='total')
# To preview: ib.collect(legal_entity_agg_df).sort_index()

In [12]:
# Stack the detail rows and every subtotal frame into the desired output, fill
# the remaining NaNs with the text 'Total', and drop the helper column named
# 'total'.
output_df = (
    all_agg_df
    .append(legal_entity_agg_df)
    .append(counter_party_agg_df)
    .append(tier_agg_df)
    .append(total_agg_df)
    .replace(np.nan, 'Total')
    .drop(columns='total')
)
# To preview: ib.collect(output_df).sort_index()

In [13]:
# Write the combined frame to the required CSV file, without the index column.
output_df.to_csv('output_beam.csv', index=False)

<PCollection[[13]: WriteToPandas(df) - output_beam.csv/WriteToFiles/ParDo(_MoveTempFilesIntoFinalDestinationFn).None] at 0x7f9276b9cbb0>

In [14]:
# Run the pipeline and wait for it to reach its final state.
p.run().wait_until_finish()