## SOURCE DATA FOR ONE GIVEN DATE

In [103]:
df_data=extract(SOURCE_FILES_PATH,previous_date,date_to_process,src_date_format)
df_data
#data from deutche borse a german marketplace organizer for trading shares. dataset contains the shares trading
#minute by minute 

Unnamed: 0,ISIN,Mnemonic,SecurityDesc,SecurityType,Currency,SecurityID,Date,Time,StartPrice,MaxPrice,MinPrice,EndPrice,TradedVolume,NumberOfTrades
0,AT0000A0E9W5,SANT,S+T AG O.N.,Common stock,EUR,2504159,2022-01-03,08:00,14.760,14.760,14.750,14.750,4414,2
1,DE000A0DJ6J9,S92,SMA SOLAR TECHNOL.AG,Common stock,EUR,2504287,2022-01-03,08:00,37.640,37.660,37.600,37.660,1649,3
2,DE000A0D6554,NDX1,NORDEX SE O.N.,Common stock,EUR,2504290,2022-01-03,08:00,13.990,14.030,13.940,13.960,23011,36
3,DE000A0D9PT0,MTX,MTU AERO ENGINES NA O.N.,Common stock,EUR,2504297,2022-01-03,08:00,180.000,180.050,179.500,179.500,2308,22
4,DE000A0HN5C6,DWNI,DEUTSCHE WOHNEN SE INH,Common stock,EUR,2504314,2022-01-03,08:00,37.280,37.280,37.280,37.280,2897,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
239075,DE0007164600,SAP,SAP SE O.N.,Common stock,EUR,2505077,2022-01-04,16:44,124.380,124.380,124.380,124.380,4000,3
239076,MT0000580101,M8G,MEDIA AND GAMES INV. EO 1,Common stock,EUR,2509636,2022-01-04,16:44,4.118,4.118,4.118,4.118,24,2
239077,DE000A0Z23Q5,ADN1,ADESSO SE INH O.N.,Common stock,EUR,2504440,2022-01-04,16:45,196.800,196.800,196.800,196.800,7,2
239078,DE000A3E5ES0,BIO0,BIOTEST AG ST O.N.Z.VERK.,Common stock,EUR,6973997,2022-01-04,20:30,43.000,43.000,43.000,43.000,0,1


## TRANSFORMED DATA

In [107]:
df_data_tr=transform(df_data,columns,date_to_process,src_date_format)
df_data_tr

Unnamed: 0,ISIN,Date,opening_price_euro,closing_price_euro,minimum_price_euro,maximum_price_euro,daily_traded_volume,change_prev_closing_%
1,AT000000STR1,2022-01-04,37.75,37.85,37.75,37.85,27,1.20
3,AT00000FACC2,2022-01-04,7.86,7.79,7.79,7.95,681,1.83
5,AT0000606306,2022-01-04,26.00,26.80,26.00,26.80,339,3.63
7,AT0000609607,2022-01-04,13.78,13.72,13.72,13.78,400,0.59
9,AT0000644505,2022-01-04,123.20,123.20,123.20,123.20,13,3.53
...,...,...,...,...,...,...,...,...
6344,XS2265370234,2022-01-04,16.22,16.52,15.97,16.62,426,2.70
6346,XS2284324667,2022-01-04,32.66,32.87,32.66,32.87,0,1.78
6348,XS2314659447,2022-01-04,8.49,8.55,8.46,8.55,0,-1.31
6350,XS2314660700,2022-01-04,18.30,18.61,18.30,18.61,0,4.14


## ETL PIPELINE STARTS HERE

In [1]:
import os
import pandas as pd
from datetime import datetime,timedelta

In [2]:
def return_data_df(files_for_analysis,source_files_path):
    df_list=[pd.read_csv(source_files_path+'/'+file) for file in files_for_analysis]
    df_data=pd.concat(df_list,ignore_index=True)
    return df_data

