# Daily Reporting Functions for Deutsche Boerse Xetra Dataset

A daily summary report is generated by extracting CSV data from the Deutsche Boerse public S3 bucket. The data is then wrangled, aggregated, and stored in a separate bucket in [Apache Parquet format](https://parquet.apache.org/).

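Note: Data from the day before the requested date is also extracted so that the report can show each security's change from the previous closing price. Rows for that earlier day are used only for this calculation and are not included in the report. If the markets were closed on that day (e.g. a weekend or holiday), no previous closing price is available and the change is left empty.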

---

## Imports

In [1]:
import boto3
import pandas as pd
from datetime import datetime, timedelta
from io import StringIO, BytesIO

## ETL Functions

### Adapter Layer

In [2]:
def read_csv_to_df(bucket, key, decoding='utf-8', sep=','):
    """Load a CSV object from an S3 bucket into a pandas DataFrame.

    Args:
        bucket: Bucket object exposing ``Object(key=...)``.
        key: Key of the CSV object to read.
        decoding: Text encoding used to decode the raw object bytes.
        sep: Field delimiter passed to ``pd.read_csv``.

    Returns:
        The parsed DataFrame.
    """
    response = bucket.Object(key=key).get()
    text = response.get('Body').read().decode(decoding)
    return pd.read_csv(StringIO(text), delimiter=sep)


def write_df_to_s3(bucket, key, df, format='csv'):
    """Serialize a DataFrame and upload it to ``bucket`` under ``key``.

    Args:
        bucket: Target bucket object; only its ``put_object`` method is used.
        key: Object key to write.
        df: DataFrame to serialize (its index is not written).
        format: Either 'csv' or 'parquet'.

    Returns:
        True after the upload; False (after printing an error, without
        uploading anything) when ``format`` is not supported.
    """
    # Reject unknown formats before doing any serialization work.
    if format not in ('csv', 'parquet'):
        print(f"Error: {format} is not a valid format. It should be 'csv' or 'parquet.'")
        return False

    buffer = BytesIO()
    if format == 'parquet':
        df.to_parquet(buffer, index=False)
    else:
        df.to_csv(buffer, index=False)

    bucket.put_object(Body=buffer.getvalue(), Key=key)
    return True


def list_files_in_prefix(bucket, prefix):
    """Return the keys of all objects in ``bucket`` whose key starts with ``prefix``.

    Every matching key is returned, whatever its file type.
    """
    object_summaries = bucket.objects.filter(Prefix=prefix)
    return [summary.key for summary in object_summaries]

### Application Layer

In [3]:
def extract(bucket, date_list):
    """Extracts data from the Deutsche Boerse S3 bucket.

    Args:
        bucket: Source bucket (a boto3 S3 Bucket resource in this notebook).
        date_list: Date strings used as key prefixes; every object whose key
            starts with one of them is read.

    Returns:
        All matching CSV files concatenated into one DataFrame, or an empty
        DataFrame with the expected Xetra columns when no file matches.
    """
    # Collect every object key stored under each date prefix.
    files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]

    if not files:
        # Nothing to read for these dates: return an empty frame with the
        # expected columns instead of letting pd.concat([]) fail. This is
        # checked explicitly (rather than catching ValueError) so that real
        # read errors such as bad encodings or malformed CSVs, which are also
        # ValueErrors, are not mistaken for "no data".
        return pd.DataFrame(
            columns=[
                'ISIN', 'Date', 'Time', 'StartPrice',
                'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume'
            ]
        )

    return pd.concat([read_csv_to_df(bucket, key) for key in files], ignore_index=True)


def transform(df, columns, arg_date):
    """Transforms the Xetra data into a form suitable for reporting.

    Args:
        df: Raw Xetra rows. Must contain ``columns``, including the 'ISIN',
            'Date', 'Time', 'StartPrice', 'MinPrice', 'MaxPrice' and
            'TradedVolume' columns used below. The caller's frame is not
            modified.
        columns: Columns to keep. Rows with a missing value in any of them
            are dropped.
        arg_date: First date to keep in the report. It is compared against
            the 'Date' column with ``>=``, so it is assumed to share that
            column's string format (YYYY-MM-DD in this notebook).

    Returns:
        One row per (ISIN, Date) with opening, closing, minimum and maximum
        prices, total traded volume, and the percentage change of the closing
        price versus the ISIN's previous closing price, rounded to two
        decimals. Rows dated before ``arg_date`` are removed.
    """
    # .loc + dropna yield a new frame; .copy() makes it explicitly
    # independent so the column assignments below never write into (or warn
    # about) the caller's data.
    df = df.loc[:, columns].dropna().copy()

    # Sort by Time once, then group by security and day, so 'first'/'last'
    # pick the earliest/latest trading interval of each day. The transformed
    # values align back to df by index.
    start_by_day = df.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice']
    df['opening_price'] = start_by_day.transform('first')
    # NOTE(review): the closing price is the StartPrice of the day's last
    # interval, even though 'EndPrice' is among the selected columns and is
    # otherwise unused. Confirm which one is intended.
    df['closing_price'] = start_by_day.transform('last')

    # Collapse to one row per (ISIN, Date). opening_price and closing_price
    # are constant within each group, so 'min' just picks that value.
    df = df.groupby(['ISIN', 'Date'], as_index=False).agg(
        opening_price_eur=('opening_price', 'min'),
        closing_price_eur=('closing_price', 'min'),
        minimum_price_eur=('MinPrice', 'min'),
        maximum_price_eur=('MaxPrice', 'max'),
        daily_traded_volume=('TradedVolume', 'sum'))

    # Closing price from the same ISIN's previous date in the data (NaN for
    # the ISIN's first date). This previously used opening_price_eur by
    # mistake.
    df['prev_closing_price'] = df.sort_values(
        by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)

    # Percentage change of the closing price since the previous close.
    df['change_prev_closing_%'] = (
        df['closing_price_eur'] - df['prev_closing_price']
    ) / df['prev_closing_price'] * 100

    df = df.drop(columns=['prev_closing_price']).round(decimals=2)

    # Earlier dates were only needed to compute prev_closing_price.
    return df[df.Date >= arg_date]

def load(bucket, df, tgt_key, tgt_format, meta_key, extract_date_list):
    """Loads the data into a new S3 bucket for reporting.

    Writes ``df`` as a timestamped report object named
    ``<tgt_key>_<YYYYmmdd_HHMMSS>.<tgt_format>``. Only if that write succeeds
    are ``extract_date_list`` recorded in the meta file at ``meta_key``.

    Returns:
        True when the report was written and the meta file updated; False
        when ``df`` has no rows or ``write_df_to_s3`` rejected the format.
    """
    if len(df) < 1:
        print("Sorry, no data was extracted. Try another date.")
        return False

    key = f"{tgt_key}_{datetime.today().strftime('%Y%m%d_%H%M%S')}.{tgt_format}"

    # write_df_to_s3 returns False without writing for an unsupported
    # format. Recording the dates as processed in that case would make
    # get_date_list skip them on every later run.
    if not write_df_to_s3(bucket, key, df, format=tgt_format):
        return False

    update_meta_file(bucket, meta_key, extract_date_list)
    return True


def report(src_bucket, tgt_bucket, date_list, columns, arg_date, tgt_key, tgt_format, meta_key):
    """Processes Xetra source data through ETL into a report.

    Extracts ``date_list`` from ``src_bucket``, transforms the rows (keeping
    dates on or after ``arg_date``) and loads the result into ``tgt_bucket``.

    Returns:
        Whatever ``load`` returns (False when it had nothing to write),
        instead of an unconditional True.
    """
    df = extract(src_bucket, date_list)
    df = transform(df, columns, arg_date)

    # Only dates from arg_date onward end up in the report. The earlier day
    # was extracted just for the previous closing price, so it is not
    # recorded as processed.
    extract_date_list = [date for date in date_list if date >= arg_date]
    return load(tgt_bucket, df, tgt_key, tgt_format, meta_key, extract_date_list)
    

In [4]:
def get_date_list(bucket, arg_date, date_format, meta_key):
    """Returns a list of possible dates based on the argument date.

    Uses the meta file in ``bucket`` to skip source dates that were already
    processed.

    Args:
        bucket: Target bucket (a boto3 S3 Bucket resource in this notebook)
            holding the meta file.
        arg_date: Requested start date, formatted with ``date_format``.
        date_format: strptime/strftime format used for all date strings.
        meta_key: Key of the meta CSV file; its 'source_date' column lists
            processed dates.

    Returns:
        ``(min_date_result, date_results)``.
        - ``min_date_result``: the first date to report, as a string in
          ``date_format``.
        - ``date_results``: the date strings to extract, starting one day
          earlier so the previous closing price is available.
        When every date is already processed, ``date_results`` is empty and
        ``min_date_result`` is the far-future sentinel 2500-01-01, now
        formatted as a string like the other branches.
    """
    # Start one day before the requested date: that day only supplies the
    # previous closing price.
    min_date = datetime.strptime(arg_date, date_format).date() - timedelta(days=1)
    today = datetime.today().date()
    dates = [min_date + timedelta(days=x) for x in range((today - min_date).days + 1)]

    try:
        df_meta = read_csv_to_df(bucket, meta_key)
    # The exception class must come from the client the bucket actually
    # uses. A client from a brand-new session has its own, non-matching
    # NoSuchKey class.
    except bucket.meta.client.exceptions.NoSuchKey:
        # No meta file yet: nothing has been processed, so extract everything.
        return arg_date, [date.strftime(date_format) for date in dates]

    processed_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)
    missing_dates = set(dates[1:]) - processed_dates

    if not missing_dates:
        return datetime(2500, 1, 1).strftime(date_format), []

    # Restart from the day before the first unprocessed date so its previous
    # closing price is available.
    min_date = min(missing_dates) - timedelta(days=1)
    date_results = [date.strftime(date_format) for date in dates if date >= min_date]
    min_date_result = (min_date + timedelta(days=1)).strftime(date_format)
    return min_date_result, date_results


def update_meta_file(bucket, meta_key, extract_date_list):
    """Updates the meta file with the new dates from the latest report.

    Appends one row per date in ``extract_date_list`` (columns 'source_date'
    and 'datetime_of_processing', the latter being today's date as
    YYYY-MM-DD) to the CSV at ``meta_key``. If the file does not exist yet
    (first run), it is created.
    """
    source_dates = list(extract_date_list)
    processed_on = datetime.today().strftime('%Y-%m-%d')
    df_new = pd.DataFrame({
        'source_date': source_dates,
        'datetime_of_processing': [processed_on] * len(source_dates),
    })

    try:
        df_old = read_csv_to_df(bucket, meta_key)
    except bucket.meta.client.exceptions.NoSuchKey:
        # First run: get_date_list already handles a missing meta file, so
        # create it here instead of crashing after the report was written.
        df_all = df_new
    else:
        df_all = pd.concat([df_old, df_new], ignore_index=True)

    write_df_to_s3(bucket, meta_key, df_all)


## Driver


In [5]:
def main():
    """Run the Xetra daily-report ETL job against the configured buckets."""
    # Hard-coded start date for the report, plus the format used for all
    # date strings in the job
    date_format = '%Y-%m-%d'
    arg_date = '2022-05-12'

    # Source data location and report/meta-file destination
    src_bucket_name = 'deutsche-boerse-xetra-pds'
    tgt_bucket_name = 'xetra-data-jt'
    tgt_key = 'xetra_daily_report'
    tgt_format = 'parquet'
    meta_key = 'meta.csv'

    # Source columns kept for the report
    columns = [
        'ISIN', 'Date', 'Time', 'StartPrice',
        'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume',
    ]

    s3 = boto3.resource('s3')
    src_bucket = s3.Bucket(src_bucket_name)
    tgt_bucket = s3.Bucket(tgt_bucket_name)

    # Work out which dates still need processing, then run the ETL
    extract_date, date_list = get_date_list(
        bucket=tgt_bucket,
        arg_date=arg_date,
        date_format=date_format,
        meta_key=meta_key,
    )
    report(
        src_bucket=src_bucket,
        tgt_bucket=tgt_bucket,
        date_list=date_list,
        columns=columns,
        arg_date=extract_date,
        tgt_key=tgt_key,
        tgt_format=tgt_format,
        meta_key=meta_key,
    )


In [6]:
# Uncomment the line below to run the ETL job
# and generate the most recent daily report

#main()

---


## Display Most Recent Report

In [7]:
bucket_name = 'xetra-data-jt'
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
key_list = [obj.key for obj in bucket.objects.all()]

# Count report objects explicitly instead of assuming exactly one extra key
# (meta.csv). The meta file may be missing, e.g. in an empty bucket.
report_count = sum(1 for key in key_list if key != 'meta.csv')
print(f"There are {report_count} reports in the bucket")
key_list

There are 5 reports in the bucket


['meta.csv',
 'xetra_daily_report_20220510_205428.parquet',
 'xetra_daily_report_20220511_075427.parquet',
 'xetra_daily_report_20220516_194048.parquet',
 'xetra_daily_report_20220516_194335.parquet',
 'xetra_daily_report_20220516_194528.parquet']

In [8]:
# Get the most recent daily report. Report keys end in a sortable
# YYYYmmdd_HHMMSS timestamp, so the greatest parquet key is the newest.
# Filter explicitly instead of relying on listing order and on meta.csv
# sorting before the reports.
report_keys = [k for k in key_list if k.endswith('.parquet')]
if not report_keys:
    raise FileNotFoundError(f"No parquet reports found in bucket {bucket_name!r}")
key = max(report_keys)
prq_obj = bucket.Object(key=key).get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

display(df_report)

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT000000STR1,2022-05-12,36.30,35.80,35.80,36.30,295,-1.38
1,AT000000STR1,2022-05-13,36.75,39.10,36.75,39.30,1307,9.22
2,AT00000FACC2,2022-05-12,7.42,7.35,7.22,7.46,3400,-2.26
3,AT0000606306,2022-05-12,11.22,11.84,11.22,12.09,39371,2.25
4,AT0000606306,2022-05-13,12.22,12.11,11.90,12.34,40887,2.28
...,...,...,...,...,...,...,...,...
6613,XS2427474023,2022-05-13,19.10,18.51,18.51,19.10,0,-5.86
6614,XS2434891219,2022-05-12,2.76,2.80,2.58,2.80,9968,-4.63
6615,XS2434891219,2022-05-13,2.90,2.91,2.90,2.97,33613,3.96
6616,XS2437455608,2022-05-12,28.86,27.77,27.77,28.93,0,1.39
