### ETL in Databricks
This notebook gives some basic commands for performing necessary ETL functions. Use this to create a medallion ETL pipeline using the financial dataset.

You should have:
 - bronze schema with tables for each set
 - silver schema with cleaned and formatted tables
 - gold schema with aggregated tables (to answer the questions in the notion page)

The notebook will be used as the source a daily job to refresh the pipeline (The whole notebook will be executed) and a dashboard will be created using the gold tables as source data.


In [0]:
%sql
-- Use the default catalog (main)
USE CATALOG jarvis_server_1;

-- Create schemas for medallion architecture
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

-- Verify schemas
SHOW SCHEMAS;


databaseName
bronze
default
gold
information_schema
silver


In [0]:
url = "jdbc:sqlserver://jarvis-server-1.database.windows.net:1433;database=Financial-Database;encrypt=true;trustServerCertificate=false;loginTimeout=30;"
user = "jarvis-server-1"
password = "Lecture6"

# Connect via Spark
df_tables = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", "INFORMATION_SCHEMA.TABLES") \
    .option("user", user) \
    .option("password", password) \
    .load()

display(df_tables)


[0;31m---------------------------------------------------------------------------[0m
[0;31mUnknownException[0m                          Traceback (most recent call last)
File [0;32m<command-5999673723621170>, line 13[0m
[1;32m      5[0m [38;5;66;03m# Connect via Spark[39;00m
[1;32m      6[0m df_tables [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mjdbc[39m[38;5;124m"[39m) \
[1;32m      7[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124murl[39m[38;5;124m"[39m, url) \
[1;32m      8[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mdbtable[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mINFORMATION_SCHEMA.TABLES[39m[38;5;124m"[39m) \
[1;32m      9[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124muser[39m[38;5;124m"[39m, user) \
[1;32m     10[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mpassword[39m[38;5;124m"[39m, password) \
[1;32m     11[0m     [38;5;241m.

In [0]:
from pyspark.sql.functions import from_json, explode, col

# -------------------------------
# Paths to volumes
# -------------------------------
base_path = "/Volumes/jarvis_server_1/default/volume"
mcc_path = f"{base_path}/mcc_codes.json"
labels_path = f"{base_path}/train_fraud_labels.json"

# -------------------------------
# 1. Read + normalize MCC codes into df_mcc
# -------------------------------
df_mcc = (
    spark.read.text(mcc_path)  # read as text because it's a JSON map
    .select(from_json(col("value"), "map<string,string>").alias("mcc_map"))
    .select(explode(col("mcc_map")).alias("mcc_code", "mcc_description"))
    .withColumn("mcc_code", col("mcc_code").cast("int"))
)

# -------------------------------
# 2. Read fraud labels into df_labels
# -------------------------------
df_labels = spark.read.json(labels_path)  # standard row-based JSON

# -------------------------------
# Now both volumes are stored in variables
# -------------------------------
print("df_mcc and df_labels are ready!")
display(df_mcc)
display(df_labels)




com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# -------------------------------
# Save MCC codes (from volume)
# -------------------------------
df_mcc.write \
    .mode("overwrite") \
    .saveAsTable("jarvis_server_1.bronze.mcc_codes")

# -------------------------------
# Save Fraud labels (from volume)
# -------------------------------
df_labels.write \
    .mode("overwrite") \
    .saveAsTable("jarvis_server_1.bronze.train_fraud_labels")

# -------------------------------
# Save Transactions table (existing table)
# -------------------------------
df_tx = spark.table("jarvis_server_1.default.transactions_data")
df_tx.write \
    .mode("overwrite") \
    .saveAsTable("jarvis_server_1.bronze.transactions_data")

# -------------------------------
# Save SQL Server tables (optional if you want them in bronze)
# -------------------------------
url = "jdbc:sqlserver://jarvis-server-1.database.windows.net:1433;database=Financial-Database;encrypt=true;trustServerCertificate=false;loginTimeout=30;"
user = "jarvis-server-1"
password = "Lecture6"

# Example: load dbo.cards_data and save to bronze
df_cards = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", "dbo.cards_data") \
    .option("user", user) \
    .option("password", password) \
    .load()

df_cards.write \
    .mode("overwrite") \
    .saveAsTable("jarvis_server_1.bronze.cards_data")

# Repeat similarly for dbo.users_data if needed

# -------------------------------
# Verify all tables in bronze
# -------------------------------
spark.sql("SHOW TABLES IN jarvis_server_1.bronze").show()


In [0]:
# Read from your bronze (raw) table
bronze_df = spark.read.table("catalog.schema.bronze_table")

# Transform your bronze dataframe to silver (clean, format, remove null, etc.)
df_silver = (df_bronze
             .withColumn("date_example_formatted", to_timestamp(col("date_example"), "yyyy-MM-dd HH:mm:ss"))
             # etc.
)

# confirm results
display(df_silver)

In [0]:
df_silver.write.mode("overwrite").saveAsTable("catalog.schema.silver_table")

In [0]:
# Transform your silver dataframe to gold (aggregated, joined, etc.)
silver_df = spark.read.table("catalog.schema.silver_table")

gold_df = (silver_df
           .groupBy("address") # example
           .agg(sum("total_debt").alias("total_debt"))
           .orderBy(desc("total_debt"))
)

display(gold_df)

In [0]:
gold_df.write.mode("overwrite").saveAsTable("catalog.schema.gold_table")