In [108]:
#import libraries
import boto3
import pandas as pd
from io import StringIO
from datetime import datetime,timedelta
from pandas.io.parquet import to_parquet
from io import StringIO, BytesIO
import warnings
warnings.filterwarnings('ignore')

# Adapter layer

In [110]:
#all functions interact with external

# read files from s3 and transfer to dataframe format
#default decoding='utf-8'
#default seperator=','
def read_csv_to_df(bucket, key, decoding='utf-8',sep=','):
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode(decoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=sep)
    return df

#write parquet file to s3
def write_df_to_s3(bucket, df, key):
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

#write csv file to s3
def write_df_to_s3_csv(bucket, df, key):
    out_buffer = StringIO()
    df.to_csv(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

#get a list of file
def list_files_in_prefix(bucket,prefix):
    #filter by date
    files=[obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files    

# Application layer

In [111]:
#extract, transform, load

#input bucket and objects, output a dataframe
def extract(bucket, date_list):
    #get objects
    objects=[key for date in date_list for key in list_files_in_prefix(bucket,date)]
    #read from s3
    df = pd.concat([read_csv_to_df(bucket, obj) for obj in objects], ignore_index=True)
    return df

def transform_report1(df,columns,arg_date):
        #select colums that we need
    df = df.loc[:, columns]
        #dealling with missing value, drop rows with missing value
    df.dropna(inplace=True)
    
    
        #Get opening price and closing price per ISIN and day
    #every day the earliest price is the opening price
    #transform('first'):for each ISIN, Date, get the first price
    df['opening_price']=df.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first')

    #every day, the last price is the closing price
    df['closing_price']=df.sort_values(by=['Time']).groupby(['ISIN','Date'])['EndPrice'].transform('last')
    
    
        #aggregation
    df=df.groupby(['ISIN','Date'], as_index=False).agg(opening_price_eur=('opening_price','min'),closing_price_eur=('closing_price','min'),
                                                           minimum_price_eur=('MinPrice','min'),maximum_price_eur=('MaxPrice','max'),daily_traded_volume=('TradedVolume','sum'))
    #if as_index=True, then ISIN and Date would be index
    # for each ISIN and for each Date, we only have one entry
    
    
        #Percent change prev closing
    #shift(1) lag one row 
    df['prev_closing_price']=df.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)
    #caculate percentage
    df['change_prev_closing_%']=((df['closing_price_eur']-df['prev_closing_price'])/df['prev_closing_price'])*100
    #drop unuseful column, and reset index
    df.drop(columns=['prev_closing_price'],inplace=True)

    #change decimal
    df=df.round(decimals=2)

    #sanity check, date shouldn't less than arg_date
    df=df[df.Date >= arg_date]
    return df

def load(bucket, df,trg_key,trg_format,meta_key,extract_date_list):
    #key of object that we upload to the s3
    key = trg_key + datetime.today().strftime("%Y%m%d_%H%M%S") + trg_format
    write_df_to_s3(bucket, df, key)
    #renew meta file
    update_meta_file(bucket,meta_key,extract_date_list)
    return True
  
#pipeline
def etl_report1(src_bucket,trg_bucket,date_list,columns,arg_date,trg_key,trg_format,meta_key):
    df=extract(src_bucket, date_list)
    df=transform_report1(df,columns,arg_date)
    extract_date_list=[date for date in date_list if date >=arg_date]
    load(trg_bucket, df,trg_key,trg_format,meta_key,extract_date_list) 
    return True


# Application Function - not core

In [112]:
#get date list,export severals days' raw data
def return_date_list(bucket, arg_date,src_format,meta_key):
    #date minus 1
    start = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    today=datetime.today().date()
    
    #if there's no meta file
    try:
            #meta file for job control: sometimes some job fails, we don't want to process all jobs.
        #first step: get all dates from min_date to today
        df_meta=read_csv_to_df(bucket,meta_key)

            #second step: get loaded dateds, some dates disappear maybe because of some reasons
        dates = [start + timedelta(days=x) for x in range(0, (today - start).days + 1)]
        src_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)
            #third step: remove dates that we've already processed, make date start from the minimum date
        dates_missing = set(dates[1:]) - src_dates

        #sometimes if arg_date=today_str, dates would be an empty list, there will be an error
        if dates_missing:
            min_date = min(set(dates[1:]) - src_dates) - timedelta(days=1)
            #[1:] we've already minus one day
            #[dates[1:]: datetime.date(2022, 9, 23), datetime.date(2022, 9, 24), datetime.date(2022, 9, 25)]
            #min_date=2022-09-23

                #final result
            return_dates = [date.strftime(src_format) for date in dates if date >= min_date]
            #return_dates=['2022-09-23', '2022-09-24', '2022-09-25']
            #it's okay, if it's duplicated
            return_min_date = (min_date+timedelta(days=1)).strftime(src_format)
        else:
            return_dates = []
            return_min_date = datetime(2200, 1, 1).date()
    except bucket.session.client('s3').execptions.NoSuchKey:
        return_date_list= [(min_date+timedelta(days=x)).strftime(src_format) for x in range(0,(today-min_date).days+1)]
        return_min_date = arg_date
    return return_min_date, return_dates

