In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
from datetime import datetime, timedelta
from datetime import timezone
import pyodbc

### Parameters - Please update the variable values accordingly! ###
# Root folder holding all input and output data; the paths below are built from it,
# so relocating the data only requires changing this one value.
s_data_root = "C:/Users/shuk_/Notebook/Shell/Data/"

# The nmi_info file lists the nmis of interest (columns read later: Nmi, State, Interval)
s_nmi_info = s_data_root + "nmi_info.csv"
# Folder holding one consumption file per nmi, named <Nmi>.csv
s_data_source = s_data_root + "ConsumptionData/"
# Output folder and file name (not referenced by the cells below)
s_data_sink = s_data_root + "Output/"
s_nmi_sink_fn = "nmi_all.csv"

# SQL Server instance and database receiving the staging tables
s_server = "ASUSR7"
s_database = "shell"

def read_csv_to_df(spark, sfile):
    """Read a CSV file that has a header row into a Spark DataFrame.

    Args:
        spark: the active SparkSession.
        sfile: path of the CSV file to read.

    Returns:
        The loaded DataFrame on success. On any exception the exception object
        itself is returned (not raised), so callers must check the result type.
    """
    try:
        csv_reader = spark.read.format("csv").option("header", "true")
        loaded = csv_reader.load(sfile)
    except Exception as exc:
        return exc
    return loaded
  
# Create (or reuse) the SparkSession
spark = SparkSession.builder.appName('csv_nmi_load').getOrCreate()

# Read nmi_info csv file into dataframe.
# read_csv_to_df returns the exception instead of raising it; every later cell needs
# df_nmi, so fail fast here with the real cause rather than an AttributeError below.
df_nmi = read_csv_to_df(spark, s_nmi_info)
if isinstance(df_nmi, Exception):
    raise RuntimeError("Unable to load nmi info file: " + s_nmi_info) from df_nmi

# The csv is read without schema inference, so Interval arrives as a string; cast to integer
df_nmi = df_nmi.withColumn("Interval", col("Interval").cast("Integer"))
df_nmi.printSchema()
df_nmi.show()


In [None]:
# Load timestamp in AEST (fixed UTC + 10). Derive it from UTC so the value does not
# depend on the time zone of the machine running the notebook (datetime.now() returns
# local time, which would be shifted twice on a machine already set to AEST).
tstoday = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours = 10)
s_load_ts = tstoday.strftime('%Y-%m-%d %H:%M:%S')
lst_error = []
df_nmi_all = None

# Read the consumption csv file of every nmi listed in df_nmi from the folder s_data_source
nmi_collect = df_nmi.collect()

# Loop through each row of the df_nmi dataframe
for row in nmi_collect:
    # For each nmi, read its consumption file <s_data_source><Nmi>.csv
    s_file = s_data_source + row["Nmi"] + ".csv"

    df_nmi_1 = read_csv_to_df(spark, s_file)
    if isinstance(df_nmi_1, DataFrame):
        df_nmi_1 = (df_nmi_1
                    .withColumn("AESTTime", to_timestamp("AESTTime"))
                    .withColumn("Quantity", col("Quantity").cast("Double"))
                    .withColumn("Nmi", lit(row["Nmi"]))
                    .withColumn("Load_Timestamp", lit(tstoday)))

        if df_nmi_all is None:
            df_nmi_all = df_nmi_1
        else:
            # Match columns by name, not position, so a file with a different column
            # order cannot silently shift values into the wrong columns
            df_nmi_all = df_nmi_1.unionByName(df_nmi_all)
    else:
        # read_csv_to_df returned the exception: record the problem file, the error
        # and the load time (ideally into an error table)
        lst_error.append([s_file, df_nmi_1, s_load_ts])

if df_nmi_all is None:
    print("No nmi consumption file could be loaded")
else:
    df_nmi_all.printSchema()
    df_nmi_all.show(20)
print(lst_error)

In [None]:
# write dataframes to sink - SQL server
# Rows are inserted one at a time so that a row whose insert fails is redirected,
# together with the error text, to the matching *_error table instead of aborting the load.
s_ts_format = '%Y-%m-%d %H:%M:%S'
s_load_ts = tstoday.strftime(s_ts_format)

# All nmi consumption rows; empty when no consumption file could be loaded in the previous cell
nmi_all_collect = df_nmi_all.collect() if df_nmi_all is not None else []

cnxn = pyodbc.connect(driver='{SQL Server}', server = s_server, database = s_database, trusted_connection='yes')
cursor = cnxn.cursor()
try:
    # nmi_info
    for row in nmi_collect:
        try:
            cursor.execute("INSERT INTO dbo.stg_nmi_info (Nmi,State,Interval,Load_Timestamp) values(?,?,?,?)",
                           row.Nmi, row.State, row.Interval, s_load_ts)
        except Exception as error:
            cursor.execute("INSERT INTO dbo.stg_nmi_info_error (Nmi,State,Interval,Load_Timestamp, Error) values(?,?,?,?,?)",
                           row.Nmi, row.State, row.Interval, s_load_ts, str(error))

    # all other nmi source files
    for row in nmi_all_collect:
        try:
            # If AESTTime is null, .strftime raises and the row goes to the error table instead
            cursor.execute("INSERT INTO dbo.stg_nmi_all (AESTTime,Quantity,Unit,Nmi,Load_Timestamp) values(?,?,?,?,?)",
                           row.AESTTime.strftime(s_ts_format),
                           row.Quantity, row.Unit, row.Nmi, row.Load_Timestamp.strftime(s_ts_format))
        except Exception as error:
            cursor.execute("INSERT INTO dbo.stg_nmi_all_error (AESTTime,Quantity,Unit,Nmi,Load_Timestamp, Error) values(?,?,?,?,?,?)",
                           row.AESTTime, row.Quantity, row.Unit, row.Nmi, row.Load_Timestamp, str(error))

    cnxn.commit()
finally:
    # Always release the cursor and the connection, even if an insert raised;
    # anything not committed above is discarded when the connection closes.
    cursor.close()
    cnxn.close()

print("End of extraction")