In [None]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta

In [None]:
#Argument Date
# arg_date = '2022-05-07'

In [None]:
# arg_date_dt = datetime.strptime(arg_date,'%Y-%m-%d').date() - timedelta(days=1)

In [None]:
# arg_date_dt

In [None]:
s3 = boto3.resource('s3')
bucket = s3.Bucket('xetra-1234')

## Reading Data

In [None]:
bucket_obj = bucket.objects.filter(Prefix='2022-01-28')
objects = [obj for obj in bucket_obj]

In [None]:
objects

In [None]:
csv_obj = bucket.Object(key='2022-01-28/2022-01-28_BINS_XETR13.csv').get().get('Body').read().decode('utf-8')

In [None]:
csv_obj

In [None]:
data = StringIO(csv_obj)
df = pd.read_csv(data, delimiter=',')

In [None]:
df

## Reading Multiple Files

In [None]:
#Filter twice
bucket_obj1 = bucket.objects.filter(Prefix='2022-03-15')
bucket_obj2 = bucket.objects.filter(Prefix='2022-03-16')
#Combined list comprehensions
objects = [obj for obj in bucket_obj1] + [obj for obj in bucket_obj2]

In [None]:
objects

In [None]:
csv_obj_init = bucket.Object(key=objects[0].key).get().get('Body').read().decode('utf-8')
data = StringIO(csv_obj_init)
df_init = pd.read_csv(data,delimiter=',')

In [None]:
df_init.columns

In [None]:
df_list = []
df_all = pd.DataFrame(columns=df_init.columns)
for obj in objects:
    csv_obj = bucket.Object(key=obj.key).get().get('Body').read().decode('utf-8')
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=',')
    df_list.append(df)
df_all = pd.concat(df_list) 
df_all=df_all.reset_index(drop=True)


In [None]:
df_all

In [None]:
#Selecting the columns of interest - 8 
columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
       'EndPrice', 'TradedVolume']
df_all = df_all.loc[:,columns]

In [None]:
df_all.dropna(inplace=True)

In [None]:
df_all.shape

## Transformations

### Get opening price per ISIN and day

In [None]:
df_all

In [None]:
df_all['opening_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first')

In [None]:
df_all

In [None]:
df_all[df_all['ISIN']=='DE000A0LD6E6']

### Get closing price per ISIN and day

In [None]:
df_all['closing_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('last')

In [None]:
df_all[df_all['ISIN']=='DE000A0LD6E6']

### Aggregations

In [None]:
df_all = df_all.groupby(['ISIN','Date'],as_index=False).agg(opening_price_eur=('opening_price','min'),closing_price_eur=('closing_price','min'), minimum_price_eur=('MinPrice','min'), maximum_price_eur=('MaxPrice','max'), daily_traded_volume=('TradedVolume','sum'))

In [None]:
df_all

### Percent Change Prev Closing

In [None]:
df_all['prev_closing_price'] = df_all.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)

In [None]:
df_all

In [None]:
df_all['change_prev_closing_%'] = (df_all['closing_price_eur'] - df_all['prev_closing_price']) / df_all['prev_closing_price'] * 100

In [None]:
df_all.drop(columns=['prev_closing_price'],inplace=True)

In [None]:
df_all = df_all.round(decimals=2)

In [None]:
df_all

## Argument Date

In [2]:
import boto3
import pandas as pd
from io import StringIO, BytesIO 
from datetime import datetime, timedelta

In [3]:
arg_date = '2022-05-07'

In [5]:
arg_date_dt = datetime.strptime(arg_date,'%Y-%m-%d').date() - timedelta(days=1)

In [6]:
arg_date_dt

datetime.date(2022, 5, 6)

In [7]:
s3 = boto3.resource('s3')
bucket = s3.Bucket('xetra-1234')

In [8]:
bucket_obj = bucket.objects.filter(Prefix='2022-01-28')
objects = [obj for obj in bucket_obj]

In [9]:
objects = [obj for obj in bucket.objects.all() if datetime.strptime(obj.key.split('/')[0], '%Y-%m-%d').date() >= arg_date_dt]

In [10]:
objects

[s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR00.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR01.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR02.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR03.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR04.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR05.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR06.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR07.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR08.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR09.csv'),
 s3.ObjectSummary(bucket_name='xetra-1234', key='2022-05-06/2022-05-04_BINS_XETR10.csv'),
 s3.Object

In [11]:
len(objects)

5697

In [12]:
csv_obj_init = bucket.Object(key=objects[0].key).get().get('Body').read().decode('utf-8')
data = StringIO(csv_obj_init)
df_init = pd.read_csv(data,delimiter=',')

In [13]:
df_init.columns

Index(['ISIN', 'Mnemonic', 'SecurityDesc', 'SecurityType', 'Currency',
       'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
       'EndPrice', 'TradedVolume', 'NumberOfTrades'],
      dtype='object')

In [14]:
df_list = []
df_all = pd.DataFrame(columns=df_init.columns)

In [15]:
import time
count = 0
for obj in objects:
#     if obj is None: #comment out this valueerror
#         raise ValueError("My object should not be None")
    csv_obj = bucket.Object(key=obj.key).get().get('Body').read().decode('utf-8')
    count = count + 1
    print(f'Count: {count}', end='\r')
#     time.sleep(1) 
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=',')
    df_list.append(df)

