In [1]:
## Pick one file of the dataset from the cloud

import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

## Architecture

![image](architecture design.png)

We have 3 layers:

The layers will have classes or functions to do a specific task

1. Infrastructure layer: contains databases, roots, caches etc. Storage for the source and target is in this layer.
2. Adapter layer: It is responsible for accessing the infrastructure and external APIs
3. Application layer: it contains the features of our application

## Functions

Functional approach: we will have two layers: adapter and application layer


In [2]:
## Adapter layer

def read_csv_to_df(bucket, key, decode="utf-8", sep=","):
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode(decode)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=sep)
    return df

def write_csv_to_s3(bucket, df, key):
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body = out_buffer.getvalue(), Key=key)
    return True

def get_objects(bucket, arg_date, date_format):
    objects = []
    curr_date = datetime.strptime(arg_date, date_format).date()
    prev_date = (curr_date - timedelta(days=1)).strftime(date_format)
    
    for date in [arg_date, prev_date]:
        bucket_obj = bucket.objects.filter(Prefix=date)
        objects += [obj.key for obj in bucket_obj]

    return objects

In [14]:
## Application layer
 
def extract(bucket, objects):
    df_all = pd.concat([read_csv_to_df(bucket, key) for key in objects])
    return df_all

def transform_report1(df, columns, arg_date):
    df = df[columns]
    df = df.sort_values(by=["Date", "Time"]).reset_index(drop=True)
    df['opening_price'] = df.sort_values(by=["Date","Time"]).groupby(["ISIN", "Date"])["StartPrice"].transform("first").reset_index().drop("index",axis=1)
    df['closing_price'] = df.sort_values(by=["Date","Time"]).groupby(["ISIN", "Date"])["StartPrice"].transform("last").reset_index().drop("index",axis=1)
    
    ## Aggregations
    df = df.groupby(["ISIN", "Date"], as_index=False).agg(
        opening_price_eur = ("opening_price", "min"),
        closing_price_eur = ("closing_price", "min"),
        min_price_eur = ("MinPrice", "min"),
        max_price_eur = ("MaxPrice", "max"),
        daily_traded_vol = ("TradedVolume", "sum")
    )
    
    df["prev_closing_price"] = df.sort_values(by="Date").groupby("ISIN")["closing_price_eur"].shift(1)
    df["prev_closing_%"] = (df["closing_price_eur"] - df["prev_closing_price"])/df["prev_closing_price"]*100
    df = df.round(decimals=2)
    df.drop("prev_closing_price", inplace=True, axis=1)
    df = df[df["Date"]==arg_date]
    return df

def load(bucket, df, trg_key, trg_format):
    key = f"{trg_key}_{datetime.today().strftime('%Y-%m-%d')}.{trg_format}"
    write_csv_to_s3(bucket, df, key)
    return True

def ETL_report1(src_bucket, trg_bucket, objects, columns, arg_date, trg_key, trg_format):
    
    df = extract(src_bucket, objects)
    df = transform_report1(df, columns, arg_date)
    
    load(trg_bucket, df, trg_key, trg_format)
    return True

In [15]:
## Main function entry point
arg_date = "2022-02-22"

def main():
    ## Parameters/configurations
    ## Later we will read the inputs from config file
    date_format = "%Y-%m-%d"
    columns = ["ISIN", "Mnemonic", "Date", "Time", "StartPrice", "MaxPrice", "MinPrice", "EndPrice", "TradedVolume"]
    src_bucket_name = "xetra-1234"
    trg_bucket_name = "xetra-report-1"
    trg_key = "xetra_daily_report"
    trg_format="parquet"
    
    ## Init connections
    s3 = boto3.resource('s3') ## type of the resource
    src_bucket = s3.Bucket(name=src_bucket_name)  
    trg_bucket = s3.Bucket(name=trg_bucket_name)
    
    ## Run the application
    objects = get_objects(src_bucket, arg_date, date_format)
    ETL_report1(src_bucket, trg_bucket, objects, columns, arg_date, trg_key, trg_format)
    

In [16]:
##run 
main()

## Evaluation

In [17]:
s3 = boto3.resource('s3') ## type of the resource
src_bucket = s3.Bucket(name="xetra-1234")  
trg_bucket = s3.Bucket(name="xetra-report-1")

for obj in trg_bucket.objects.all():
    print(obj.key)

xetra_daily_report_2022-07-25.parquet
xetra_daily_report_2022-07-26.parquet


In [19]:
obj = trg_bucket.Object(key=f"xetra_daily_report_2022-07-26.parquet").get().get("Body").read()
data = BytesIO(obj)
df = pd.read_parquet(data)
df

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,min_price_eur,max_price_eur,daily_traded_vol,prev_closing_%
0,AT000000STR1,2022-02-22,36.40,36.85,36.40,36.85,1475,-1.60
1,AT00000FACC2,2022-02-22,7.89,8.47,7.89,8.47,1956,1.80
2,AT0000606306,2022-02-22,21.36,21.78,21.20,22.62,61546,-8.26
3,AT0000609607,2022-02-22,12.20,12.30,12.14,12.32,930,-1.28
4,AT0000644505,2022-02-22,98.40,100.80,98.40,102.20,2413,-1.56
...,...,...,...,...,...,...,...,...
3248,XS2284324667,2022-02-22,38.72,38.33,38.27,39.07,4245,0.64
3249,XS2314659447,2022-02-22,8.74,8.76,8.71,8.81,133,0.22
3250,XS2314660700,2022-02-22,22.10,22.04,22.04,22.10,239,1.60
3251,XS2376095068,2022-02-22,32.47,33.04,32.47,33.22,758,-3.36
