In [1]:
import io
import boto3
import polars as pl
import datetime

Job Parameters

In [2]:
# day from where you want to start the report
start_date = datetime.date(year=2022, month=1, day=28)

# end date of the report
#   -> write `datetime.now()` if you want today's date as end date
#   -> for exactly one day of output, enter same date as in `start_date` parameter
end_date = datetime.date(year=2022, month=1, day=31)

# fully-qualified name of S3 target bucket for transformed files
bucket_tgt_data = "xetra-project-transformed"

List S3 Data

In [3]:
# instantiate s3 client
s3_client = boto3.client("s3")

# get all file keys for data retrieval
listing_response: list[dict] = s3_client.list_objects_v2(Bucket="xetra-1234")

# extract one day from start date. Needed for computing 1d percent change to previous day
start_date_min_1 = start_date - datetime.timedelta(days=1)

# list all file keys from `start date - 1` until `end date`
file_keys = [
    elem["Key"] for elem in listing_response["Contents"] 
    if (elem["Key"].split("/")[0] >= start_date_min_1.isoformat())  # start date as string
    and (elem["Key"].split("/")[0] <= end_date.isoformat())  # end date as string
]

file_keys

['2022-01-27/2022-01-27_BINS_XETR00.csv',
 '2022-01-27/2022-01-27_BINS_XETR01.csv',
 '2022-01-27/2022-01-27_BINS_XETR02.csv',
 '2022-01-27/2022-01-27_BINS_XETR03.csv',
 '2022-01-27/2022-01-27_BINS_XETR04.csv',
 '2022-01-27/2022-01-27_BINS_XETR05.csv',
 '2022-01-27/2022-01-27_BINS_XETR06.csv',
 '2022-01-27/2022-01-27_BINS_XETR07.csv',
 '2022-01-27/2022-01-27_BINS_XETR08.csv',
 '2022-01-27/2022-01-27_BINS_XETR09.csv',
 '2022-01-27/2022-01-27_BINS_XETR10.csv',
 '2022-01-27/2022-01-27_BINS_XETR11.csv',
 '2022-01-27/2022-01-27_BINS_XETR12.csv',
 '2022-01-27/2022-01-27_BINS_XETR13.csv',
 '2022-01-27/2022-01-27_BINS_XETR14.csv',
 '2022-01-27/2022-01-27_BINS_XETR15.csv',
 '2022-01-27/2022-01-27_BINS_XETR16.csv',
 '2022-01-27/2022-01-27_BINS_XETR17.csv',
 '2022-01-27/2022-01-27_BINS_XETR18.csv',
 '2022-01-27/2022-01-27_BINS_XETR19.csv',
 '2022-01-27/2022-01-27_BINS_XETR20.csv',
 '2022-01-27/2022-01-27_BINS_XETR21.csv',
 '2022-01-27/2022-01-27_BINS_XETR22.csv',
 '2022-01-27/2022-01-27_BINS_XETR2

Fetch S3 Data

In [4]:
# list of all csv string contents, one elem is the content of one csv file
file_like_csvs = []

# get file content
for key in file_keys:
    
    # get http response
    get_objects_response: dict = s3_client.get_object(Bucket="xetra-1234", Key=key)
    
    # read bytes stream content
    bytes_content = get_objects_response.get("Body").read()
    
    # convert bytes stream to string
    string_content: str = bytes_content.decode("UTF-8")
    
    # convert file content to file-like object for polars df creation
    # alternative would be to save CSV locally, leading to extra IO processing time
    file_like_csv = io.StringIO(string_content)

    # return to start of stream
    file_like_csv.seek(0)

    # add to list of all contents
    file_like_csvs.append(file_like_csv)

Create Polars DataFrame

In [5]:
# list of created dfs 
df_list = []

# read content of each file into separate df
for single_csv_content in file_like_csvs:
    
    df = pl.read_csv(single_csv_content, try_parse_dates=True)
    
    # some files have no content, those shall be skipped
    if df.height > 0:
        df_list.append(df)

# merge all dfs from each file into one df
df_raw = pl.concat(df_list)

Transform Data

In [6]:
# only the columns needed for our final report
df_less_cols = df_raw.select(['ISIN','SecurityDesc','Date','Time','StartPrice','MaxPrice','MinPrice','EndPrice','TradedVolume'])

# cast 'Time' field from 'str' to 'time' data type
df_convert_time_dtype = df_less_cols.with_columns(pl.col("Time").str.strptime(pl.Time, "%H:%M"))

# get opening price for ISIN per day
df_open_per_isin_n_day = df_convert_time_dtype.with_columns(pl.col("StartPrice").first().over(["ISIN", "Date"]).alias("opening_price_1d"))

# get closing price for ISIN per day
df_close_per_isin_n_day = df_open_per_isin_n_day.with_columns(pl.col("EndPrice").first().over(["ISIN", "Date"]).alias("closing_price_1d"))

# remove redundant columns
df_dropped_cols = df_close_per_isin_n_day.drop(["StartPrice", "EndPrice", "Time"])

# aggregate the windowed df to one record per ISIN per day
df_agg_by_isin_n_day = (df_dropped_cols.groupby(["ISIN", "SecurityDesc", "Date"])
                                      .agg([
                                        pl.first("opening_price_1d").alias("open")
                                        , pl.max("MaxPrice").alias("high")
                                        , pl.min("MinPrice").alias("low")
                                        , pl.first("closing_price_1d").alias("close")
                                        , pl.sum("TradedVolume").alias("volume")
                                      ])
)

# order by ISIN and date to prepare for percent calculation in the next step
df_sort_by_isin_n_day = df_agg_by_isin_n_day.sort(["ISIN", "Date"])

# add col for prev day close as a basis for daily pct change computation
df_close_prev_day = df_sort_by_isin_n_day.with_columns(
    pl.col("close")
    .shift(1)
    .over(["ISIN"])
    .alias("close_previous_day")
)

# add col daily percent change
col_pct_change_1d = ( pl.col("close") - pl.col("close_previous_day") ) / pl.col("close") * 100
df_pct_change_1d = df_close_prev_day.with_columns(col_pct_change_1d.round(2).alias("pct_change_1d"))

# drop helper column for pct change calculation
df_dropped_cols_2 = df_pct_change_1d.drop("close_previous_day")

# drop previous day's rows for pct change calculation
df_remove_prev_day_rows = df_dropped_cols_2.filter(pl.col("Date") >= start_date)

Write To S3

In [7]:
# create empty in-memory file object
buffered_bytes_file_obj = io.BytesIO()

# write df to in-memory parquet file object
df_remove_prev_day_rows.write_parquet(file=buffered_bytes_file_obj)

# return to start of stream
buffered_bytes_file_obj.seek(0)

# generate Key as object name for file upload
if start_date.isoformat() == end_date.isoformat():
    key = "xetra_report_daily_" + start_date.isoformat() + ".parquet"
else:
    key = "xetra_report_multiple_days_" + start_date.isoformat() + "_to_" + end_date.isoformat() + ".parquet"

# upload to S3
s3_client.upload_fileobj(
    Fileobj=buffered_bytes_file_obj
    ,Bucket=bucket_tgt_data
    ,Key=key
)

Check Uploaded File

In [11]:
parquet_obj = s3_client.get_object(Bucket=bucket_tgt_data, Key=key).get("Body").read()
buffered_data = io.BytesIO(parquet_obj)
df = pl.read_parquet(buffered_data)
df

ISIN,SecurityDesc,Date,open,high,low,close,volume,pct_change_1d
str,str,date,f64,f64,f64,f64,i64,f64
"""AT000000STR1""","""STRABAG SE""",2022-01-28,38.05,38.05,37.0,38.05,456,1.18
"""AT000000STR1""","""STRABAG SE""",2022-01-31,37.8,37.8,37.65,37.8,1492,-0.66
"""AT00000FACC2""","""FACC AG INH.AK…",2022-01-28,7.66,7.66,7.52,7.66,610,0.26
"""AT0000606306""","""RAIFFEISEN BK …",2022-01-28,25.02,25.1,24.66,25.02,213,3.44
"""AT0000606306""","""RAIFFEISEN BK …",2022-01-31,25.34,25.34,24.8,25.34,1198,1.26
"""AT0000609607""","""PORR AG""",2022-01-28,12.56,12.56,12.56,12.56,0,1.27
"""AT0000609607""","""PORR AG""",2022-01-31,12.5,12.6,12.5,12.5,30,-0.48
"""AT0000644505""","""LENZING AG""",2022-01-28,108.2,108.8,108.2,108.2,125,-1.66
"""AT0000644505""","""LENZING AG""",2022-01-31,110.0,110.8,109.0,110.0,176,1.64
"""AT0000652011""","""ERSTE GROUP BN…",2022-01-28,41.6,41.6,41.3,41.6,568,-0.46
