In [58]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import  datetime, timedelta

In [59]:
# Adapter Layer
def read_csv_from_s3(bucket, key, decoding='utf-8', sep=','):
  """Fetch one object from the bucket and parse its content as CSV.

  Args:
    bucket: Object exposing ``Object(key=...)`` whose ``get()`` result has a
      readable ``'Body'`` entry (main() passes a boto3 S3 Bucket resource).
    key: Key of the object to download.
    decoding: Encoding used to turn the downloaded bytes into text.
    sep: Field delimiter handed to ``pd.read_csv``.

  Returns:
    pandas DataFrame parsed from the downloaded text.
  """
  response = bucket.Object(key=key).get()
  raw_bytes = response.get('Body').read()
  # pandas needs a file-like object, so wrap the decoded text in a buffer.
  text_buffer = StringIO(raw_bytes.decode(decoding))
  return pd.read_csv(text_buffer, delimiter=sep)


def write_parque_to_s3(bucket, df, key):
  """Serialize ``df`` to Parquet in memory and upload the bytes to the bucket.

  Args:
    bucket: Object exposing ``put_object(Body=..., Key=...)``.
    df: Frame to serialize; written with ``index=False``.
    key: Key under which the bytes are stored.

  Returns:
    Always True.
  """
  parquet_buffer = BytesIO()
  df.to_parquet(parquet_buffer, index=False)
  payload = parquet_buffer.getvalue()
  bucket.put_object(Body=payload, Key=key)
  return True

def write_csv_s3(bucket, df, key):
  """Render ``df`` as CSV text in memory and upload it to the bucket.

  Args:
    bucket: Object exposing ``put_object(Body=..., Key=...)``.
    df: Frame to serialize; written with ``index=False``.
    key: Key under which the CSV text is stored.

  Returns:
    Always True.
  """
  csv_buffer = StringIO()
  df.to_csv(csv_buffer, index=False)
  csv_text = csv_buffer.getvalue()
  bucket.put_object(Body=csv_text, Key=key)
  return True

def get_csv_files_in_prefix(bucket, prefix):
  """List the keys of the objects returned by ``bucket.objects.filter(Prefix=prefix)``.

  Despite the name, no check on the file extension is performed: every key
  the filter yields is returned.

  Args:
    bucket: Object exposing ``objects.filter(Prefix=...)``.
    prefix: Value passed as the ``Prefix`` filter.

  Returns:
    List of keys, in the order the filter yields them.
  """
  keys = []
  for s3_object in bucket.objects.filter(Prefix=prefix):
    keys.append(s3_object.key)
  return keys

In [60]:
# Application Layer
def extract(bucket, dates):
  """Read every file found under the given date prefixes into one DataFrame.

  Args:
    bucket: Source bucket, forwarded to the adapter functions.
    dates: Iterable of prefixes; each one is listed with
      ``get_csv_files_in_prefix``.

  Returns:
    Single DataFrame concatenating all files, with a fresh 0..n-1 index.
  """
  # Phase 1: list the keys for all prefixes before downloading anything.
  keys = []
  for date in dates:
    keys.extend(get_csv_files_in_prefix(bucket, date))

  # Phase 2: download and parse each file, then stack them.
  # NOTE(review): when no keys were found, pd.concat receives an empty list —
  # confirm callers never hit that case.
  frames = [read_csv_from_s3(bucket, key) for key in keys]
  return pd.concat(frames, ignore_index=True)


def transform_as_report_1(df, columns, start_date):
  """Aggregate raw trade rows into one report row per ISIN and date.

  Args:
    df: Raw rows; must contain every name listed in ``columns``. Not modified.
    columns: Columns to keep before aggregating. Must include 'ISIN', 'Date',
      'Time', 'StartPrice', 'MinPrice', 'MaxPrice' and 'TradedVolume'.
      When 'EndPrice' is included, the closing price is the ``EndPrice`` of the
      last bar of the day; otherwise it falls back to the last ``StartPrice``.
    start_date: Only rows with ``Date >= start_date`` are returned. Earlier
      dates still take part in the previous-close calculation.

  Returns:
    DataFrame with columns ISIN, Date, opening_price_eur, closing_price_eur,
    mininum_price_eur, maximun_price_eur, daily_traded_volume and
    change_prev_closing_% (percentage change versus the previous available
    date of the same ISIN), rounded to 2 decimals.
  """
  # Rows with any missing value in the selected columns are dropped. Sorting
  # once by Time lets 'first'/'last' below pick the opening and closing bar of
  # each (ISIN, Date) group.
  trades = df.loc[:, columns].dropna().sort_values(by='Time')

  # The day's close is where the last bar ended, not where it started.
  closing_source = 'EndPrice' if 'EndPrice' in trades.columns else 'StartPrice'

  # Output column spellings ('mininum', 'maximun') are kept as-is: they are the
  # schema of already written reports.
  report = trades.groupby(['ISIN', 'Date'], as_index=False).agg(
    opening_price_eur=('StartPrice', 'first'),
    closing_price_eur=(closing_source, 'last'),
    mininum_price_eur=('MinPrice', 'min'),
    maximun_price_eur=('MaxPrice', 'max'),
    daily_traded_volume=('TradedVolume', 'sum'))

  # Previous close per ISIN; the result is index-aligned with `report`.
  prev_closing_price = report.sort_values(by='Date').groupby('ISIN')['closing_price_eur'].shift(1)
  report['change_prev_closing_%'] = (
    (report['closing_price_eur'] - prev_closing_price) / prev_closing_price * 100)

  report = report.round(decimals=2)
  return report[report['Date'] >= start_date]


