# Build Spark Database and Tables

## This notebook reads the CSV files in the bronze layer, writes them as Parquet files to the silver layer, and then builds the serverless Spark database and tables from the silver layer.


##### Nick Tinsley
##### Sr. Cloud Solution Architect


## Set Global Variables
### `dbname` is passed in as a pipeline parameter; if running manually, hard-code the database name.

In [1]:
# Name of the Spark database to (re)build; also used as the sub-folder name
# under both the bronze and silver paths.
dbname = "ccc_poc"

# ADLS Gen2 storage account and container that hold the lake.
storage_account = "dltinsynapse"
container_name = "datalake"

# Layer folders inside the container (appended after the account host name).
# NOTE(review): both paths start with "datalake", the same value as
# container_name — confirm this folder level really exists in the container.
dl_path_bronze = "datalake/elt/bronze"
dl_path_silver = "datalake/elt/silver"

In [6]:
%run VariableNotebook

In [7]:
# NOTE(review): varA and varB are not defined anywhere in this notebook;
# presumably they are provided by the `%run VariableNotebook` cell — confirm.
# No other cell in this notebook reads the result of this expression, so this
# looks like a leftover scratch check that can be removed once confirmed.
varA + varB

In [2]:

# Publish the database name as a Spark conf value so the %%sql cells can
# substitute it via ${myapp.dbname}.
spark.conf.set("myapp.dbname", dbname)

# Both layers live in the same container/account; only the layer folder differs.
_abfss_root = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net"
bronze_file_path = f"{_abfss_root}/{dl_path_bronze}/{dbname}"
silver_file_path = f"{_abfss_root}/{dl_path_silver}/{dbname}"

# Echo the resolved locations, one per line, for the run log.
print(bronze_file_path, silver_file_path, sep="\n")

In [3]:
from notebookutils import mssparkutils
# import pandas as pd

## Read CSV files from pipeline and build Parquet files for Lake House

In [4]:
# List everything the pipeline landed in the bronze folder for this database.
file_list = mssparkutils.fs.ls(bronze_file_path)


def _strip_csv_suffix(file_name):
    """Return file_name without a trailing ".csv" extension (case-insensitive).

    str.strip(".csv") must not be used for this: its argument is a *set of
    characters* removed from BOTH ends of the string, so "customers.csv"
    would become "ustomer" and "sales.csv" would become "ale".

    Names that do not end in ".csv" are returned unchanged.
    """
    suffix = ".csv"
    if file_name.lower().endswith(suffix):
        return file_name[: -len(suffix)]
    return file_name


# Convert each bronze CSV file into a parquet folder of the same base name in silver.
for item in file_list:
    print(item.name)
    print(item.path)

    # Load the CSV using its first row as column names. inferSchema is not set,
    # so column types are whatever Spark's CSV reader defaults to.
    csv_df = spark.read.load(item.path, format='csv', header=True)

    # Build the output path: <silver path>/<file name without .csv>
    filename = _strip_csv_suffix(item.name)
    parquet_path = f"{silver_file_path}/{filename}"

    # Overwrite so that re-running the notebook replaces the previous output.
    print(f"Writing parquet file to: {parquet_path}")
    csv_df.write.mode("overwrite").parquet(parquet_path)
    print(f"Finished writing {parquet_path}")


## Create Spark DB

In [5]:
%%sql
-- Destructive: removes the database named by the myapp.dbname Spark conf value
-- (set from the Python variable dbname) so it can be rebuilt from scratch.
-- CASCADE also drops the tables it contains; IF EXISTS makes a first run a no-op.
DROP DATABASE IF EXISTS ${myapp.dbname} CASCADE

In [6]:
%%sql
-- Create the serverless Spark database named by the myapp.dbname Spark conf value.
-- There is no IF NOT EXISTS clause, so this relies on the DROP DATABASE cell
-- having run first when the database already exists.
CREATE DATABASE ${myapp.dbname}

## Save Parquet Files to Spark Tables (Delta format)

In [7]:
# Register every entry found under the silver path as a table in the database.
silver_files = mssparkutils.fs.ls(silver_file_path)

for entry in silver_files:
    print(entry.name)

    # Read only the files matching *.snappy.parquet inside this entry's folder.
    parquet_glob = f"{entry.path}/*.snappy.parquet"
    silver_df = spark.read.format("parquet").load(parquet_glob)

    # The table is named <dbname>.<entry name> and is stored in Delta format,
    # replacing any existing table of the same name.
    table_name = f'{dbname}.{entry.name}'
    (
        silver_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(table_name)
    )
