# Loading data from sources to staging tables

## Load files to staging tables from Azure object/blob store
## If the Delta table for a file already exists, that file is skipped
## Each new file is written as a Delta table and then saved as a Lake database table in parquet format

In [1]:
%%pyspark

# Define the storage account and container
storage_account_name = "distributedanalytics"
container_name = "healthdata-fs"
directory_path = "data"

# Define the full path in which raw CSVs are placed
full_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{directory_path}"


fs  = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

# List files
path = sc._jvm.org.apache.hadoop.fs.Path(full_path)
files = fs.listStatus(path)
# Display the files
for file in files:
    file_name = file.getPath().getName()
    if file_name.endswith(".csv"):
        # Read the CSV file into a DataFrame
        print(f"the file is {full_path}/{file_name}")
        df = spark.read.option("header", "true").csv(f"{full_path}/{file_name}")


        # Define the Delta Lake table path
        delta_table_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/delta/{file_name.replace('.csv', '')}"
         # Check if the Delta table already exists
        if not fs.exists(sc._jvm.org.apache.hadoop.fs.Path(delta_table_path)):
            # Read the CSV file into a DataFrame
            df = spark.read.option("header", "true").csv(f"{full_path}/{file_name}")
            df.write.format("delta").mode("overwrite").option("delta.columnMapping.mode", "name").save(delta_table_path)
            delta_df = spark.read.format("delta").load(delta_table_path)


            table_name = file_name.replace('.csv', '')
            # Write data to Lake database table
            delta_df.write.format("parquet").saveAsTable(f"default.{table_name}")


            print(f"Delta Lake table created for {file_name} at {delta_table_path}")
        else:
            print(f"Delta Lake table already exists for {file_name}, skipping...")
            


StatementMeta(ghparkcluster, 34, 2, Finished, Available, Finished)

the file is abfss://healthdata-fs@distributedanalytics.dfs.core.windows.net/data/CHEAspercentGDP.csv
Delta Lake table already exists for CHEAspercentGDP.csv, skipping...
the file is abfss://healthdata-fs@distributedanalytics.dfs.core.windows.net/data/CHEInMillionConstant2022USD.csv
Delta Lake table already exists for CHEInMillionConstant2022USD.csv, skipping...
the file is abfss://healthdata-fs@distributedanalytics.dfs.core.windows.net/data/CHEPerCapitaUSD.csv
Delta Lake table already exists for CHEPerCapitaUSD.csv, skipping...
the file is abfss://healthdata-fs@distributedanalytics.dfs.core.windows.net/data/DoctorsUK.csv
Delta Lake table already exists for DoctorsUK.csv, skipping...
the file is abfss://healthdata-fs@distributedanalytics.dfs.core.windows.net/data/GCEInmillionconstant2022USD.csv
Delta Lake table already exists for GCEInmillionconstant2022USD.csv, skipping...
the file is abfss://healthdata-fs@distributedanalytics.dfs.core.windows.net/data/LifeExpectancyBirthAnd60.csv
Delt

# Listing of all tables

In [2]:
from delta.tables import DeltaTable

all_tables = spark.sql("SHOW TABLES").collect()
delta_tables = []

for table in all_tables:
    table_full_name = f"{table.tableName}"
    print(table_full_name)
    if DeltaTable.isDeltaTable(spark, table_full_name):
        delta_tables.append(table_full_name)

for delta_table in delta_tables:
    print(delta_table)



StatementMeta(ghparkcluster, 34, 3, Finished, Available, Finished)

chepercapitausd
doctorsuk
lifeexpectancybirthand60
nhaindicators
phcexpendituretrends
worldhealthdata
cheaspercentgdp
cheinmillionconstant2022usd
gceinmillionconstant2022usd
monthlydiagnostics2024_13clr
ukgdppriceindex2022is100
gdppercapitaworldbank
uknetmigration
ukpopulationbyageons
lifeexpetancyatbirth
ons_nhs_expenditure
