In [20]:
import boto3
import pandas as pd
from io import StringIO, BytesIO
from datetime import datetime, timedelta
import uuid
import os
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__

In [31]:
#Adapter Layer
#function converting CSV into DataFrame
def read_csv_to_dataframe(key_id, bucket_obj, decod='utf-8', dlm=','):
    """Download the object ``key_id`` from ``bucket_obj`` and parse it as CSV.

    Args:
        key_id: key of the object inside the bucket.
        bucket_obj: boto3 ``Bucket`` resource (``main`` passes ``s3.Bucket(...)``).
        decod: text encoding used to decode the raw object body.
        dlm: field delimiter handed to ``pd.read_csv``.

    Returns:
        pandas.DataFrame with the parsed CSV contents.
    """
    response = bucket_obj.Object(key=key_id).get()
    raw_bytes = response['Body'].read()
    text_stream = StringIO(raw_bytes.decode(decod))
    return pd.read_csv(text_stream, delimiter=dlm)

#writing to S3
#Converting a DataFrame into a parquet (or CSV) buffer and saving it on S3
def write_to_s3(bucket_obj, df_obj, key_id, file_type='parquet'):
    """Serialize ``df_obj`` and upload it to ``bucket_obj`` under ``key_id``.

    Args:
        bucket_obj: boto3 ``Bucket`` resource (anything exposing ``put_object``).
        df_obj: DataFrame to write; the index is not written.
        key_id: destination object key.
        file_type: ``'parquet'`` (default) or ``'csv'``.

    Returns:
        True once the upload call has been made.

    Raises:
        ValueError: if ``file_type`` is not ``'parquet'`` or ``'csv'``.
    """
    if file_type == 'parquet':
        out_buffer = BytesIO()
        df_obj.to_parquet(out_buffer, index=False)
    elif file_type == 'csv':
        out_buffer = StringIO()
        df_obj.to_csv(out_buffer, index=False)
    else:
        # Previously an unknown type skipped the upload yet still returned True,
        # so callers believed the data had been saved.
        raise ValueError(f"Unsupported file_type {file_type!r}; expected 'parquet' or 'csv'")
    bucket_obj.put_object(Body=out_buffer.getvalue(), Key=key_id)
    return True

def list_file_in_prefix(bucket_obj, prefix):
    """Return the keys of every object in ``bucket_obj`` whose key starts with ``prefix``.

    Args:
        bucket_obj: boto3 ``Bucket`` resource.
        prefix: key prefix passed to ``objects.filter(Prefix=...)``.

    Returns:
        list of key strings, in the order the listing yields them.
    """
    keys = []
    for s3_object in bucket_obj.objects.filter(Prefix=prefix):
        keys.append(s3_object.key)
    return keys

def return_date_list(bucket_obj, date_format, date_start_obj, meta_key):
    """Work out which source dates still have to be processed.

    The ``sourced_date`` column of the meta CSV ``meta_key`` in ``bucket_obj`` is
    compared against every calendar day from ``date_start_obj`` up to today.

    Args:
        bucket_obj: boto3 ``Bucket`` resource that holds the meta file.
        date_format: strptime/strftime format of ``date_start_obj`` and of the
            returned date strings.
        date_start_obj: first date of interest, as a string in ``date_format``.
        meta_key: key of the meta CSV file.

    Returns:
        ``(return_min_date, return_dates)``:
        ``return_min_date`` is the first unprocessed date as a string in
        ``date_format``. When nothing is left to do it is a far-future sentinel
        (2200-01-01), and ``return_dates`` is then empty.
        ``return_dates`` lists the date strings from the day *before*
        ``return_min_date`` through today. The extra leading day is the
        look-back day used for the prior-day comparison.
        If the meta file cannot be read, every day from the day before
        ``date_start_obj`` through today is returned and ``return_min_date`` is
        ``date_start_obj``.
    """
    min_date = datetime.strptime(date_start_obj, date_format).date() - timedelta(days=1)
    today = datetime.today().date()
    try:
        df_meta = read_csv_to_dataframe(meta_key, bucket_obj)
        src_date = set(pd.to_datetime(df_meta['sourced_date']).dt.date)
    except Exception:
        # Meta file missing or unreadable (e.g. first run): process everything
        # from the start date. This used to be a bare `except:`, which also hid a
        # NameError on the undefined global `src_format`.
        return_dates = [(min_date + timedelta(days=x)).strftime(date_format)
                        for x in range(0, (today - min_date).days + 1)]
        return date_start_obj, return_dates

    dates = [min_date + timedelta(days=x) for x in range(0, (today - min_date).days + 1)]
    # dates[0] is the look-back day and is never itself a candidate for processing.
    missing = set(dates[1:]) - src_date
    if not missing:
        # Sentinel is returned as a string, like the other branches, so callers can
        # compare it with the string 'Date' column.
        return datetime(2200, 1, 1).date().strftime(date_format), []

    first_missing = min(missing)
    lookback_day = first_missing - timedelta(days=1)
    return_dates = [day.strftime(date_format) for day in dates if day >= lookback_day]
    return first_missing.strftime(date_format), return_dates
            