def load(bucket, df, key, meta_key, new_processed_dates):
  """Store the report in the bucket and record the processed dates.

  Args:
    bucket: Target bucket, used for both the report and the meta file.
    df: Report frame to upload.
    key: Key for the report object.
    meta_key: Key of the meta file to update.
    new_processed_dates: Dates to add to the meta file.

  Returns:
    Always True.
  """
  # The report is written first; the meta file is only updated afterwards.
  write_parque_to_s3(bucket=bucket, df=df, key=key)
  update_meta_file(
    bucket=bucket,
    meta_key=meta_key,
    new_processed_dates=new_processed_dates)
  return True


def run_etl_report_1(source_bucket, target_bucket, dates, columns, start_date, key, meta_key):
  """Run extract -> transform -> load for report 1.

  Args:
    source_bucket: Bucket the raw files are read from.
    target_bucket: Bucket the report and the meta file are written to.
    dates: Date strings to extract. May be empty, in which case nothing is
      read or written.
    columns: Columns forwarded to ``transform_as_report_1``.
    start_date: First date to report; also decides which entries of ``dates``
      are recorded as processed.
    key: Key for the report object.
    meta_key: Key of the meta file.

  Returns:
    Always True (an empty ``dates`` list counts as a successful no-op).
  """
  # get_dates_for_processing returns an empty list once every date is already
  # recorded in the meta file. Without this guard, extract() would hand an
  # empty list to pd.concat, which raises instead of producing an empty frame.
  if not dates:
    return True

  df = extract(source_bucket, dates)
  df = transform_as_report_1(df, columns, start_date)
  # `dates` may contain days before start_date; only dates from start_date
  # onward are recorded as processed.
  new_processed_dates = [date for date in dates if date >= start_date]
  load(target_bucket, df, key, meta_key, new_processed_dates)
  return True

In [61]:
# Helper functions
def get_dates_for_processing(bucket, start_date, date_format, meta_key):
  """Work out which dates still have to be extracted and reported.

  Candidate dates run from the day before ``start_date`` up to today. Dates
  already listed in the ``source_date`` column of the meta file are skipped.

  Args:
    bucket: Bucket holding the meta file (a boto3 S3 Bucket resource, or an
      object exposing the same ``Object`` and ``meta.client.exceptions`` API).
    start_date: Earliest date to report, formatted per ``date_format``.
    date_format: ``strftime``/``strptime`` format used for input and output.
    meta_key: Key of the meta CSV file.

  Returns:
    Tuple ``(start_date, dates)`` of a formatted string and a list of
    formatted strings:
    * meta file missing: the original start date and every candidate date;
    * unprocessed dates exist: the earliest unprocessed date, and all
      candidate dates from the day before it onward;
    * everything processed: the sentinel date 2200-01-01 and an empty list.
  """
  start_date = datetime.strptime(start_date, date_format).date()
  # The extra leading day is always extracted but never counted as
  # "to be processed" itself (see dates[1:] below).
  day_previous_to_start = start_date - timedelta(days=1)
  today = datetime.today().date()
  dates = [(day_previous_to_start + timedelta(days=x)) for x in range((today - day_previous_to_start).days + 1)]
  try:
    df_meta = read_csv_from_s3(bucket, meta_key)
    processed_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)
    not_yet_processed_dates = set(dates[1:]) - processed_dates
    if not_yet_processed_dates:
      start_date = min(not_yet_processed_dates)
      day_previous_to_start = start_date - timedelta(days=1)
      new_dates = [date.strftime(date_format) for date in dates if date >= day_previous_to_start]
    else:
      # Far-future sentinel: nothing is left to process.
      start_date = datetime(2200, 1, 1).date()
      new_dates = []
  # A boto3 resource has no `session` attribute; the client that defines the
  # modeled NoSuchKey exception is reached through `meta.client`.
  except bucket.meta.client.exceptions.NoSuchKey:
    # No meta file yet (first run): process every candidate date.
    new_dates = [date.strftime(date_format) for date in dates]
  return start_date.strftime(date_format), new_dates

