# Nobel Laureates: Data Discoveries

> Use Azure Key Vault (through a Databricks secret scope) to keep credentials out of the notebook

In [0]:
# Service-principal credentials, read from the "NobelKey" secret scope so no secret value
# is ever written into the notebook itself. Fetched in the order secretId, clientId, tenantId.
SECRET_SCOPE = "NobelKey"
SecretId, ClientId, TenantId = (
    dbutils.secrets.get(scope=SECRET_SCOPE, key=secret_key)
    for secret_key in ("secretId", "clientId", "tenantId")
)

> Mount the Data Lake Storage Gen2 containers in Databricks
- Bronze container: reading the raw data
- Silver container: storing the transformed data and retrieving it for further analysis
- Gold container: storing the business-specific data models

In [0]:
# ABFS options for OAuth client-credentials authentication; shared by every mount below.
token_endpoint = f"https://login.microsoftonline.com/{TenantId}/oauth2/token"
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": ClientId,
    "fs.azure.account.oauth2.client.secret": SecretId,
    "fs.azure.account.oauth2.client.endpoint": token_endpoint,
}


In [0]:
# Mount the Bronze container (raw data) at /mnt/bronze.
# dbutils.fs.mount raises an error when the mount point is already mounted, so skip the
# call if it exists. This keeps "Run All" working on a cluster where the mount survives.
if not any(m.mountPoint == "/mnt/bronze" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://bronze@nobaldatalake.dfs.core.windows.net/",
        mount_point="/mnt/bronze",
        extra_configs=configs,
    )

In [0]:
# Mount the Silver container (transformed data) at /mnt/silver.
# dbutils.fs.mount raises an error when the mount point is already mounted, so skip the
# call if it exists. This keeps "Run All" working on a cluster where the mount survives.
if not any(m.mountPoint == "/mnt/silver" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://silver@nobaldatalake.dfs.core.windows.net/",
        mount_point="/mnt/silver",
        extra_configs=configs,
    )

In [0]:
# Mount the Gold container (business data models) at /mnt/gold.
# dbutils.fs.mount raises an error when the mount point is already mounted, so skip the
# call if it exists. This keeps "Run All" working on a cluster where the mount survives.
if not any(m.mountPoint == "/mnt/gold" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://gold@nobaldatalake.dfs.core.windows.net/",
        mount_point="/mnt/gold",
        extra_configs=configs,
    )

> List the files in the mounted Bronze container

In [0]:
# Directory listing of the mounted Bronze container: confirms the mount works and shows the raw file.
bronze_listing = dbutils.fs.ls("/mnt/bronze")
display(bronze_listing)


In [0]:
# Optional cleanup, intentionally left commented out: unmount a mount point (e.g. before re-mounting).
# NOTE(review): "/mnt/Nobel" is not one of the mount points created above
# (/mnt/bronze, /mnt/silver, /mnt/gold); this looks stale. Confirm the intended path.
#dbutils.fs.unmount("/mnt/Nobel")


# Bronze

> Read the Raw Data from Bronze container in Data lake

In [0]:
# Raw CSV from the mounted Bronze container. The first line supplies the column names.
# No schema inference is requested, so every column arrives as a string.
df_bronze = spark.read.csv("/mnt/bronze/Nobel_Price.csv", header=True)



Display the raw dataset.

In [0]:
# Render the raw Bronze DataFrame as an interactive table (Databricks display()).
display(df_bronze)

> Starting the Transformation

Rename the columns, because some column names contain spaces: spaces and dots become underscores. Then save the result as a Delta table.

In [0]:
import re

# Delta rejects these characters in column names: space , ; { } ( ) newline tab =
# Replace them, plus "." (which otherwise needs backtick-quoting in Spark), with "_".
# Names that only contain spaces/dots are renamed exactly as before.
_INVALID_COLUMN_CHARS = re.compile(r"[ ,;{}()\n\t=.]")

df_bronze_clean = df_bronze.toDF(*[_INVALID_COLUMN_CHARS.sub("_", name) for name in df_bronze.columns])

# overwriteSchema (instead of mergeSchema) makes the table schema match this DataFrame exactly
# on every run. With mergeSchema + overwrite, Delta keeps columns from earlier schemas (as nulls)
# and rejects column type changes.
df_bronze_clean.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .save("/mnt/bronze/Nobel_Price")


Check that the Delta table was written.

In [0]:
# The listing should now include the Nobel_Price Delta directory next to the raw CSV.
bronze_listing = dbutils.fs.ls("/mnt/bronze")
display(bronze_listing)

# Silver

> Read the data back from the Bronze Delta table for further transformation

In [0]:
# Start the Silver stage from the persisted Bronze Delta table (columns already renamed).
bronze_delta_path = "dbfs:/mnt/bronze/Nobel_Price"
df_bronze = spark.read.load(bronze_delta_path, format="delta")

