In [1]:
import pandas as pd
from io import StringIO, BytesIO
import boto3
from datetime import datetime, timedelta

In [2]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
# Adapter Layer

def read_csv_to_df(bucket, key, decoding='utf-8', sep=','):
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode(decoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=sep)
    return df

def write_df_to_s3(bucket, df, key):
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

def return_objects(bucket, arg_date):
    min_date = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    objects = [obj.key for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], src_format).date() >= min_date]
    return objects

In [None]:
# Application Layer

def extract(bucket, objects):
    df = pd.concat([read_csv_to_df(bucket, obj) for obj in objects], ignore_index=True)
    return df

def transform_report1(df, columns, arg_date):
    df = df.loc[:, columns]
    df.dropna(inplace=True)
    # Get opening perice per ISIN and day
    df['opening_price_eur'] = df.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first')    
    # Get closing_price_eur per ISIN and day
    df['closing_price_eur'] = df.sort_values(by=['Time']).groupby(['ISIN','Date'])['EndPrice'].transform('last')    
    # Aggregations
    df = df.groupby(['ISIN','Date'], as_index=False).agg(opening_price_eur=('opening_price_eur', min), closing_price_eur=('closing_price_eur', min),
                                                minimum_price_eur=('MinPrice', min), maximum_price_eur=('MaxPrice', max),
                                                daily_traded_volume=('TradedVolume', sum))
    # Get previous day price
    df['previous_closing_price'] = df.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)
    df = df[df_all.Date==arg_date]
    # Change with respect to previous day closing price
    df['change_prev_closing_%'] = df['closing_price_eur'] - df['previous_closing_price'] / df['previous_closing_price'] * 100
    df.drop(columns=['previous_closing_price'], inplace=True)
    df = df.round(decimals=2)
    return df

def load(bucket, df, trg_key, trg_format):
    key = trg_key + datetime.today().strftime('%Y%m%d_%H%M%S') + trg_format
    write_df_to_s3(bucket, df, key)
    return True

def etl_report1(src_bucket, objects, trg_bucket, columns, arg_date):
    df = extract(src_bucket, objects)
    transform_report1(df, columns, arg_date)
    load(trg_bucket, df, trg_key, trg_format)
    return True

In [3]:
arg_date = '2022-12-31'
src_format = '%Y-%m-%d'
src_bucket = 'xetra-1234'
trg_bucket = 'udemy-xetra-1234'
columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
trg_key = 'xetra_daily_report_'
trg_format = '.parquet'

In [8]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(src_bucket)


## Reporting Fields
 - Date
 - opening_price_eur
 - closing_price_eur
 - minimum_price_eur
 - maximum_price_eur
 - daily_traded_volume
 - change_prev_closing_%

## Write to S3

In [14]:
bucket_target = s3.Bucket(trg_bucket)


s3.Object(bucket_name='udemy-xetra-1234', key='xetra_daily_report_20241129_093617.parquet')

## Read the uploaded file from S3

In [15]:
for obj in bucket_target.objects.all():
    print(obj.key)

xetra_daily_report_20241127_232202.parquet
xetra_daily_report_20241128_175242.parquet
xetra_daily_report_20241128_185532.parquet
xetra_daily_report_20241129_093617.parquet


In [16]:
parque_obj = bucket_target.Object(key='xetra_daily_report_20241128_175242.parquet').get().get('Body').read()
data = BytesIO(parque_obj)
df_report = pd.read_parquet(data)
df_report.head()

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT000000STR1,2022-12-31,36.6,36.7,35.75,36.7,1773,-63.3
1,AT00000FACC2,2022-12-31,8.05,8.57,7.87,8.57,10205,-91.43
2,AT0000606306,2022-12-31,14.51,15.0,13.65,15.28,107836,-85.0
3,AT0000609607,2022-12-31,11.74,12.06,11.7,12.06,1065,-87.94
4,AT0000644505,2022-12-31,98.2,99.2,96.1,99.2,531,-0.8
