### Reading multiple files from XETRA Dataset as Pandas dataframe

In [31]:
import boto3
import pandas as pd
# To save the file as parquet format in S3, we use BytesIO
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [32]:
## Now using Date as Input argument
arg_date = '2021-05-07'
src_format = '%Y-%m-%d'
src_bucket = 'deutsche-boerse-xetra-pds'
tgt_bucket = 'xetra-quickanddirtysolution1234'
columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
key = 'xetra_daily_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'

In [33]:
arg_date_dt = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)

In [34]:
arg_date_dt

datetime.date(2021, 5, 6)

In [35]:
# Create boto3 resource for amazon s3 bucket 
s3 = boto3.resource('s3')
bucket = s3.Bucket(src_bucket)
# Getting a list of all files for a particular date eg. March 15 2021 (all 24 hours files)
objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], src_format).date() >= arg_date_dt
          and datetime.strptime(obj.key.split('/')[0], src_format).date() <= datetime.strptime(arg_date, src_format).date()]
objects

[s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR00.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR01.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR02.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR03.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR04.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR05.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR06.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR07.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pds', key='2021-05-06/2021-05-06_BINS_XETR08.csv'),
 s3.ObjectSummary(bucket_name='deutsche-boerse-xetra-pd

In [37]:
# Defining a function to read the objects
def csv_to_df(filename):
    csv_obj = bucket.Object(key=filename).get().get('Body').read().decode('utf-8')
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=',')
    return df
df_all = pd.concat([csv_to_df(obj.key) for obj in objects], ignore_index=True)

In [38]:
df_all

Unnamed: 0,ISIN,Mnemonic,SecurityDesc,SecurityType,Currency,SecurityID,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,NumberOfTrades
0,AT0000A0E9W5,SANT,S+T AG O.N.,Common stock,EUR,2504159,2021-05-06,07:00,21.52,21.9,21.52,21.82,2648,10
1,DE000A0DJ6J9,S92,SMA SOLAR TECHNOL.AG,Common stock,EUR,2504287,2021-05-06,07:00,46.02,46.02,45.82,45.84,1766,10
2,DE000A0D6554,NDX1,NORDEX SE O.N.,Common stock,EUR,2504290,2021-05-06,07:00,21.56,21.78,21.56,21.78,14935,14
3,DE000A0D9PT0,MTX,MTU AERO ENGINES NA O.N.,Common stock,EUR,2504297,2021-05-06,07:00,203.6,203.6,202.5,202.5,1246,9
4,DE000A0HN5C6,DWNI,DEUTSCHE WOHNEN SE INH,Common stock,EUR,2504314,2021-05-06,07:00,44.18,44.21,44.04,44.08,12002,31
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
212153,DE0006062144,1COV,COVESTRO AG O.N.,Common stock,EUR,2505008,2021-05-07,15:43,58.02,58.02,58.02,58.02,51,1
212154,DE000A2TSQH7,KTEK,KATEK SE INH O.N.,Common stock,EUR,6366023,2021-05-07,15:43,26.5,26.5,26.5,26.5,437,1
212155,DE0007236101,SIE,SIEMENS AG NA O.N.,Common stock,EUR,2505088,2021-05-07,15:44,144.28,144.28,144.28,144.28,3969,1
212156,DE000A0TGJ55,VAR1,VARTA AG O.N.,Common stock,EUR,2721826,2021-05-07,15:44,115.75,115.75,115.75,115.75,795,1


In [39]:
# Using only those columns we are interested in
df_all = df_all.loc[:, columns]
df_all.dropna(inplace=True)

### Get opening price per ISIN and Day

In [40]:
df_all['opening_price'] = df_all.sort_values(by = ['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')

### Get Closing Price per ISIN and Day

In [41]:
df_all['closing_price'] = df_all.sort_values(by = ['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('last')
df_all

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price,closing_price
0,AT0000A0E9W5,2021-05-06,07:00,21.52,21.9,21.52,21.82,2648,21.52,21.5
1,DE000A0DJ6J9,2021-05-06,07:00,46.02,46.02,45.82,45.84,1766,46.02,44.08
2,DE000A0D6554,2021-05-06,07:00,21.56,21.78,21.56,21.78,14935,21.56,20.1
3,DE000A0D9PT0,2021-05-06,07:00,203.6,203.6,202.5,202.5,1246,203.6,206.2
4,DE000A0HN5C6,2021-05-06,07:00,44.18,44.21,44.04,44.08,12002,44.18,43.14
...,...,...,...,...,...,...,...,...,...,...
212153,DE0006062144,2021-05-07,15:43,58.02,58.02,58.02,58.02,51,59.22,58.02
212154,DE000A2TSQH7,2021-05-07,15:43,26.5,26.5,26.5,26.5,437,27.2,26.5
212155,DE0007236101,2021-05-07,15:44,144.28,144.28,144.28,144.28,3969,140.68,144.28
212156,DE000A0TGJ55,2021-05-07,15:44,115.75,115.75,115.75,115.75,795,113.55,115.75


### Aggregations

In [42]:
df_all = df_all.groupby(['ISIN','Date'], as_index = False).agg(opening_price_eur=('opening_price', 'min'), 
                                                               closing_price_eur=('closing_price','min'), 
                                                               minimum_price_eur=('MinPrice','min'), 
                                                               maximum_price_eur=('MaxPrice','max'), 
                                                               daily_traded_volume=('TradedVolume','sum'))
df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume
0,AT00000FACC2,2021-05-06,8.880,8.730,8.730,8.880,943
1,AT00000FACC2,2021-05-07,8.860,9.000,8.840,9.000,547
2,AT0000606306,2021-05-06,18.500,18.300,18.300,18.540,9273
3,AT0000606306,2021-05-07,18.650,18.740,18.600,18.740,1283
4,AT0000609607,2021-05-06,16.340,16.340,16.340,16.340,0
...,...,...,...,...,...,...,...
5965,XS2265369731,2021-05-07,10.378,10.344,10.344,10.439,0
5966,XS2265370234,2021-05-06,24.892,24.446,24.222,24.892,1250
5967,XS2265370234,2021-05-07,24.606,24.108,23.766,24.606,1848
5968,XS2284324667,2021-05-06,26.744,26.686,26.586,26.744,2721


### Percent Change Prev Closing

In [43]:
# we get closing price from last day
df_all['prev_closing_price'] = df_all.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)

In [44]:
df_all['change_prev_closing_%'] = (df_all['closing_price_eur'] - df_all['prev_closing_price']) / df_all['prev_closing_price'] * 100

In [45]:
df_all.drop(columns=['prev_closing_price'], inplace=True)

In [46]:
df_all = df_all.round(decimals=2)

In [47]:
# Only latest data
df_all = df_all[df_all.Date >= arg_date]

In [48]:
df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
1,AT00000FACC2,2021-05-07,8.86,9.00,8.84,9.00,547,3.09
3,AT0000606306,2021-05-07,18.65,18.74,18.60,18.74,1283,2.40
5,AT0000609607,2021-05-07,16.42,16.62,16.42,16.70,95,1.71
7,AT0000644505,2021-05-07,114.40,111.60,111.00,114.40,928,-2.45
9,AT0000652011,2021-05-07,30.93,31.23,30.70,31.23,4463,0.35
...,...,...,...,...,...,...,...,...
5961,XS2265368097,2021-05-07,15.10,15.10,15.08,15.10,0,-0.44
5963,XS2265369574,2021-05-07,22.68,22.57,22.57,22.68,22,-1.03
5965,XS2265369731,2021-05-07,10.38,10.34,10.34,10.44,0,-0.65
5967,XS2265370234,2021-05-07,24.61,24.11,23.77,24.61,1848,-1.38


### Write to S3 Bucket

In [50]:
# This code will create the parquet file in s3 bucket
out_buffer = BytesIO()
df_all.to_parquet(out_buffer, index=False)
bucket_target = s3.Bucket(tgt_bucket)
bucket_target.put_object(Body=out_buffer.getvalue(), Key=key)

s3.Object(bucket_name='xetra-quickanddirtysolution1234', key='xetra_daily_report_20220613_160951.parquet')

### Reading the uploaded file

In [51]:
for obj in bucket_target.objects.all():
    print(obj.key)

xetra_daily_report_20220613_160951.parquet


In [52]:
par_obj = bucket_target.Object(key=obj.key).get().get('Body').read()
data = BytesIO(par_obj)
df_report = pd.read_parquet(data)

In [53]:
df_report
# This is same as df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT00000FACC2,2021-05-07,8.86,9.00,8.84,9.00,547,3.09
1,AT0000606306,2021-05-07,18.65,18.74,18.60,18.74,1283,2.40
2,AT0000609607,2021-05-07,16.42,16.62,16.42,16.70,95,1.71
3,AT0000644505,2021-05-07,114.40,111.60,111.00,114.40,928,-2.45
4,AT0000652011,2021-05-07,30.93,31.23,30.70,31.23,4463,0.35
...,...,...,...,...,...,...,...,...
2986,XS2265368097,2021-05-07,15.10,15.10,15.08,15.10,0,-0.44
2987,XS2265369574,2021-05-07,22.68,22.57,22.57,22.68,22,-1.03
2988,XS2265369731,2021-05-07,10.38,10.34,10.34,10.44,0,-0.65
2989,XS2265370234,2021-05-07,24.61,24.11,23.77,24.61,1848,-1.38


In [54]:
df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
1,AT00000FACC2,2021-05-07,8.86,9.00,8.84,9.00,547,3.09
3,AT0000606306,2021-05-07,18.65,18.74,18.60,18.74,1283,2.40
5,AT0000609607,2021-05-07,16.42,16.62,16.42,16.70,95,1.71
7,AT0000644505,2021-05-07,114.40,111.60,111.00,114.40,928,-2.45
9,AT0000652011,2021-05-07,30.93,31.23,30.70,31.23,4463,0.35
...,...,...,...,...,...,...,...,...
5961,XS2265368097,2021-05-07,15.10,15.10,15.08,15.10,0,-0.44
5963,XS2265369574,2021-05-07,22.68,22.57,22.57,22.68,22,-1.03
5965,XS2265369731,2021-05-07,10.38,10.34,10.34,10.44,0,-0.65
5967,XS2265370234,2021-05-07,24.61,24.11,23.77,24.61,1848,-1.38
