In [1]:
#import libraries

import pandas as pd
import boto3
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [2]:
#define constants

DATE_FORMAT = '%Y-%m-%d'
SOURCE_BUCKET = 'xetra-1234'
TARGET_BUCKET = 'xetra-dataset-production'

In [3]:
#add date arguments

arg_date = '2022-01-06'
arg_date_dt = datetime.strptime(arg_date, DATE_FORMAT).date() - timedelta(days=1)

arg_date_dt

datetime.date(2022, 1, 5)

In [4]:
#create connection to s3 bucket
s3 = boto3.resource('s3')
bucket = s3.Bucket(SOURCE_BUCKET)

objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], DATE_FORMAT).date() <= arg_date_dt]

objects

[s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR00.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR01.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR02.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR03.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR04.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR05.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR06.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR07.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR08.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR09.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-01-03/2022-01-03_BINS_XETR10.csv'),
 s3.Object

In [5]:
def csv_to_df(filename):
    csv_object = bucket.Object(key=filename).get().get('Body').read().decode('utf-8')
    data = StringIO(csv_object) #StringIO stores value into binary form
    df = pd.read_csv(data)
    return df
    
df_all = pd.concat([csv_to_df(obj.key) for obj in objects], ignore_index=True)

In [6]:
#initialize --improved by the function above and use of concat initializing a df with columns
csv_object_init = bucket.Object(key='2022-01-27/2022-01-27_BINS_XETR16.csv').get().get('Body').read().decode('utf-8')
data = StringIO(csv_object_init)
df_init = pd.read_csv(data)

In [7]:
#filter columns
columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
df_all = df_all.loc[:,columns]
df_all.dropna(inplace=True) #drop records with NA

df_all

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume
0,AT0000A0E9W5,2022-01-03,08:00,14.760,14.760,14.750,14.750,4414
1,DE000A0DJ6J9,2022-01-03,08:00,37.640,37.660,37.600,37.660,1649
2,DE000A0D6554,2022-01-03,08:00,13.990,14.030,13.940,13.960,23011
3,DE000A0D9PT0,2022-01-03,08:00,180.000,180.050,179.500,179.500,2308
4,DE000A0HN5C6,2022-01-03,08:00,37.280,37.280,37.280,37.280,2897
...,...,...,...,...,...,...,...,...
349136,IE00BYZK4883,2022-01-05,16:44,8.732,8.732,8.732,8.732,50
349137,IE00B1XNHC34,2022-01-05,16:44,10.392,10.392,10.392,10.392,180
349138,US3390411052,2022-01-05,20:30,208.000,208.000,208.000,208.000,0
349139,DE000A3E5ES0,2022-01-05,20:30,43.000,43.000,43.000,43.000,0


## Get opening price per ISIN and day

In [8]:
df_all['opening_price'] = df_all.sort_values(by='Time').groupby(['ISIN', 'Date'])['StartPrice'].transform('first')

df_all[df_all['ISIN'] == 'DE000A0HN5C6']

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price
4,DE000A0HN5C6,2022-01-03,08:00,37.28,37.28,37.28,37.28,2897,37.28
161,DE000A0HN5C6,2022-01-03,08:01,37.28,37.33,37.28,37.33,1000,37.28
281,DE000A0HN5C6,2022-01-03,08:02,37.57,37.61,37.55,37.61,2259,37.28
579,DE000A0HN5C6,2022-01-03,08:03,37.69,37.73,37.65,37.65,2477,37.28
745,DE000A0HN5C6,2022-01-03,08:04,37.66,37.86,37.65,37.85,2900,37.28
...,...,...,...,...,...,...,...,...,...
344554,DE000A0HN5C6,2022-01-05,16:26,37.90,37.90,37.85,37.89,909,37.70
344831,DE000A0HN5C6,2022-01-05,16:27,37.89,37.99,37.88,37.94,3047,37.70
345091,DE000A0HN5C6,2022-01-05,16:28,37.96,37.97,37.96,37.97,300,37.70
345397,DE000A0HN5C6,2022-01-05,16:29,37.97,37.97,37.97,37.97,4,37.70


## Get closing price per ISIN and day

In [9]:
df_all['closing_price'] = df_all.sort_values(by='Time').groupby(['ISIN', 'Date'])['StartPrice'].transform('last')

df_all[df_all['ISIN'] == 'DE000A0HN5C6']

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price,closing_price
4,DE000A0HN5C6,2022-01-03,08:00,37.28,37.28,37.28,37.28,2897,37.28,37.64
161,DE000A0HN5C6,2022-01-03,08:01,37.28,37.33,37.28,37.33,1000,37.28,37.64
281,DE000A0HN5C6,2022-01-03,08:02,37.57,37.61,37.55,37.61,2259,37.28,37.64
579,DE000A0HN5C6,2022-01-03,08:03,37.69,37.73,37.65,37.65,2477,37.28,37.64
745,DE000A0HN5C6,2022-01-03,08:04,37.66,37.86,37.65,37.85,2900,37.28,37.64
...,...,...,...,...,...,...,...,...,...,...
344554,DE000A0HN5C6,2022-01-05,16:26,37.90,37.90,37.85,37.89,909,37.70,37.86
344831,DE000A0HN5C6,2022-01-05,16:27,37.89,37.99,37.88,37.94,3047,37.70,37.86
345091,DE000A0HN5C6,2022-01-05,16:28,37.96,37.97,37.96,37.97,300,37.70,37.86
345397,DE000A0HN5C6,2022-01-05,16:29,37.97,37.97,37.97,37.97,4,37.70,37.86


## Aggregations

In [10]:
df_all = df_all.groupby(['ISIN', 'Date'], as_index=False).agg(opening_price_eur=('opening_price', 'min'), 
                                                              closing_price_eur=('closing_price', 'min'), 
                                                              minimum_price_eur=('MinPrice', 'min'), 
                                                              maximum_price_eur=('MaxPrice', 'max'), 
                                                              daily_traded_volume=('TradedVolume', 'sum'))

df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume
0,AT000000STR1,2022-01-03,36.550,37.400,36.350,37.400,660
1,AT000000STR1,2022-01-04,37.750,37.850,37.750,37.850,27
2,AT000000STR1,2022-01-05,37.700,37.200,37.200,37.700,800
3,AT00000FACC2,2022-01-03,7.390,7.650,7.390,7.670,1045
4,AT00000FACC2,2022-01-04,7.860,7.790,7.790,7.950,681
...,...,...,...,...,...,...,...
9550,XS2314660700,2022-01-04,18.305,18.606,18.305,18.606,0
9551,XS2314660700,2022-01-05,18.154,18.075,18.075,18.311,0
9552,XS2376095068,2022-01-03,41.504,41.044,41.000,41.832,2
9553,XS2376095068,2022-01-04,41.186,41.360,41.186,41.360,0


## Percent Change Previous Closing

In [11]:
df_all['previous_closing_price'] = df_all.sort_values(by='Date').groupby('ISIN')['closing_price_eur'].shift(1)

df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,previous_closing_price
0,AT000000STR1,2022-01-03,36.550,37.400,36.350,37.400,660,
1,AT000000STR1,2022-01-04,37.750,37.850,37.750,37.850,27,37.400
2,AT000000STR1,2022-01-05,37.700,37.200,37.200,37.700,800,37.850
3,AT00000FACC2,2022-01-03,7.390,7.650,7.390,7.670,1045,
4,AT00000FACC2,2022-01-04,7.860,7.790,7.790,7.950,681,7.650
...,...,...,...,...,...,...,...,...
9550,XS2314660700,2022-01-04,18.305,18.606,18.305,18.606,0,17.867
9551,XS2314660700,2022-01-05,18.154,18.075,18.075,18.311,0,18.606
9552,XS2376095068,2022-01-03,41.504,41.044,41.000,41.832,2,
9553,XS2376095068,2022-01-04,41.186,41.360,41.186,41.360,0,41.044


In [12]:
df_all['change_previous_closing_%'] = (df_all['closing_price_eur'] - df_all['previous_closing_price']) / df_all['previous_closing_price'] * 100
df_all.drop(columns=['previous_closing_price'], inplace=True)
df_all = df_all.round(decimals=2)

df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_previous_closing_%
0,AT000000STR1,2022-01-03,36.55,37.40,36.35,37.40,660,
1,AT000000STR1,2022-01-04,37.75,37.85,37.75,37.85,27,1.20
2,AT000000STR1,2022-01-05,37.70,37.20,37.20,37.70,800,-1.72
3,AT00000FACC2,2022-01-03,7.39,7.65,7.39,7.67,1045,
4,AT00000FACC2,2022-01-04,7.86,7.79,7.79,7.95,681,1.83
...,...,...,...,...,...,...,...,...
9550,XS2314660700,2022-01-04,18.30,18.61,18.30,18.61,0,4.14
9551,XS2314660700,2022-01-05,18.15,18.08,18.08,18.31,0,-2.85
9552,XS2376095068,2022-01-03,41.50,41.04,41.00,41.83,2,
9553,XS2376095068,2022-01-04,41.19,41.36,41.19,41.36,0,0.77


## Output to S3

In [13]:
key = 'xetra_daily_report_before_' + arg_date_dt.strftime('%Y%m%d_%H%M%S') + '.parquet'

output_buffer = BytesIO()
df_all.to_parquet(output_buffer, index=False)
bucket_target = s3.Bucket(TARGET_BUCKET)
bucket_target.put_object(Body=output_buffer.getvalue(), Key=key)

s3.Object(bucket_name='xetra-dataset-production', key='xetra_daily_report_before_20220105_000000.parquet')

## Read output file

In [14]:
for obj in bucket_target.objects.all():
    print(obj.key)

xetra_daily_report_before_20220105_000000.parquet


In [15]:
parquet_object = bucket_target.Object(key='xetra_daily_report_before_20220105_000000.parquet').get().get('Body').read()
data = BytesIO(parquet_object)
df_report = pd.read_parquet(data)

df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_previous_closing_%
0,AT000000STR1,2022-01-03,36.55,37.40,36.35,37.40,660,
1,AT000000STR1,2022-01-04,37.75,37.85,37.75,37.85,27,1.20
2,AT000000STR1,2022-01-05,37.70,37.20,37.20,37.70,800,-1.72
3,AT00000FACC2,2022-01-03,7.39,7.65,7.39,7.67,1045,
4,AT00000FACC2,2022-01-04,7.86,7.79,7.79,7.95,681,1.83
...,...,...,...,...,...,...,...,...
9550,XS2314660700,2022-01-04,18.30,18.61,18.30,18.61,0,4.14
9551,XS2314660700,2022-01-05,18.15,18.08,18.08,18.31,0,-2.85
9552,XS2376095068,2022-01-03,41.50,41.04,41.00,41.83,2,
9553,XS2376095068,2022-01-04,41.19,41.36,41.19,41.36,0,0.77