# Preview the loaded data.
display(df_bronze)


Inspect the actual schema.

In [0]:
# Show basic info: print each column's name, Spark type and nullability as a tree.
df_bronze.printSchema()

Get a quick statistical overview of the data.

In [0]:
# Summary statistics (count, mean, stddev, min, max) for the columns, printed as a text table.
bronze_summary = df_bronze.describe()
bronze_summary.show()

> Check the Null Values

In [0]:
# Count the null values in every column with a single aggregation.
# Spark's sum is imported under an alias: importing it as `sum` would shadow Python's
# built-in sum() for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# Each output column keeps its source column's name (instead of "sum(<name>)").
df_bronze.select(
    [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_bronze.columns]
).show()

- Drop the `Overall_motivation` column, as 990 of its values are null

In [0]:
# Overall_motivation is almost entirely null (see the counts above), so drop the column.
OVERALL_MOTIVATION_COL = "Overall_motivation"
df_bronze = df_bronze.drop(OVERALL_MOTIVATION_COL)


- To handle nulls in `Surname`, combine first name and surname into a single `Full_Name` column

In [0]:
from pyspark.sql.functions import concat_ws

# Full_Name = "<Firstname> <Surname>". concat_ws skips null inputs, so a row whose Surname
# is null still gets a usable name (the first name alone) instead of null.
df_bronze = df_bronze.withColumn("Full_Name", concat_ws(" ", "Firstname", "Surname"))


- Fill the nulls in specific columns

In [0]:
from pyspark.sql.functions import coalesce, col, when

# Where the organization city/country is missing, fall back to the laureate's birth city/country.
# coalesce(a, b) yields a when it is non-null, otherwise b. That is the same result as
# when(a.isNull(), b).otherwise(a), written with the dedicated function.
# NOTE(review): this imputes a birth place as an organization location. That can be misleading
# for laureates without an organization, and it spreads the "Not Updated" organization name
# over many different cities. Confirm this is the intended modelling choice.
df_bronze = df_bronze.withColumn(
    "Organization_city", coalesce(col("Organization_city"), col("Born_city"))
).withColumn(
    "Organization_country", coalesce(col("Organization_country"), col("Born_country"))
)


In [0]:
# Replace the remaining nulls with explicit placeholder strings. This runs after the
# organization city/country were back-filled from the birth place.
# Location and organization columns get "Not Updated"; the death-place columns get
# "No Death Recorded".
NOT_UPDATED = "Not Updated"
NO_DEATH = "No Death Recorded"
placeholder_by_column = {
    **dict.fromkeys(
        ["Organization_name", "Born_country", "Born_country_code", "Born_city",
         "Organization_city", "Organization_country"],
        NOT_UPDATED,
    ),
    **dict.fromkeys(["Died_country", "Died_country_code", "Died_city"], NO_DEATH),
}
df_bronze = df_bronze.na.fill(placeholder_by_column)


- Drop the rows with a null `Born` value, and drop the columns that are no longer required

In [0]:
# Drop rows that have no birth date.
# NOTE(review): the intent is to do this only when such rows are under 4% of the data.
# That threshold is not checked in code; confirm it against the null counts above.
df_bronze = df_bronze.dropna(subset=["Born"])

In [0]:
# Firstname and Surname are redundant now that Full_Name exists.
redundant_name_columns = ("Firstname", "Surname")
df_bronze = df_bronze.drop(*redundant_name_columns)


- Convert the date columns

In [0]:
from pyspark.sql.functions import to_date

# Parse the "Died" and then the "Born" strings into DateType using the pattern yyyy-MM-dd.
# NOTE(review): depending on spark.sql.ansi.enabled, strings that do not match the pattern
# either become null or raise an error. New nulls in Born would appear *after* the null-Born
# rows were dropped. Verify with the null check below.
for date_col in ("Died", "Born"):
    df_bronze = df_bronze.withColumn(date_col, to_date(date_col, "yyyy-MM-dd"))


> Again check the Null Values

In [0]:
# Count the null values in every column again, after cleaning, with a single aggregation.
# Spark's sum is imported under an alias: importing it as `sum` would shadow Python's
# built-in sum() for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# Each output column keeps its source column's name (instead of "sum(<name>)").
df_bronze.select(
    [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_bronze.columns]
).show()

> Check for Duplicates

In [0]:
# Check for duplicate rows.
# NOTE: despite its name, df_bronze_duplicates holds the DISTINCT rows of df_bronze.
# Compare its count with df_bronze.count() in the next cell.
df_bronze_duplicates = df_bronze.distinct()
distinct_row_count = df_bronze_duplicates.count()
distinct_row_count

