In [0]:
# List the Databricks secret scopes that this notebook can see.
dbutils.secrets.listScopes()

In [0]:
# Read the service-principal details and the storage account name from the
# Databricks secret scope, so nothing sensitive is hard-coded in the notebook.
secret_scope = "Newzealand-scopes01"
service_credential = dbutils.secrets.get(scope=secret_scope, key="servicecredential")
application_id = dbutils.secrets.get(scope=secret_scope, key="applicationid")
directory_id = dbutils.secrets.get(scope=secret_scope, key="directoryid01")
storage_account = dbutils.secrets.get(scope=secret_scope, key="storageaccount")

# Every ABFS setting below is keyed by the account's DFS host name.
account_host = f"{storage_account}.dfs.core.windows.net"

# OAuth client-credentials settings for the ABFS driver, applied per account.
oauth_settings = {
    f"fs.azure.account.auth.type.{account_host}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{account_host}":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{account_host}": application_id,
    f"fs.azure.account.oauth2.client.secret.{account_host}": service_credential,
    f"fs.azure.account.oauth2.client.endpoint.{account_host}":
        f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
}
for conf_key, conf_value in oauth_settings.items():
    spark.conf.set(conf_key, conf_value)




In [0]:
# Load the raw locations CSV from the bronze container, treating the first
# row as a header and letting Spark infer the column types.
locations_csv_path = f"abfss://bronze@{storage_account}.dfs.core.windows.net/incoming/locations.csv"
location_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option('inferschema', True)
    .load(locations_csv_path)
)

location_df.show()

In [0]:
# Inspect the column types Spark inferred for the locations CSV.
location_df.printSchema()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Pin the types of two columns instead of relying on schema inference:
# `density` becomes DECIMAL(10, 3) and `location_id` becomes an int.
# The id column is then renamed to "location id".
# NOTE(review): a column name containing a space is unusual; confirm that
# downstream writers and readers accept it.
location_df = (
    location_df
    .withColumn("density", col("density").cast(DecimalType(10, 3)))
    .withColumn("location_id", col("location_id").cast("int"))
    .withColumnRenamed("location_id", "location id")
)

location_df.show()

In [0]:
# List the tables and views currently registered in the Spark catalog.
spark.catalog.listTables()

In [0]:

# Load the raw make-details CSV from the bronze container (header row,
# inferred column types) and render it with the notebook's display helper.
make_details_csv_path = f"abfss://bronze@{storage_account}.dfs.core.windows.net/incoming/make_details.csv"
make_details_df = (
    spark.read.format("csv")
    .option("header", True)
    .option('inferschema', True)
    .load(make_details_csv_path)
)
display(make_details_df)

In [0]:
# Load the raw stolen-vehicles CSV from the bronze container (header row,
# inferred column types) and preview it.
stolen_vehicles_csv_path = f"abfss://bronze@{storage_account}.dfs.core.windows.net/incoming/stolen_vehicles.csv"
stolen_vehicles_df = (
    spark.read.format("csv")
    .option("header", True)
    .option('inferschema', True)
    .load(stolen_vehicles_csv_path)
)

stolen_vehicles_df.show()

In [0]:
# Load the data-dictionary CSV that accompanies the stolen-vehicles data
# (header row, inferred column types) and render it.
data_dictionary_csv_path = f"abfss://bronze@{storage_account}.dfs.core.windows.net/incoming/stolen_vehicles_db_data_dictionary.csv"
stolen_vehicles_db_data_dictionary_df = (
    spark.read.format("csv")
    .option("header", True)
    .option('inferschema', True)
    .load(data_dictionary_csv_path)
)

stolen_vehicles_db_data_dictionary_df.display()


In [0]:
# Group the four source DataFrames so they can be processed in one loop.
listy = [
    location_df,
    stolen_vehicles_db_data_dictionary_df,
    stolen_vehicles_df,
    make_details_df,
]

In [0]:
# Preview a single row of the locations DataFrame.
location_df.show(1)