#update meta file
def update_meta_file(bucket,meta_key,extract_date_list):
    df_new=pd.DataFrame(columns=['source_date','datetime_of_processing'])
    df_new['source_date']=extract_date_list
    df_new['datetime_of_processing']= datetime.today().strftime('%Y-%m-%d')
    df_old=read_csv_to_df(bucket,meta_key)
    df_all=pd.concat([df_old,df_new])
    write_df_to_s3_csv(bucket,df_all, meta_key)
    

# Main Function Entrypoint

In [113]:
def main():
    #parameters/configurations
    # all data after arg_date
    arg_date = '2022-09-19'
    src_format = '%Y-%m-%d'
    #meta_file
    meta_key='meta_file.csv'
    #source bucket
    src_bucket = 'xetra-1234'
    #target bucket
    trg_bucket = 'jiaxin-etl'
    #colums that we need
    columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
    #target key
    trg_key='xetra_daily_report_'
    #save format
    trg_format='.parquet'
    
    #init connections
    s3 = boto3.resource('s3')
    bucket_src = s3.Bucket(src_bucket)
    bucket_trg = s3.Bucket(trg_bucket)
    
    #data pipeline, etl,run application
    extract_date,date_list=return_date_list(bucket_trg, arg_date,src_format,meta_key)
    etl_report1(bucket_src,bucket_trg,date_list,columns,arg_date,trg_key,trg_format,meta_key)
    return True

# Run

In [114]:
main()

True

### reading the upload file

In [115]:
trg_bucket = 'jiaxin-etl'
s3 = boto3.resource('s3')
bucket_trg = s3.Bucket(trg_bucket)

for obj in bucket_trg.objects.all():
    print(obj.key)

meta_file.csv
xetra_daily_report_20220924_033348.parquet
xetra_daily_report_20220924_041827.parquet
xetra_daily_report_20220924_181929.parquet
xetra_daily_report_20220924_182555.parquet
xetra_daily_report_20220924_185934.parquet
xetra_daily_report_20220924_190342.parquet


In [116]:
prq_obj = bucket_trg.Object(key='xetra_daily_report_20220924_182555.parquet').get().get('Body').read()
data = BytesIO(prq_obj)
df_report = pd.read_parquet(data)

df_report

Unnamed: 0,ISIN,Date,opening_price_eur,closing_price_eur,minimum_price_eur,maximum_price_eur,daily_traded_volume,change_prev_closing_%
0,AT000000STR1,2022-09-22,36.40,36.85,36.40,36.85,1475,-1.60
1,AT000000STR1,2022-09-23,37.00,37.95,37.00,37.95,396,2.99
2,AT000000STR1,2022-09-24,36.05,36.35,35.65,37.05,1838,-4.22
3,AT00000FACC2,2022-09-22,7.89,8.47,7.89,8.47,1956,1.80
4,AT00000FACC2,2022-09-23,8.27,8.36,8.27,8.46,70,-1.30
...,...,...,...,...,...,...,...,...
9611,XS2376095068,2022-09-23,33.59,34.09,33.59,34.13,400,3.17
9612,XS2376095068,2022-09-24,31.63,31.57,31.57,31.63,3,-7.40
9613,XS2434891219,2022-09-22,3.26,3.32,3.26,3.32,0,-3.33
9614,XS2434891219,2022-09-23,3.37,3.42,3.37,3.43,24646,3.11
