# Adapter Layer
* Fix business days, possible case scenarios of meta_data.csv file
* Fix return types of objects and filter on prefix based on the structure of boto3 Prefix argument.
# Application Layer (ETL)


In [4]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [20]:
def read_csv_to_df(bucket, key, decode='utf-8', delimeter=','):
    csv_obj_init = bucket.Object(key=key).get().get('Body').read().decode(decode)
    dat = StringIO(csv_obj_init)
    df = pd.read_csv(dat, delimiter=delimeter)
    return df
meta_key = 'meta_data.csv'
bucket_name_target = 'destination-xetra'
s3 = boto3.resource('s3')
bucket_target = s3.Bucket(bucket_name_target)
df_meta = read_csv_to_df(bucket_target, meta_key)
df_meta

Unnamed: 0,source_date,datetime_of_processing
0,2022-03-25,2022-03-25 12:23:23
1,2022-03-23,2022-03-23 12:23:23


In [30]:
arg_date = '2022-03-25'
today_str = '2022-03-27'
min_date = datetime.strptime(arg_date, '%Y-%m-%d').date() - timedelta(days=1)
today = datetime.strptime(today_str, '%Y-%m-%d').date()
date_list = [(min_date + timedelta(days=x)) for x in range(0, (today - min_date).days + 1)]
date_list[1:]

[datetime.date(2022, 3, 25),
 datetime.date(2022, 3, 26),
 datetime.date(2022, 3, 27)]

In [31]:
src_date = set(pd.to_datetime(df_meta['source_date']).dt.date)
min_date = min(set(date_list) - src_date) - timedelta(days=1) #setdiff
min_date

datetime.date(2022, 3, 23)

In [32]:
return_dates = [date.strftime('%Y-%m-%d') for date in date_list if date >= min_date]
return_dates

['2022-03-24', '2022-03-25', '2022-03-26', '2022-03-27']

In [24]:
return_min_date = arg_date

In [2]:
# Adapter Layer - connecting with source and destination
def read_csv_to_df(bucket, key, decode='utf-8', delimeter=','):
    csv_obj_init = bucket.Object(key=key).get().get('Body').read().decode(decode)
    dat = StringIO(csv_obj_init)
    df = pd.read_csv(dat, delimiter=delimeter)
    return df

def write_df_to_s3(bucket, df, key):
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

def list_files_in_prefix(bucket, prefix):
    files = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files


In [3]:
# Application ETL
def extract(bucket, date_list):
    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]
    df = pd.concat([read_csv_to_df(bucket, o) for o in files], ignore_index=True)
    return df
def transform_report1(df,columns, arg_date):
    df['Date'] = pd.to_datetime(df['Date'])
    df = df.loc[:,columns]
    df = df.dropna()
    # Get opening price per ISIN and day
    df['opening_price'] = df.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')
    # Get closing price per ISIN and day
    df['closing_price'] = df.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('last')
    # Perform aggregation per each (ISIN,Date)
    df = df.groupby(['ISIN', 'Date'], as_index=False).\
        agg({
        'opening_price': 'min',
        'closing_price': 'min',
        'MinPrice': 'min',
        'MaxPrice': 'max',
        'TradedVolume': 'sum'
    }).rename(
        columns = { 'opening_price': 'opening_price_eur',
                    'closing_price': 'closing_price_eur',
                    'MinPrice': 'minimum_price_eur',
                    'MaxPrice': 'max_price_eur',
                    'TradedVolume': 'daily_traded_volume'
                    }
    )
    # Compute percent change prev closing
    df['prev_closing_price'] = df.sort_values('Date').groupby(['ISIN'])['closing_price_eur'].shift(1) #lag(1)
    df['prev_closing_perc'] = (df['closing_price_eur'] - df['prev_closing_price'])/df['closing_price_eur']*100
    df = df.drop(columns=['prev_closing_price'])
    df = df.round(decimals=2)
    df = df[df.Date >= arg_date] #remove nan rows of unknown lag
    return df

def load(bucket,
         df,
         target_key='xetra_daily_report',
         target_format='.parquet'):
    to_key = target_key + datetime.today().strftime('%Y%m%d_%H%M%S') + target_format
    write_df_to_s3(bucket, df, to_key)
    return True

# ETL
def etl_report1(source_bucket, target_bucket, date_list, columns, arg_date, target_key, target_format):
    df = extract(source_bucket, date_list)
    df = transform_report1(df, columns, arg_date)
    load(target_bucket, df, target_key, target_format)
    return True

In [63]:
# Application not core
def return_date_list(arg_date, src_format):
    min_date = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    today = datetime.today().date()
    date_list = [(min_date + timedelta(days=x)).strftime(src_format) \
                        for x in range(0, (today - min_date).days + 1)]
    return date_list

In [64]:
# main function entry point
def main():
    #parameters/config
    source_bucket = 'deutsche-boerse-xetra-pds'
    target_bucket = 'destination-xetra'
    src_format =  "%Y-%m-%d"
    columns = ['ISIN', 'Mnemonic', 'SecurityDesc',
            'SecurityType', 'Currency',
            'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
            'EndPrice', 'TradedVolume']
    target_key = 'xetra_daily_report_'
    target_format = '.parquet'
    arg_date = (datetime.today() - timedelta(days=3)).date().strftime(src_format)

    # init
    s3 = boto3.resource('s3')
    source = s3.Bucket(source_bucket)
    target = s3.Bucket(target_bucket)

    # run application
    date_list = return_date_list(arg_date, src_format)
    etl_report1(source, target, date_list, columns, arg_date, target_key, target_format)


In [65]:
main()

In [66]:
!aws s3 ls

2021-11-03 14:33:54 destination-snowflake
2022-03-27 11:33:16 destination-xetra
2021-12-15 13:45:47 lending-club
2022-03-25 20:37:36 source-covid-19-jobs
2021-11-03 14:33:28 source-snowflake
2021-11-02 22:08:15 xetra-target


In [68]:
# Read the uploaded files
target = boto3.resource('s3').Bucket('destination-xetra')
for obj in target.objects.all():
    print(obj.key)
prq_obj = target.Object(key='xetra_daily_report_20220328_233207.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)
df_report.head()

xetra_daily_report_20220327_115215.parquet
xetra_daily_report_20220328_141724.parquet
xetra_daily_report_20220328_194558.parquet
xetra_daily_report_20220328_195337.parquet
xetra_daily_report_20220328_233207.parquet


Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,max_price_eur,daily_traded_volume,prev_closing_perc
0,AT000000STR1,2022-03-25,36.8,36.4,36.4,36.8,119,-1.65
1,AT000000STR1,2022-03-28,36.95,36.85,36.75,37.0,260,1.22
2,AT00000FACC2,2022-03-25,7.83,7.92,7.83,7.92,74,-0.76
3,AT00000FACC2,2022-03-28,8.16,8.16,8.16,8.16,50,2.94
4,AT0000606306,2022-03-25,12.39,12.53,12.32,12.77,19227,0.64