In [0]:
# Total row count. If it equals the distinct-row count above, there are no exact duplicate rows
# (the author's run found none).
total_row_count = df_bronze.count()
total_row_count

- Check the Column names

In [0]:
# Column names after the renames, drops and new columns above.
current_columns = df_bronze.columns
print(current_columns)


Cast the columns to the required data types

In [0]:
# Id and Year were read from the CSV as strings; cast both to int (Id first, then Year).

from pyspark.sql.functions import col

for int_col in ("Id", "Year"):
    df_bronze = df_bronze.withColumn(int_col, col(int_col).cast("int"))


In [0]:
# Confirm the casts and date conversions: Id/Year should now be int, Born/Died date.
df_bronze.printSchema()


> Create the 3NF data model

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

In [0]:
# Person entity of the 3NF model: one row per laureate.
# A laureate can occupy several rows of df_bronze, which also carries Year, Category and
# Organization_* columns (e.g. several prizes or affiliations). Without distinct() the same
# person would repeat and Id would not be a key. This matches how dim_laureate is built
# in the Gold stage.
# NOTE(review): assumes the person attributes are identical across a laureate's rows.
# If they differ, Id will still repeat.
df_person = df_bronze.select(
    "Id", "Full_Name", "Born", "Died", "Gender",
    "Born_country", "Born_country_code", "Born_city",
    "Died_country", "Died_country_code", "Died_city"
).distinct()

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Select only unique organization entries (natural key: name, city, country).
df_org = df_bronze.select(
    "Organization_name", "Organization_city", "Organization_country"
).dropDuplicates()

# Generate a deterministic Organization ID.
# monotonically_increasing_id() depends on partition layout, and df_org is recomputed by every
# action that uses it (the awards join and each write). Ids could therefore differ between the
# organization table and the awards table. Numbering rows in natural-key order gives stable ids.
# The cast to long keeps the column type monotonically_increasing_id() produced.
# The unpartitioned window moves all rows to one partition; fine for a small dimension.
org_key_window = Window.orderBy("Organization_name", "Organization_city", "Organization_country")
df_org = df_org.withColumn("Organization_id", row_number().over(org_key_window).cast("long"))

In [0]:
# Awards entity: one row per df_bronze row, linked to its organization through Organization_id.
# Left join on the full natural key so awards without a matching organization are kept.
org_natural_key = ["Organization_name", "Organization_city", "Organization_country"]
df_awards = (
    df_bronze
    .join(df_org, on=org_natural_key, how="left")
    .select("Id", "Year", "Category", "Motivation", "Organization_id")
)

- Save into the Silver layer

In [0]:
# Save the three 3NF tables to the Silver container as Delta directories (person, organization, awards).
silver_outputs = {
    "/mnt/silver/delta/person": df_person,
    "/mnt/silver/delta/organization": df_org,
    "/mnt/silver/delta/awards": df_awards,
}
for silver_path, silver_df in silver_outputs.items():
    silver_df.write.format("delta").mode("overwrite").save(silver_path)

In [0]:
# Also keep the full cleaned (denormalized) dataset in Silver as a Delta table.
# The Gold stage below reads its input from this path.
df_bronze.write.save("/mnt/silver/Nobel_Price_Cleaned", format="delta", mode="overwrite")


# Gold

> Fetch the Data from Silver Layer and Create a Star schema

In [0]:
# Load the denormalized cleaned table from the Silver layer (not the 3NF tables);
# all Gold dimensions and the fact table are derived from it.
df_silver = spark.read.load("/mnt/silver/Nobel_Price_Cleaned", format="delta")

> Create Star schema

- Time dimension table

In [0]:
# Create Dimension Tables
# Explicit imports instead of `from pyspark.sql.functions import *`. The wildcard shadows Python
# built-ins such as sum, min, max, round and abs for the rest of the notebook. This list covers
# every pyspark function used by the Gold-layer cells below.
from pyspark.sql.functions import broadcast, col, floor, lit, row_number, to_date, when, year
from pyspark.sql.window import Window

# Time dimension: one row per distinct award year, with decade and century buckets
# (e.g. 1987 -> 1980 and 1900) and a surrogate key numbered in year order.
w_time = Window.orderBy("Year")
dim_time = df_silver.select("Year").distinct() \
    .withColumn("Decade", (floor(col("Year") / 10) * 10).cast("int")) \
    .withColumn("Century", (floor(col("Year") / 100) * 100).cast("int")) \
    .withColumn("time_id", row_number().over(w_time))


- Category Dimension Table

In [0]:
# Category dimension: one row per prize category, numbered in alphabetical order.
w_cat = Window.orderBy("Category")
dim_category = (
    df_silver.select("Category")
    .distinct()
    .withColumn("category_id", row_number().over(w_cat))
)


- Organization Dimension (dim_organization)

