In [None]:
# function to write data from all processed files to Azure SQL

def write_to_azure_sql(df, table, mode):
    """Write a Spark DataFrame to a table in the Azure SQL database over JDBC.

    Args:
        df: Spark DataFrame whose rows are written.
        table: Name of the destination table.
        mode: Save mode handed straight to ``df.write.jdbc``
            (the callers in this notebook pass ``"append"``).
    """
    # Connection settings for the target server.
    server_host = "database_server"
    server_port = 1433
    database = "database_name"

    # The password is looked up through dbutils.secrets so it is never
    # written in the notebook itself.
    db_password = dbutils.secrets.get(scope="scope_name", key="scope_password")

    jdbc_url = f"jdbc:sqlserver://{server_host}:{server_port};database={database}"
    connection_properties = dict(
        user="database_username",
        password=db_password,
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    )

    df.write.jdbc(url=jdbc_url, table=table, mode=mode, properties=connection_properties)

In [None]:
# reading csv files, casting strings to integers and writing to azure SQL

# Read every SourceA CSV file from the landing zone. No schema is supplied
# here, so the numeric columns are cast explicitly further down.
df = (
    spark.read.format("csv")
    # "delimiter" is the option name Spark's CSV reader recognises; the
    # previous spelling "delimeter" is not a CSV option.
    .options(header='true', delimiter=',')
    .load("/mnt/landing/SourceA*.csv")
)

# check schema
df.printSchema()

# Keep only the employee columns that are loaded downstream.
df_csv = df.select(
    "Employee_id",
    "First_Name",
    "Last_Name",
    "Gender",
    "Salary",
    "Date_of_Birth",
    "Age",
    "Country",
    "Department_id",
    "Date_of_Joining",
    "Manager_id",
    "Currency",
    "End_Date"
)
# create a temporary view "Emp" so the casts can be expressed in SQL
df_csv.createOrReplaceTempView("Emp")

# cast the id, age and salary columns to int; the rest pass through unchanged
df_cast = spark.sql(
    "select cast(Employee_id as int), cast(Age as int), First_Name, Last_Name, Gender, cast(Salary as int), Date_Of_Birth, Country, cast(Department_id as int), Date_of_Joining, cast(Manager_id as int), Currency, End_Date from Emp"
)

# alternative form of casting via the DataFrame API
# (`ds` is kept for illustration only; it is not written anywhere in this cell)
from pyspark.sql.types import IntegerType

ds = df_csv.withColumn(
    "Employee_id",
    df_csv.Employee_id.cast(IntegerType())
).withColumn(
    "Salary",
    df_csv.Salary.cast(IntegerType())
)

# confirm data casting
# print(ds.printSchema())
# display(df_cast)

# write data to delta table; "overwrite" replaces the table contents, and the
# JSON and XML cells then append to it
df_cast.write.mode("overwrite")\
.format("delta")\
.saveAsTable("dvdb.employee")


# write data to Azure SQL
# NOTE(review): the Delta table above is overwritten, but this write appends,
# so re-running the notebook adds the same rows to Azure SQL again - confirm
# that is intended.
write_to_azure_sql(df_cast, table="Employee", mode="append")

In [None]:
# reading json file, casting values to integers and writing to azure SQL
# read json file
# Read every SourceB JSON file from the landing zone.
# The former .option('header', 'true') was dropped: "header" is a CSV option
# and is not one of the JSON reader's options.
df = spark.read.format("json")\
.load("/mnt/landing/SourceB*.json")

# check schema
df.printSchema()

# First_Name / Last_Name are nested under "Customer" in this source; selecting
# them by dotted path yields top-level First_Name / Last_Name columns.
df_json = df.select(
    "Employee_id",
    "Customer.First_Name",
    "Customer.Last_Name",
    "Gender",
    "Salary",
    "Date_of_Birth",
    "Age",
    "Country",
    "Department_id",
    "Date_of_Joining",
    "Manager_id",
    "Currency",
    "End_Date"
)

# create a temporary view "EmpJson"
df_json.createOrReplaceTempView("EmpJson")

# cast the id, age and salary columns to int; the rest pass through unchanged
df_json_cast = spark.sql(
    "select cast(Employee_id as int), cast(Age as int), First_Name, Last_Name, Gender, cast(Salary as int), Date_Of_Birth, Country, cast(Department_id as int), Date_of_Joining, cast(Manager_id as int), Currency, End_Date from EmpJson"
)

# confirm data casting
# display(df_json_cast)

# append the JSON rows to the delta table
df_json_cast.write.mode("append")\
.format("delta")\
.saveAsTable("dvdb.employee")

# write data to Azure SQL
write_to_azure_sql(df_json_cast, table="Employee", mode="append")

In [None]:
# reading xml file, casting strings to integers and writing to azure SQL

# Read every SourceC XML file from the landing zone; each <record> element
# becomes one row.
df = spark.read.format("com.databricks.spark.xml")\
.option("rootTag", "dataset")\
.option("rowTag", "record")\
.load("/mnt/landing/SourceC*.xml")

# select the employee columns from the xml data
# (First_Name / Last_Name are addressed through the nested "Customer" field)
df_xml = df.select(
    "Employee_id",
    "Customer.First_Name",
    "Customer.Last_Name",
    "Gender",
    "Salary",
    "Date_of_Birth",
    "Age",
    "Country",
    "Department_id",
    "Date_of_Joining",
    "Manager_id",
    "Currency",
    "End_Date"
)
df_xml.show()

# create a temporary view "EmpXml"; a dedicated name keeps this cell from
# replacing the "Emp" view registered for the CSV data
df_xml.createOrReplaceTempView("EmpXml")

# cast the id, age and salary columns to int; the rest pass through unchanged
df_xml_cast = spark.sql(
    "select cast(Employee_id as int), cast(Age as int), First_Name, Last_Name, Gender, cast(Salary as int), Date_Of_Birth, Country, cast(Department_id as int), Date_of_Joining, cast(Manager_id as int), Currency, End_Date from EmpXml"
)

# append the XML rows to the delta table
df_xml_cast.write.mode("append")\
.format("delta")\
.saveAsTable("dvdb.employee")

# write data to Azure SQL
write_to_azure_sql(df_xml_cast, table="Employee", mode="append")


In [None]:
%sql
-- Row-count check: total rows in the Delta table after the loads above

SELECT COUNT(*) FROM dvdb.employee

In [None]:
# processing Department csv file for visualization
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema for the Department file, so Department_id is read as an
# integer and no separate cast step is needed.
custom_schema = StructType(
    [
        StructField("Department_id", IntegerType(), True),
        StructField("Name", StringType(), True),
    ]
)

df = (
    spark.read.format("csv")
    # "delimiter" is the option name Spark's CSV reader recognises; the
    # previous spelling "delimeter" is not a CSV option.
    .options(header='true', delimiter=',')
    .schema(custom_schema)
    .load("/mnt/landing/Department.csv")
)


# write dataframe to delta table
# NOTE(review): "append" adds the department rows again on every run of this
# cell - confirm that is intended.
df.write.mode("append")\
.format("delta")\
.saveAsTable("dvdb.department")