def update_meta_file(bucket, meta_key, new_processed_dates):
  """Append the newly processed dates to the meta CSV file in the bucket.

  Each date gets the current local time as its ``processing_timestamp``.
  If the meta file does not exist yet, it is created from the new rows alone.

  Args:
    bucket: Bucket holding the meta file (a boto3 S3 Bucket resource, or an
      object exposing the same ``meta.client.exceptions`` API).
    meta_key: Key of the meta CSV file.
    new_processed_dates: Values for the ``source_date`` column of the new rows.

  Returns:
    None.
  """
  new_df = pd.DataFrame(columns=['source_date', 'processing_timestamp'])
  new_df['source_date'] = new_processed_dates
  new_df['processing_timestamp'] = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  try:
    old_df = read_csv_from_s3(bucket, meta_key)
    combined_df = pd.concat([old_df, new_df])
  except bucket.meta.client.exceptions.NoSuchKey:
    # First run: there is no previous meta file to extend. Without this
    # branch the job would fail here, after the report was already uploaded.
    combined_df = new_df
  write_csv_s3(bucket, combined_df, meta_key)

In [62]:
# Main function entry point
def main():
  """Configure and run the report 1 ETL job against the S3 buckets."""
  # --- configuration ---
  date_format = '%Y-%m-%d'
  start_date = '2021-12-08'
  src_bucket_name = 'deutsche-boerse-xetra-pds'
  trg_bucket_name = 'xetra-etl-tutorial'
  columns = [
    'ISIN', 'Date', 'Time',
    'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice',
    'TradedVolume', 'NumberOfTrades',
  ]
  meta_key = 'meta_file.csv'
  # One report object per run, named after the run's local timestamp.
  run_stamp = datetime.today().strftime("%Y%m%d_%H%M%S")
  key = f'xetra_daily_report_{run_stamp}.parque'

  # --- S3 handles ---
  s3 = boto3.resource('s3')
  source_bucket = s3.Bucket(src_bucket_name)
  target_bucket = s3.Bucket(trg_bucket_name)

  # --- run ---
  # The meta file lives in the target bucket; it decides which dates are left.
  start_date, dates = get_dates_for_processing(
    target_bucket, start_date, date_format, meta_key)
  run_etl_report_1(
    source_bucket, target_bucket, dates, columns, start_date, key, meta_key)


In [63]:
# Run the ETL job once; main() reads from the source bucket and writes to the target bucket.
main()

In [64]:
# Re-create the bucket handle at notebook scope: the handles built inside
# main() are local to that function and not available here.
target_bucket_name = 'xetra-etl-tutorial'
s3 = boto3.resource('s3')
target_bucket = s3.Bucket(target_bucket_name)

# Print the key of every object currently in the target bucket.
for obj in target_bucket.objects.all():
  print(obj.key)


meta_file.csv
xetra_daily_report_20211204_202308.parque
xetra_daily_report_20211204_204624.parque
xetra_daily_report_20211205_160225.parque
xetra_daily_report_20211205_174234.parque
xetra_daily_report_20211211_130016.parque
xetra_daily_report_20211211_132121.parque
xetra_daily_report_20211211_141627.parque
xetra_daily_report_20211211_142152.parque


In [65]:
# Download one report object from the target bucket as raw bytes.
# Relies on `target_bucket` defined in the listing cell above.
parque_stream = target_bucket.Object(key='xetra_daily_report_20211211_132121.parque').get().get('Body').read()
# Wrap the bytes in a file-like buffer so pandas can read them as Parquet.
data = BytesIO(parque_stream)
report_df = pd.read_parquet(data)

In [66]:
# Display the report frame loaded in the previous cell (bare last expression).
report_df

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,mininum_price_eur,maximun_price_eur,daily_traded_volume,change_prev_closing_%
0,AT000000STR1,2021-12-10,36.05,35.50,35.50,36.20,390,-1.80
1,AT00000FACC2,2021-12-10,7.84,7.84,7.84,7.84,0,0.51
2,AT0000606306,2021-12-10,25.12,25.44,25.12,25.44,821,1.52
3,AT0000609607,2021-12-10,13.00,12.86,12.80,13.00,523,0.00
4,AT0000644505,2021-12-10,109.40,112.00,109.20,112.40,71,2.19
...,...,...,...,...,...,...,...,...
3173,XS2265370234,2021-12-10,16.00,15.50,15.50,16.00,623,-1.74
3174,XS2284324667,2021-12-10,31.33,31.39,31.33,31.75,4072,-0.56
3175,XS2314659447,2021-12-10,8.43,8.39,8.39,8.45,1434,-0.13
3176,XS2314660700,2021-12-10,17.50,17.63,17.50,17.63,0,-0.37


In [67]:
# Print the integers 0 through 9, one per line.
# NOTE(review): this cell appears unrelated to the ETL report — confirm whether it can be removed.
for i in range(10):
  print(i)

0
1
2
3
4
5
6
7
8
9