#returning the S3 objects whose date-named top-level prefix is on or after the day before the start date
def list_of_S3_obj(bucket_obj,date_format,date_start_obj):
    """Return every object in ``bucket_obj`` dated on or after the day before ``date_start_obj``.

    An object's date is parsed from the first ``/``-separated segment of its key
    using ``date_format``. A key whose first segment does not match the format
    raises ``ValueError``.
    """
    cutoff = datetime.strptime(date_start_obj, date_format).date() - timedelta(days=1)
    selected = []
    for s3_object in bucket_obj.objects.all():
        key_date = datetime.strptime(s3_object.key.split('/')[0], date_format).date()
        if key_date >= cutoff:
            selected.append(s3_object)
    return selected

#reading parquet files from S3 and creating a pandas DataFrame
def from_prq_to_dataframe(key_id,bucket_id):
    """Download ``key_id`` and load it as a parquet DataFrame.

    Note: despite its name, ``bucket_id`` is a ``Bucket`` resource object (it is
    used via ``.Object(...)``), not a bucket-name string.
    """
    body = bucket_id.Object(key=key_id).get()['Body']
    return pd.read_parquet(BytesIO(body.read()))

#saving to Azure Blob Storage
def write_to_blob(blob_service_client, container_id, blob_id, df_obj):
    """Upload ``df_obj`` as parquet (without the index) to blob ``blob_id``.

    The blob lives in container ``container_id`` and any existing blob with that
    name is overwritten (``overwrite=True``).
    """
    payload = BytesIO()
    df_obj.to_parquet(payload, index=False)
    target_blob = blob_service_client.get_blob_client(container=container_id, blob=blob_id)
    target_blob.upload_blob(payload.getvalue(), overwrite=True)

In [65]:
#Application Layer
def extract(bucket_obj, date_list):
    """Read every CSV under each date prefix in ``date_list`` into one DataFrame.

    Args:
        bucket_obj: boto3 ``Bucket`` resource holding the source CSVs.
        date_list: key prefixes (date strings) to collect objects from.

    Returns:
        The concatenated DataFrame with a fresh RangeIndex, or an empty DataFrame
        when no files were found (``pd.concat`` raises ``ValueError`` on an empty list).
    """
    files = [key for date in date_list for key in list_file_in_prefix(bucket_obj, date)]
    frames = [read_csv_to_dataframe(key, bucket_obj) for key in files]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