Count: 5697

In [16]:
df_all = pd.concat(df_list) 
df_all=df_all.reset_index(drop=True)

In [17]:
df_all

Unnamed: 0,ISIN,Mnemonic,SecurityDesc,SecurityType,Currency,SecurityID,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,NumberOfTrades
0,AT0000A0E9W5,SANT,S+T AG O.N.,Common stock,EUR,2504159,2022-05-06,08:00,16.660,16.660,16.660,16.660,915,1
1,DE000A0DJ6J9,S92,SMA SOLAR TECHNOL.AG,Common stock,EUR,2504287,2022-05-06,08:00,30.620,30.620,30.620,30.620,236,1
2,DE000A0D6554,NDX1,NORDEX SE O.N.,Common stock,EUR,2504290,2022-05-06,08:00,13.450,13.550,13.380,13.530,8870,24
3,DE000A0D9PT0,MTX,MTU AERO ENGINES NA O.N.,Common stock,EUR,2504297,2022-05-06,08:00,187.350,187.700,187.350,187.550,1857,9
4,DE000A0HN5C6,DWNI,DEUTSCHE WOHNEN SE INH,Common stock,EUR,2504314,2022-05-06,08:00,36.530,36.530,36.530,36.530,198,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
28623178,GB00BLD4ZP54,CLTC,COINSHARES DIG.SEC.OEND,ETN,EUR,6479084,2022-12-31,16:46,19.324,19.324,19.324,19.324,0,2
28623179,LU1923627332,RUSL,MUL-LYX.MSCI RUSSI.DIS.LS,ETF,EUR,5424594,2022-12-31,16:52,12.400,12.400,12.400,12.400,2645,2
28623180,US98956P1021,ZIM,ZIMMER BIOMET HLDGS DL-01,Common stock,EUR,4582018,2022-12-31,20:30,113.100,113.100,113.100,113.100,0,1
28623181,US9224171002,VEO,"VEECO INSTRUMENTS DL-,01",Common stock,EUR,6198311,2022-12-31,20:30,24.600,24.600,24.600,24.600,0,1


In [18]:
#Selecting the columns of interest - 8 
columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
       'EndPrice', 'TradedVolume']
df_all = df_all.loc[:,columns]

In [19]:
df_all.dropna(inplace=True)

In [20]:
df_all.shape

(28623183, 8)

In [21]:
## Transformations
### Get opening price per ISIN and day
df_all

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume
0,AT0000A0E9W5,2022-05-06,08:00,16.660,16.660,16.660,16.660,915
1,DE000A0DJ6J9,2022-05-06,08:00,30.620,30.620,30.620,30.620,236
2,DE000A0D6554,2022-05-06,08:00,13.450,13.550,13.380,13.530,8870
3,DE000A0D9PT0,2022-05-06,08:00,187.350,187.700,187.350,187.550,1857
4,DE000A0HN5C6,2022-05-06,08:00,36.530,36.530,36.530,36.530,198
...,...,...,...,...,...,...,...,...
28623178,GB00BLD4ZP54,2022-12-31,16:46,19.324,19.324,19.324,19.324,0
28623179,LU1923627332,2022-12-31,16:52,12.400,12.400,12.400,12.400,2645
28623180,US98956P1021,2022-12-31,20:30,113.100,113.100,113.100,113.100,0
28623181,US9224171002,2022-12-31,20:30,24.600,24.600,24.600,24.600,0


In [22]:
df_all['opening_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first')

In [23]:
df_all[df_all['ISIN']=='DE000A0LD6E6']

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price
7,DE000A0LD6E6,2022-05-06,08:00,79.50,79.50,79.40,79.50,766,79.5
164,DE000A0LD6E6,2022-05-06,08:01,79.40,79.40,79.40,79.40,25,79.5
500,DE000A0LD6E6,2022-05-06,08:03,79.50,79.65,79.50,79.65,492,79.5
2913,DE000A0LD6E6,2022-05-06,08:05,79.45,79.45,79.45,79.45,160,79.5
3237,DE000A0LD6E6,2022-05-06,08:06,79.50,79.50,79.45,79.45,75,79.5
...,...,...,...,...,...,...,...,...,...
28618377,DE000A0LD6E6,2022-12-31,16:26,64.55,64.60,64.50,64.50,1967,65.0
28618744,DE000A0LD6E6,2022-12-31,16:27,64.50,64.50,64.50,64.50,128,65.0
28619053,DE000A0LD6E6,2022-12-31,16:28,64.45,64.55,64.45,64.55,485,65.0
28619405,DE000A0LD6E6,2022-12-31,16:29,64.50,64.50,64.50,64.50,85,65.0