def return_required_dates(meta_file_path,src_date_format,initial_date,today):
    df_meta_file=pd.read_csv(meta_file_path)
    processed_dates=set(map(lambda date: datetime.strptime(date,src_date_format).date() ,df_meta_file.source_file_date))
    initial_date_dt=datetime.strptime(initial_date,src_date_format).date()
    datelist_till_today=set([initial_date_dt+timedelta(days=x) for x in range(0,(today-initial_date_dt).days +1)])
    date_to_process=min(datelist_till_today - processed_dates)
    previous_date=date_to_process-timedelta(days=1)
    if date_to_process.weekday()==5:
        dates_with_no_data=[date_to_process,date_to_process+timedelta(days=1)]
        update_meta_file(meta_file_path,dates_with_no_data,[datetime.now().strftime('%Y-%m-%d, %H:%M:%S')]*2)
        date_to_process+=timedelta(days=2)
    return(date_to_process,previous_date)

def update_meta_file(meta_file_path,date_to_process,date_of_processing):
    df_meta=pd.read_csv(meta_file_path)
    df_new_row=pd.DataFrame.from_dict({'source_file_date':date_to_process,'date_of_processing':date_of_processing})
    df_meta_new=pd.concat([df_meta,df_new_row])
    df_meta_new.to_csv(meta_file_path,index=False)

In [3]:
def extract(source_files_path,previous_date,date_to_process,src_date_format):
    directory_files=os.listdir(source_files_path)
    files_for_analysis=[file for file in directory_files if file.startswith(previous_date.strftime(src_date_format))]+ [file for file in directory_files if file.startswith(date_to_process.strftime(src_date_format))]
    df_all=return_data_df(files_for_analysis,source_files_path)
    return(df_all)

In [4]:
def transform(df,columns,date_to_process,src_date_format):
    df=df.loc[:,columns]
    df.dropna(inplace=True)
    df['opening_price']=df.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first')
    df['closing_price']=df.sort_values(by=['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('last')
    df=df.groupby(['ISIN','Date'],as_index=False).agg(opening_price_euro=('opening_price','min'),closing_price_euro=('closing_price','min'),minimum_price_euro=('MinPrice', 'min'), maximum_price_euro=('MaxPrice', 'max'), daily_traded_volume=('TradedVolume', 'sum'))
    df['prev_closing_price'] = df.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_euro'].shift(1)
    df['change_prev_closing_%'] = (df['closing_price_euro'] - df['prev_closing_price']) / df['prev_closing_price'] * 100
    df.drop(columns=['prev_closing_price'], inplace=True)
    df = df.round(decimals=2)
    df=df[df.Date==date_to_process.strftime(src_date_format)]
    return df   

In [5]:
def load_report(df,load_file_path,report_file_date):
    file_path=load_file_path+report_file_date+'.csv'
    df.to_csv(file_path,index=False)

In [6]:
def main():
    SOURCE_FILES_PATH='source_files'
    META_FILE_PATH='meta_file.csv'
    LOAD_FILE_PATH='load_reports/'
    INITIAL_DATE='2022-01-03'
    date_of_processing_meta=datetime.now().strftime('%Y-%m-%d, %H:%M:%S')
    src_date_format='%Y-%m-%d'
    today=datetime.now().date()
    columns=['ISIN','Date','Time','StartPrice','MaxPrice','MinPrice','EndPrice','TradedVolume']
    date_to_process,previous_date=return_required_dates(META_FILE_PATH,src_date_format,INITIAL_DATE,today)
    df_data=extract(SOURCE_FILES_PATH,previous_date,date_to_process,src_date_format)
    df_transformed=transform(df_data,columns,date_to_process,src_date_format)
    report_file_date=date_to_process.strftime(src_date_format)
    load_report(df_transformed,LOAD_FILE_PATH,report_file_date)
    update_meta_file(META_FILE_PATH,[date_to_process],[date_of_processing_meta])

In [7]:
main()