In [0]:
# Organization dimension: one row per distinct (name, city, country), with a surrogate key.
# Order by the full natural key, not just the name. With ties (same name, different
# city/country), row_number() may number the tied rows differently each time this lazy
# DataFrame is recomputed (e.g. once for the fact-table join and again for the write).
# organization_id in fact_awards could then point at the wrong dimension row.
w_org = Window.orderBy("Organization_name", "Organization_city", "Organization_country")
dim_organization = df_silver.select(
    "Organization_name", "Organization_city", "Organization_country"
).distinct() \
    .withColumn("organization_id", row_number().over(w_org))


- Laureate Dimension (dim_laureate)

In [0]:
# Laureate dimension: distinct laureate attributes. The six birth/death location columns are
# renamed to lowercase (e.g. Born_city -> born_city); Id, Full_Name, Gender, Born, Died keep their names.
lowercased_location_cols = [
    "Born_city", "Born_country", "Born_country_code",
    "Died_city", "Died_country", "Died_country_code",
]
dim_laureate = df_silver.select(
    "Id", "Full_Name", "Gender", "Born", "Died",
    *[col(name).alias(name.lower()) for name in lowercased_location_cols],
).distinct()

- Fact Table (fact_awards)

In [0]:
# --- fact_awards ---
# One row per award record in df_silver, with surrogate keys looked up from the dimensions.
from pyspark.sql.functions import broadcast, col, concat_ws, lit, row_number, to_date, year
from pyspark.sql.window import Window

# Natural key of an organization. dim_organization has one row per (name, city, country),
# so joining on the name alone fans out. Every award whose organization name occurs with
# several cities/countries (including the "Not Updated" placeholder) would be duplicated,
# once per matching dimension row, inflating Prize_count.
org_key = ["Organization_name", "Organization_city", "Organization_country"]

# Only the award year is known, so approximate the award date as 10 December
# (the award ceremony date) of that year.
award_date = to_date(concat_ws("-", col("Year").cast("string"), lit("12"), lit("10")))

# Born/Died are taken directly from df_silver (the same values dim_laureate exposes), so no
# join to dim_laureate is needed. Such a join would also fan out if an Id had several rows there.
fact_awards = df_silver.select("Id", "Year", "Category", "Motivation", "Born", "Died", *org_key) \
    .withColumn("Born", to_date("Born")) \
    .withColumn("Died", to_date("Died")) \
    .withColumn("Age_at_award", col("Year") - year("Born")) \
    .withColumn("Posthumous_flag", col("Died").isNotNull() & (col("Died") < award_date))
# Age_at_award is null when Born is null, because null propagates through the subtraction.
# Posthumous_flag is True when the laureate died before the award date. The previous test
# (Age_at_award < 0) only caught a birth year after the award year, never a posthumous award.

# Join with dimensions (small, so broadcast) and add the measures and surrogate key.
# organization_id breaks ties in the award_id ordering between rows with the same
# laureate/year/category (several affiliations), so award_id numbering is deterministic.
fact_awards = fact_awards \
    .join(broadcast(dim_time.select("Year", "time_id")), "Year", "left") \
    .join(broadcast(dim_category.select("Category", "category_id")), "Category", "left") \
    .join(broadcast(dim_organization.select(*org_key, "organization_id")), org_key, "left") \
    .withColumn("Prize_count", lit(1)) \
    .withColumn("award_id", row_number().over(Window.orderBy("Id", "Year", "Category", "organization_id"))) \
    .select(
        "award_id",
        "Id",  # foreign key to laureate
        "time_id",
        "category_id",
        "organization_id",
        "Motivation",
        "Age_at_award",
        "Posthumous_flag",
        "Prize_count"
    )

- Save the data as Delta files in the Gold layer

In [0]:
# Write every Gold table as a Delta directory under /mnt/gold: the four dimensions, then the fact table.
gold_outputs = [
    (dim_time, "/mnt/gold/dim_time"),
    (dim_category, "/mnt/gold/dim_category"),
    (dim_organization, "/mnt/gold/dim_organization"),
    (dim_laureate, "/mnt/gold/dim_laureate"),
    (fact_awards, "/mnt/gold/fact_awards"),
]
for gold_df, gold_path in gold_outputs:
    gold_df.write.format("delta").mode("overwrite").save(gold_path)


- Save the tables in the metastore (saveAsTable) so Power BI can query them by name

In [0]:
dim_time.write.mode("overwrite").saveAsTable("gold_time")
dim_category.write.mode("overwrite").saveAsTable("gold_category")
dim_organization.write.mode("overwrite").saveAsTable("gold_organization")
dim_laureate.write.mode("overwrite").saveAsTable("gold_laureate")
fact_awards.write.mode("overwrite").saveAsTable("gold_nobel")

- Continue in Power BI for visualization