The Python script below loads the safety stock (SS) output CSV files from Google Cloud Storage (GCS) into ZDL.

First, all the CSV files are stored in a Cloud Storage bucket. The script then reads each file from that bucket into a pandas DataFrame.

Once the data is loaded into the pandas DataFrame, it is written to the output table.
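Reading `gs://` paths directly with `pd.read_csv` relies on the `gcsfs` package being installed on the cluster. A minimal alternative sketch, assuming only the `google-cloud-storage` client already used in this notebook, downloads each blob's bytes and parses them in memory. The helper name `read_blob_csv` is hypothetical.

```python
import io

import pandas as pd


def read_blob_csv(blob) -> pd.DataFrame:
    """Parse a CSV object stored in GCS into a DataFrame (hypothetical helper).

    `blob` is expected to be a google.cloud.storage.Blob, or any object with a
    download_as_bytes() method. The whole file is read into memory.
    """
    data = blob.download_as_bytes()
    return pd.read_csv(io.BytesIO(data))
```

Inside the loop this would replace the path-based read, e.g. `input_df = read_blob_csv(b)`. Whether it is preferable depends on whether `gcsfs` is available in the environment.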


In [0]:
import pandas as pd
from google.cloud import storage
import numpy as np
import pyspark.pandas as ps
from pyspark.sql.functions import expr

In [0]:
bucket_name = "its-managed-dbx-ds-01-d-user-work-area"  # bucket name (stays constant)
folder_name = "ss/ss_outputs"  # folder (object prefix) to read the files from

list_blobs returns every object in the bucket whose name starts with folder_name, i.e. all the files in that folder.
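Listing by prefix can also return a zero-byte placeholder object for the folder itself, which is presumably why the next cell drops the first entry. A sketch that keeps only CSV objects instead of relying on list position; the sample object names are hypothetical.

```python
def csv_blob_names(names, prefix):
    """Return object names under `prefix` that end in .csv (skips folder placeholders)."""
    return [n for n in names if n.startswith(prefix) and n.lower().endswith(".csv")]


sample = [
    "ss/ss_outputs/",          # hypothetical folder placeholder object
    "ss/ss_outputs/file_a.csv",
    "ss/ss_outputs/notes.txt",
]
print(csv_blob_names(sample, "ss/ss_outputs"))
```

With real blobs the same filter would be `[b for b in blobs if b.name.lower().endswith(".csv")]`.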

In [0]:
client = storage.Client()
BUCKET = client.get_bucket(bucket_name)
blobs = BUCKET.list_blobs(prefix=folder_name)
blobs_list = list(blobs)
del blobs_list[0]  # drop the first entry, assumed to be the placeholder object for the folder itself

In [0]:
for b in blobs_list:
    print(b.name)

In [0]:
len(blobs_list)

Here we build two DataFrames:

df holds the combined data of all the files

log_entry_df holds one row per file read (year, filename and row count)
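Calling `pd.concat` inside the loop copies the accumulated DataFrame on every iteration. A common pandas pattern is to collect one piece per file in a list and concatenate once at the end. A minimal sketch using hypothetical in-memory DataFrames in place of the CSV files:

```python
import pandas as pd

# Hypothetical stand-ins for two CSV files read from the bucket
files = {
    "file_a": pd.DataFrame({"site_name": ["Aava", "Aava"], "raw_cost": [195.0, 0.511]}),
    "file_b": pd.DataFrame({"site_name": ["Askey"], "raw_cost": [12.0]}),
}

data_frames = []  # one DataFrame per file, becomes df
log_rows = []     # one dict per file, becomes log_entry_df
for filename, input_df in files.items():
    input_df = input_df.copy()
    input_df.insert(loc=0, column="filename", value=filename)
    data_frames.append(input_df)
    log_rows.append({"filename": filename, "count": input_df.shape[0]})

# Concatenate once instead of growing a DataFrame inside the loop
df = pd.concat(data_frames, axis=0, ignore_index=True)
log_entry_df = pd.DataFrame(log_rows)
print(log_entry_df)
```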

In [0]:
df = pd.DataFrame()
log_entry_df = pd.DataFrame()
for b in blobs_list:  # loop over every file found under folder_name
    input_path = f"gs://{bucket_name}/{b.name}"
    input_df = pd.read_csv(input_path)  # read the CSV file (gs:// paths need the gcsfs package)

    # Index 4 of the split path is the folder below "ss/"; with the current layout
    # this is "ss_outputs" (see log_entry_df below), not a calendar year
    year_value = input_path.split("/", 6)[4]
    filename_value = input_path.split("/")[-1].split(".csv")[0]  # file name without .csv

    # First cell of the first column; in the sample output below this column is
    # the unnamed index column, so start_date comes out as 0
    start_date_value = input_df.iloc[0, 0]

    input_df.insert(loc=0, column='start_date', value=start_date_value)
    input_df.insert(loc=0, column='filename', value=filename_value)
    input_df.insert(loc=0, column='year', value=year_value)

    # One log row per file: year, file name and row count
    file_info_df = pd.DataFrame({'year': [year_value], 'filename': [filename_value], 'count': [input_df.shape[0]]})

    df = pd.concat([df, input_df], axis=0, ignore_index=True)  # df contains all the data
    log_entry_df = pd.concat([log_entry_df, file_info_df], axis=0, ignore_index=True)  # details of every file read
    print("Successfully read " + input_path)
print("Successfully read all the files!!!")


In [0]:
input_path 
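The year value is taken from the segment at index 4 of the full gs:// path split on "/". With the current layout (ss/ss_outputs/<file>.csv) that segment is the folder name, which matches the ss_outputs values in the log output below. A small sketch of the split, using a hypothetical file name:

```python
path = "gs://its-managed-dbx-ds-01-d-user-work-area/ss/ss_outputs/example_file.csv"
parts = path.split("/")
# parts: 0='gs:', 1='', 2=bucket name, 3='ss', 4='ss_outputs', 5=file name
print(parts)

year_value = path.split("/", 6)[4]             # same expression as in the loop
filename_value = parts[-1].split(".csv")[0]    # file name without the extension
print(year_value, filename_value)
```

If the files were instead grouped into year subfolders (a hypothetical layout such as ss/ss_outputs/2023/<file>.csv), the year would sit at index 5.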

In [0]:
log_entry_df.head(2)

   year        filename                                            count
0  ss_outputs  safety_stock_determination_outputs_aava_07_02_...     102
1  ss_outputs  safety_stock_determination_outputs_askey_07_02...    2789


In [0]:
df.head(2)

Unnamed: 0.1,year,filename,start_date,Unnamed: 0,site_name,component,category,lead_time_std,lead_time_avg,raw_cost,average_demand,demand_std,calculated_safety_stock,adjusted_safety_stock,ss_weeks,ss_ll,ss_ul,ss_ll($),ss_ul($),ss_value($),extra_safety_stock,total_ss,total_ss ($),zscore_cal,sl_percentage,wt_sl_avg
0,ss_outputs,safety_stock_determination_outputs_aava_07_02_...,0,0,Aava,ACSF0801AB06-1,A,0.0,103.0,195.0,56,0,1,112,0,2,4,0,30000,21964.8,111,112,21840.0,0.0,50.0,86.62
1,ss_outputs,safety_stock_determination_outputs_aava_07_02_...,0,266,Aava,AM10000335-3,C,0.0,63.0,0.511,46,5,10,184,0,4,12,0,500,94.02,174,184,94.024,36.8,100.0,86.62


In [0]:
df.shape

Changing the data type of all columns to string so that unexpected values can be handled at a later stage
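One side effect worth checking: in pandas 2.x, astype(str) turns missing values into literal strings such as "nan" (float columns) or "None" (object columns), so they would no longer be nulls in the target table. A small sketch on hypothetical sample data, compared with pandas' nullable "string" dtype, which keeps missing values as <NA>:

```python
import numpy as np
import pandas as pd

# Hypothetical sample with one missing value per column
sample = pd.DataFrame({"raw_cost": [195.0, np.nan], "category": ["A", None]})

as_str = sample.astype(str)            # in pandas 2.x, missing values become "nan" / "None"
as_nullable = sample.astype("string")  # missing values stay missing (<NA>)

print(as_str.values.tolist())
print(as_nullable.isna())
```

Which behaviour is wanted depends on how the output table should represent missing values.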

In [0]:
df = df.astype(str)  # convert every column to string (object dtype)
print(df.dtypes)

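Renaming columns to follow the standard column format: lowercase, spaces replaced with underscores, brackets removed, # replaced with num, - with _to_ and / with _or_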
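As a check of these rules, a minimal sketch applying them to some of the raw headers seen in df.head(2) above, plus a duplicate check, since two different raw headers (hypothetically "Total SS" and "total_ss") could map to the same name. The helper name standardize_column_name is hypothetical.

```python
from collections import Counter


def standardize_column_name(col) -> str:
    """Apply the notebook's renaming rules to one column label (hypothetical helper)."""
    name = str(col)  # integer labels are converted to strings first
    name = (
        name.replace(" ", "_")
        .replace("(", "")
        .replace(")", "")
        .replace("#", "num")
        .replace("-", "_to_")
        .replace("/", "_or_")
    )
    return name.lower()


raw_headers = ["site_name", "ss_ll($)", "ss_ul($)", "total_ss ($)", "Unnamed: 0"]
new_headers = [standardize_column_name(h) for h in raw_headers]
for old, new in zip(raw_headers, new_headers):
    print(f"{old} -> {new}")

# Different raw headers must not collapse into the same standardised name
duplicates = [name for name, n in Counter(new_headers).items() if n > 1]
print("duplicates:", duplicates)
```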

In [0]:
rename_map = {}
for col in df.columns:
    old_name = str(col)  # also covers integer column labels
    new_name = old_name.replace(" ", "_").replace("(", "").replace(")", "").replace("#", "num").replace("-", "_to_").replace("/", "_or_")
    new_name = new_name.lower()
    print(old_name + ":" + new_name)
    rename_map[col] = new_name
df = df.rename(columns=rename_map)  # rename all columns in one call

In [0]:
cols_in_schema = [
    'year', 'filename', 'start_date',
    'site_name', 'component', 'category',
    'lead_time_std', 'lead_time_avg', 'raw_cost',
    'average_demand', 'demand_std',
    'calculated_safety_stock', 'adjusted_safety_stock',
    'ss_weeks', 'ss_ll', 'ss_ul', 'ss_ll$', 'ss_ul$', 'ss_value$',
    'extra_safety_stock', 'total_ss', 'total_ss_$',
    'zscore_cal', 'sl_percentage', 'wt_sl_avg',
]
df = df[cols_in_schema]  # keep only the target-table columns, in table order
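Selecting with df[cols_in_schema] raises a bare KeyError if a renamed header does not match the schema, and silently drops extra columns such as unnamed:_0 (the index column written into the CSV files). A sketch of a hypothetical helper, select_schema_columns, that reports both before selecting:

```python
import pandas as pd


def select_schema_columns(df: pd.DataFrame, cols_in_schema: list) -> pd.DataFrame:
    """Return df restricted to cols_in_schema, with clear messages (hypothetical helper)."""
    missing = [c for c in cols_in_schema if c not in df.columns]
    if missing:
        raise ValueError(f"Columns missing from the input files: {missing}")
    extra = [c for c in df.columns if c not in cols_in_schema]
    if extra:
        print(f"Dropping columns not in the schema: {extra}")
    return df[cols_in_schema]


sample = pd.DataFrame({"year": ["ss_outputs"], "filename": ["f"], "unnamed:_0": ["0"]})
print(select_schema_columns(sample, ["year", "filename"]))
```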

Converting the pandas DataFrame to a Spark DataFrame and then appending it to the stage table

In [0]:
spark_df = spark.createDataFrame(df) #converting df to spark df

In [0]:
spark_df.write.mode("append").format("delta").saveAsTable("pbi_tables_vw.ss_output_table")

Converting the log pandas DataFrame to a Spark DataFrame and then appending it to the log table

In [0]:
log_entry_spark_df = spark.createDataFrame(log_entry_df)

In [0]:
log_entry_spark_df.write.mode("append").format("delta").saveAsTable("pbi_tables_vw.log_entry_ss_output") #appending data into log table