In [0]:
# Preview the data-dictionary DataFrame.
stolen_vehicles_db_data_dictionary_df.show()

In [0]:
# Re-check the locations schema after the cast and rename applied earlier.
location_df.printSchema()

In [0]:
# Rebuild the working list from the current notebook-level DataFrames.
# NOTE(review): this repeats an earlier, identical `listy` assignment.
listy = [
    location_df,
    stolen_vehicles_db_data_dictionary_df,
    stolen_vehicles_df,
    make_details_df,
]


In [0]:
# Replace spaces in every column name with underscores, for each DataFrame in
# `listy`. DataFrames are immutable, so each rename yields a new DataFrame that
# has to be stored back.
#
# The loop variable is deliberately NOT called `col`: that name is the pyspark
# `col` function brought in by the wildcard import, and rebinding it to a
# string would break any later `col(...)` call that is not preceded by a
# fresh import.
for i, df in enumerate(listy):
    for column_name in df.columns:
        df = df.withColumnRenamed(column_name, column_name.replace(" ", "_"))
    listy[i] = df  # keep the list in sync with the renamed DataFrame
    df.show()

# Rebind the notebook-level variables to the renamed DataFrames. Without this
# the cleaned frames live only inside `listy`, and every later cell (writes,
# null checks, temp views) keeps using the old column names, including names
# with spaces, which some Spark/Parquet versions refuse to write.
# The unpacking order must match the order in which `listy` was built.
(
    location_df,
    stolen_vehicles_db_data_dictionary_df,
    stolen_vehicles_df,
    make_details_df,
) = listy

In [0]:
# Append the locations DataFrame to the silver container as Parquet.
location_df.write.parquet(
    f"abfss://silver@{storage_account}.dfs.core.windows.net/location",
    mode="append",
)

In [0]:
# Append the data-dictionary DataFrame to the silver container as Parquet.
stolen_vehicles_db_data_dictionary_df.write.parquet(
    f"abfss://silver@{storage_account}.dfs.core.windows.net/stolen_vehicles_db_data_dictionary",
    mode="append",
)

In [0]:
# Append the stolen-vehicles DataFrame to the silver container as Parquet.
stolen_vehicles_df.write.parquet(
    f"abfss://silver@{storage_account}.dfs.core.windows.net/stolen_vehicles",
    mode="append",
)


In [0]:
# Append the make-details DataFrame to the silver container as Parquet.
make_details_df.write.parquet(
    f"abfss://silver@{storage_account}.dfs.core.windows.net/make_details",
    mode="append",
)

In [0]:
# Inspect the column types of the stolen-vehicles DataFrame.
stolen_vehicles_df.printSchema()

In [0]:
from pyspark.sql.functions import sum,when,col


# Build one aggregate per column that counts its NULL values (1 for a NULL
# row, 0 otherwise, summed), then evaluate them all in a single select.
# `sum` here is the Spark aggregate imported above, not the Python builtin.
location_null_counters = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in location_df.columns
]
null_count_location_df = location_df.select(location_null_counters)

null_count_location_df.show()

In [0]:
# Show only the location rows that have a population value.
location_df.where(col("population").isNotNull()).show()

In [0]:
from pyspark.sql.functions import sum,when,col


# Per-column NULL counts for the stolen-vehicles data: each aggregate sums a
# 1/0 flag that marks whether the column is NULL on a given row.
# `sum` here is the Spark aggregate imported above, not the Python builtin.
stolen_vehicles_null_counters = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in stolen_vehicles_df.columns
]
null_count_stolen_vehicles_df = stolen_vehicles_df.select(stolen_vehicles_null_counters)

null_count_stolen_vehicles_df.show()



In [0]:
# Per-column NULL counts for the data-dictionary DataFrame.
# Relies on `sum`, `when` and `col` from pyspark.sql.functions having been
# imported by an earlier cell.
data_dictionary_null_counters = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in stolen_vehicles_db_data_dictionary_df.columns
]
null_stolen_vehicles_dictionary = stolen_vehicles_db_data_dictionary_df.select(
    data_dictionary_null_counters
)