In [24]:
### Get closing price per ISIN and day
df_all['closing_price'] = df_all.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('last')

In [25]:
df_all[df_all['ISIN']=='DE000A0LD6E6']

Unnamed: 0,ISIN,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,opening_price,closing_price
7,DE000A0LD6E6,2022-05-06,08:00,79.50,79.50,79.40,79.50,766,79.5,79.0
164,DE000A0LD6E6,2022-05-06,08:01,79.40,79.40,79.40,79.40,25,79.5,79.0
500,DE000A0LD6E6,2022-05-06,08:03,79.50,79.65,79.50,79.65,492,79.5,79.0
2913,DE000A0LD6E6,2022-05-06,08:05,79.45,79.45,79.45,79.45,160,79.5,79.0
3237,DE000A0LD6E6,2022-05-06,08:06,79.50,79.50,79.45,79.45,75,79.5,79.0
...,...,...,...,...,...,...,...,...,...,...
28618377,DE000A0LD6E6,2022-12-31,16:26,64.55,64.60,64.50,64.50,1967,65.0,64.5
28618744,DE000A0LD6E6,2022-12-31,16:27,64.50,64.50,64.50,64.50,128,65.0,64.5
28619053,DE000A0LD6E6,2022-12-31,16:28,64.45,64.55,64.45,64.55,485,65.0,64.5
28619405,DE000A0LD6E6,2022-12-31,16:29,64.50,64.50,64.50,64.50,85,65.0,64.5


In [26]:
### Aggregations
df_all = df_all.groupby(['ISIN','Date'],as_index=False).agg(opening_price_eur=('opening_price','min'),closing_price_eur=('closing_price','min'), minimum_price_eur=('MinPrice','min'), maximum_price_eur=('MaxPrice','max'), daily_traded_volume=('TradedVolume','sum'))

In [27]:
### Percent Change Prev Closing
df_all['prev_closing_price'] = df_all.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)

In [28]:
df_all['change_prev_closing_%'] = (df_all['closing_price_eur'] - df_all['prev_closing_price']) / df_all['prev_closing_price'] * 100

In [29]:
df_all.drop(columns=['prev_closing_price'],inplace=True)

In [30]:
df_all = df_all.round(decimals=2)

In [31]:
#Filtering by date
df_all =df_all[df_all.Date>=arg_date]

In [32]:
df_all

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
1,AT000000STR1,2022-05-07,38.65,38.25,38.25,38.65,410,-1.67
2,AT000000STR1,2022-05-08,38.75,38.75,38.50,38.75,251,1.31
3,AT000000STR1,2022-05-09,39.00,38.60,38.50,39.05,905,-0.39
4,AT000000STR1,2022-05-10,38.95,38.95,38.95,39.45,147,0.91
5,AT000000STR1,2022-05-11,39.15,39.25,38.65,39.60,914,0.77
...,...,...,...,...,...,...,...,...
768136,XS2434891219,2022-12-27,3.44,3.50,3.44,3.50,0,0.00
768137,XS2434891219,2022-12-28,3.44,3.66,3.42,3.66,0,4.53
768138,XS2434891219,2022-12-29,3.44,3.66,3.42,3.66,0,0.00
768139,XS2434891219,2022-12-30,3.44,3.66,3.42,3.66,0,0.00


## Saving to S3

In [33]:
#You need to create a S3 bucket in AWS with a unique name
key = 'xetra_daily_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'


In [34]:
out_buffer = BytesIO()
df_all.to_parquet(out_buffer,index=False)
bucket_target = s3.Bucket('xetra-data-etl-destination') #own bucket name
bucket_target.put_object(Body=out_buffer.getvalue(),Key=key)

s3.Object(bucket_name='xetra-data-etl-destination', key='xetra_daily_report_20230422_151559.parquet')

### Reading the uploaded file

In [35]:
for obj in bucket_target.objects.all():
    print(obj.key)

xetra_daily_report_20230422_151559.parquet


In [37]:
prq_obj = bucket_target.Object(key='xetra_daily_report_20230422_151559.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

In [38]:
df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT000000STR1,2022-05-07,38.65,38.25,38.25,38.65,410,-1.67
1,AT000000STR1,2022-05-08,38.75,38.75,38.50,38.75,251,1.31
2,AT000000STR1,2022-05-09,39.00,38.60,38.50,39.05,905,-0.39
3,AT000000STR1,2022-05-10,38.95,38.95,38.95,39.45,147,0.91
4,AT000000STR1,2022-05-11,39.15,39.25,38.65,39.60,914,0.77
...,...,...,...,...,...,...,...,...
764951,XS2434891219,2022-12-27,3.44,3.50,3.44,3.50,0,0.00
764952,XS2434891219,2022-12-28,3.44,3.66,3.42,3.66,0,4.53
764953,XS2434891219,2022-12-29,3.44,3.66,3.42,3.66,0,0.00
764954,XS2434891219,2022-12-30,3.44,3.66,3.42,3.66,0,0.00