def transform_report1(df_obj,start_date,column_list):
    """Aggregate raw trade rows into one report row per (ISIN, Date).

    Args:
        df_obj: raw trade rows. They must contain every column in ``column_list``,
            which in turn must include 'ISIN', 'Date', 'Time', 'StartPrice',
            'EndPrice', 'MinPrice', 'MaxPrice' and 'TradedVolume'. The input is not
            modified.
        start_date: rows with ``Date >= start_date`` are kept. NOTE(review): when
            'Date' holds strings this is a string comparison, so it assumes ISO
            'YYYY-MM-DD' dates; TODO confirm.
        column_list: columns to keep before aggregating. Rows with a NaN in any of
            them are dropped.

    Returns:
        A DataFrame with these columns, rounded to 2 decimals:
        opening_price_eur, closing_price_eur, minimum_price_eur, maximum_price_eur,
        daily_traded_volume, and change_vs_prior_day. The last is the % change of
        the closing price versus the previous Date of the same ISIN.
    """
    # dropna() returns a new frame, so nothing below mutates a .loc selection in place.
    trades = df_obj.loc[:, column_list].dropna()
    # Sort once. The transform() results carry the original index, and assign()
    # aligns them back onto `trades`.
    by_time = trades.sort_values(by=['Time']).groupby(['ISIN', 'Date'])
    trades = trades.assign(
        opening_price=by_time['StartPrice'].transform('first'),
        closing_price=by_time['EndPrice'].transform('last'),
    )
    # opening/closing prices are constant within a group, so min/max just pick that value.
    daily = trades.groupby(['ISIN', 'Date'], as_index=False).agg(
        opening_price_eur=('opening_price', 'min'),
        closing_price_eur=('closing_price', 'max'),
        minimum_price_eur=('MinPrice', 'min'),
        maximum_price_eur=('MaxPrice', 'max'),
        daily_traded_volume=('TradedVolume', 'sum'),
    )
    # Percent change versus the previous Date's closing price for the same ISIN.
    prev_close = (daily.sort_values(by=['Date'])
                  .groupby(['ISIN'])['closing_price_eur']
                  .shift(1)
                  .reindex(daily.index))
    daily['change_vs_prior_day'] = (daily['closing_price_eur'] - prev_close) / prev_close * 100
    daily = daily.round(decimals=2)

    # filtering: the look-back day was only needed to compute change_vs_prior_day
    return daily[daily['Date'] >= start_date]

def update_metafile(bucket_obj, meta_key, extract_date_list):
    """Prepend newly processed dates to the meta CSV and write it back to S3.

    Args:
        bucket_obj: boto3 ``Bucket`` resource holding the meta file.
        meta_key: key of the meta CSV (columns 'sourced_date', 'date_of_processing').
        extract_date_list: iterable of date strings that were just processed.

    Returns:
        True after the upload has been issued.
    """
    df_new = pd.DataFrame({
        'sourced_date': list(extract_date_list),
        # one processing timestamp shared by all rows of this run
        'date_of_processing': datetime.today().strftime("%Y%m%d_%H%M%S"),
    })
    try:
        df_meta = read_csv_to_dataframe(meta_key, bucket_obj)
    except bucket_obj.meta.client.exceptions.NoSuchKey:
        # First run: the meta file does not exist yet, so start it with this run's rows.
        df_all = df_new
    else:
        df_all = pd.concat([df_new, df_meta], ignore_index=True)
    write_to_s3(bucket_obj, df_all, meta_key, 'csv')
    return True

def load(bucket_obj,df_obj, file_name, file_format):
    """Write ``df_obj`` to ``bucket_obj`` under a timestamped key.

    The key is ``file_name + <YYYYmmdd_HHMMSS> + file_format``, where
    ``file_format`` is an extension such as ``'.parquet'``. The serialization
    follows the extension: ``'.csv'`` writes CSV, and anything else is written as
    parquet (the previous, unconditional behaviour).

    Returns:
        True after the upload has been issued.
    """
    key = file_name + datetime.today().strftime("%Y%m%d_%H%M%S") + file_format
    # Previously always parquet, so a '.csv' key would have contained parquet bytes.
    file_type = 'csv' if file_format.lstrip('.').lower() == 'csv' else 'parquet'
    write_to_s3(bucket_obj, df_obj, key, file_type)
    return True

def etl_report1(bucket,bucket_target, date_list, column_list,start_date, file_name,file_format):
    """Run the report-1 pipeline: extract from ``bucket``, transform, load into ``bucket_target``.

    ``date_list``: date prefixes to read from ``bucket``.
    ``column_list`` and ``start_date``: forwarded to ``transform_report1``.
    ``file_name`` and ``file_format``: forwarded to ``load``.
    Always returns True.
    """
    raw_trades = extract(bucket, date_list)
    report = transform_report1(raw_trades, start_date, column_list)
    load(bucket_target, report, file_name, file_format)
    return True