null_stolen_vehicles_dictionary.show()

In [0]:
# Per-column NULL counts for the make-details DataFrame.
# Relies on `sum`, `when` and `col` from pyspark.sql.functions having been
# imported by an earlier cell.
make_details_null_counters = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in make_details_df.columns
]
null_count_make_details = make_details_df.select(make_details_null_counters)

null_count_make_details.show()

In [0]:
# Print the four NULL-count summaries one after another for comparison.
for null_summary in (
    null_count_stolen_vehicles_df,
    null_count_location_df,
    null_count_make_details,
    null_stolen_vehicles_dictionary,
):
    null_summary.show()


In [0]:
# Check the stolen-vehicles column types before choosing fill values for NULLs.
stolen_vehicles_df.printSchema()

In [0]:
# Replacement values for NULLs, keyed by column name.
# NOTE(review): every default is given as a string, including the ones for
# id/year columns; confirm against the printed schema that the resulting
# values (and the "0000-00-00" date placeholder) are what is intended.
stolen_vehicle_null_defaults = {
    "vehicle_type": "unknown",
    "make_id": "0",
    "model_year": "0",
    "vehicle_desc": "unknown",
    "color": "unknown",
    "date_stolen": "0000-00-00",
    "location_id": "0",
}
stolen_vehicles_df = stolen_vehicles_df.fillna(stolen_vehicle_null_defaults)

In [0]:
# Inspect the locations schema again before de-duplicating.
location_df.printSchema()

In [0]:
# Remove fully duplicated rows from each of the four DataFrames and rebind
# the notebook-level names to the de-duplicated results.
(
    location_df,
    stolen_vehicles_db_data_dictionary_df,
    stolen_vehicles_df,
    make_details_df,
) = (
    frame.dropDuplicates()
    for frame in (
        location_df,
        stolen_vehicles_db_data_dictionary_df,
        stolen_vehicles_df,
        make_details_df,
    )
)


In [0]:
# Append each cleaned DataFrame to its folder in the silver container.
# NOTE(review): the mode is "append", and the same folders are also appended
# to by earlier cells in this notebook, so a full run adds each dataset more
# than once. Confirm whether "overwrite" is the intended mode here.
silver_targets = (
    (location_df, "location"),
    (stolen_vehicles_db_data_dictionary_df, "stolen_vehicles_db_data_dictionary"),
    (stolen_vehicles_df, "stolen_vehicles"),
    (make_details_df, "make_details"),
)
for frame, silver_folder in silver_targets:
    frame.write.mode("append").parquet(
        f"abfss://silver@{storage_account}.dfs.core.windows.net/{silver_folder}"
    )


In [0]:
# Confirm the locations schema before exposing the DataFrame as a SQL view.
location_df.printSchema()

In [0]:
# Expose each DataFrame to Spark SQL under a session-scoped temp view name.
temp_views = {
    "Location01": location_df,
    "StolenVehiclesDBDataDictionary": stolen_vehicles_db_data_dictionary_df,
    "StolenVehicles": stolen_vehicles_df,
    "MakeDetails": make_details_df,
}
for view_name, frame in temp_views.items():
    frame.createOrReplaceTempView(view_name)


In [0]:
# Query the data-dictionary temp view through Spark SQL. The resulting
# DataFrame is the cell's last expression, so the notebook renders it.
spark.sql("select * from StolenVehiclesDBDataDictionary")

In [0]:
%sql
-- Theft counts per (vehicle_type, model_year), largest counts first.
select
    vehicle_type,
    model_year,
    count(*) as num_of_most_stolen_model
from StolenVehicles
group by vehicle_type, model_year
order by num_of_most_stolen_model desc;

In [0]:
%sql
-- Theft counts per (vehicle_desc, model_year), largest counts first.
select
    vehicle_desc,
    model_year,
    count(*) as no_of_most_stolen_brand
from StolenVehicles
group by vehicle_desc, model_year
order by no_of_most_stolen_brand desc;