In [72]:
#main function entrypoint
def main():
    """Run the Xetra daily-report ETL job once.

    The job works out which dates still need processing from the meta file in the
    target bucket. It then builds the report from the source bucket, writes it to
    the target bucket, and records the processed dates in the meta file.
    """
    # Parameters/Configurations (to be moved to a config file later)
    arg_date = '2022-09-01'
    meta_key = 'meta_file.csv'
    src_format = '%Y-%m-%d'
    source_bucket_name = 'xetra-1234'
    target_bucket_name = 'radek-xetra-1234'
    trg_key = 'xetra_daily_report_'
    trg_format = '.parquet'
    # hardcoded list of source columns used by the report
    final_columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']

    # Init
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(source_bucket_name)
    bucket_target = s3.Bucket(target_bucket_name)

    # run job
    extract_date, date_list = return_date_list(bucket_target, src_format, arg_date, meta_key)
    if not date_list:
        # Every date up to today is already recorded in the meta file.
        return
    etl_report1(bucket, bucket_target, date_list, final_columns, extract_date, trg_key, trg_format)
    # date_list starts with a look-back day that is only read for the prior-day
    # comparison, so record only dates >= extract_date. The old
    # `set(date_list) - set(extract_date)` subtracted the *characters* of the
    # string and removed nothing. The string comparison is chronological because
    # src_format is ISO '%Y-%m-%d'.
    processed_dates = [day for day in date_list if day >= extract_date]
    update_metafile(bucket_target, meta_key, processed_dates)

main()

In [54]:
#connection to Azure Storage Account
connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
if not connect_str:
    # Fail with an actionable message instead of an opaque error from from_connection_string(None).
    raise RuntimeError('Set the AZURE_STORAGE_CONNECTION_STRING environment variable before running this cell')
container_name='radek-xetra-1234'
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client=blob_service_client.get_container_client(container_name)

In [26]:
# Load every daily report parquet from the target bucket.
# bucket_target is created here so the cell does not depend on a variable that only exists
# inside main() or in a later cell. Only '.parquet' keys are read: the same bucket also holds
# meta_file.csv, which read_parquet cannot parse.
bucket_target = boto3.resource('s3').Bucket('radek-xetra-1234')
report_keys = [obj.key for obj in bucket_target.objects.all() if obj.key.endswith('.parquet')]
df_report = pd.concat([from_prq_to_dataframe(key, bucket_target) for key in report_keys], ignore_index=True)
df_report

In [73]:
# Inspect the meta file in the target bucket, ordered by sourced_date.
s3 = boto3.resource('s3')
target_bucket_name = 'radek-xetra-1234'
bucket_target = s3.Bucket(target_bucket_name)
meta_key = 'meta_file.csv'
# arg_date / src_format are kept for ad-hoc calls such as
# return_date_list(bucket_target, src_format, arg_date, meta_key)
arg_date = '2022-09-06'
src_format = '%Y-%m-%d'

df_meta = read_csv_to_dataframe(meta_key, bucket_target)
df_meta.sort_values(by='sourced_date')

Unnamed: 0,sourced_date,date_of_processing
11,2022-08-31,20220913_044426
1,2022-09-01,20220913_044426
4,2022-09-02,20220913_044426
2,2022-09-03,20220913_044426
8,2022-09-04,20220913_044426
12,2022-09-05,20220913_044426
21,2022-09-06,2022-09-06 12:33:23
0,2022-09-06,20220913_044426
6,2022-09-07,20220913_044426
16,2022-09-07,20220913_044116


In [28]:
# NOTE(review): `key` and `df_new_all` are not defined by any cell in this notebook, so this
# cell only works with leftover kernel state. Define both explicitly before running it.
write_to_blob(
    blob_service_client=blob_service_client,
    container_id=container_name,
    blob_id=key,
    df_obj=df_new_all,
)

In [30]:
# Print the name of every blob in the container, one per line.
blob_list = container_client.list_blobs()
blob_names = (blob.name for blob in blob_list)
for name in blob_names:
    print(name)

In [31]:
# Download a single blob and load it as a parquet DataFrame.
# NOTE(review): despite its name, `blob_client` holds a ContainerClient. Also,
# `blob_name` is not defined by any cell in this notebook (leftover kernel state).
blob_client = blob_service_client.get_container_client(container=container_name)
downloader = blob_client.download_blob(blob_name)
prq_file = downloader.readall()
data = BytesIO(prq_file)
df1 = pd.read_parquet(data)

In [27]:
list_1=['aa','bb','cc']
str='aa'

In [28]:
list_1.remove(str)

In [29]:
list_1

['bb', 'cc']