
## Brief: Building a Data Transformation Pipeline in the Cloud

In [0]:
# List the sample datasets available under /databricks-datasets/.
# `dbutils.fs.ls` is used instead of the `%fs` magic so the cell stays plain
# Python: Databricks magic commands are cell-level, so mixing one with Python
# statements in the same cell is unreliable.
display(dbutils.fs.ls("/databricks-datasets/"))

# Read the SFO customer survey dataset, treating the first line as the header.
# Without inferSchema every column is loaded as a string.
data = spark.read.option("header", "true").csv("/databricks-datasets/sfo_customer_survey")

# Preview the loaded DataFrame
display(data)

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,1732268708053
dbfs:/databricks-datasets/README.md,README.md,976,1561418533000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,1732268708053
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1561418605000
dbfs:/databricks-datasets/adult/,adult/,0,1732268708053
dbfs:/databricks-datasets/airlines/,airlines/,0,1732268708053
dbfs:/databricks-datasets/amazon/,amazon/,0,1732268708053
dbfs:/databricks-datasets/asa/,asa/,0,1732268708053
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,1732268708053
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,1732268708053


## Store the Raw Data in the Bronze Layer

In [0]:


## Save the data in CSV format on Azure Blob Storage (Bronze layer)

# Storage account and container identifiers (masked placeholders).
storage_name = "*********"
container_name = "********"

# Read the account key from a Databricks secret scope instead of embedding it in
# the notebook source, where it would be stored and shared with the notebook.
# The scope and key names are placeholders -- replace them with the ones
# configured for this workspace.
storage_key = dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key>")

# Spark writes a directory of part files at this location, despite the ".csv" suffix.
output_path = f"wasbs://{container_name}@{storage_name}.blob.core.windows.net/couches/bronze/sfo_customer_survey.csv"

# Register the key under the blob endpoint name, which the wasbs:// URI above uses.
spark.conf.set(f"fs.azure.account.key.{storage_name}.blob.core.windows.net", storage_key)

# Persist the raw data unchanged, with a header row, replacing any previous run.
data.write.mode("overwrite").csv(output_path, header=True)

## Data Cleaning and Transformation, then Storing in the Silver Layer

In [0]:
# Register the account key under the dfs endpoint name, which is the endpoint
# the abfss:// URI below goes through.
dfs_endpoint = f"{storage_name}.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.key.{dfs_endpoint}", storage_key)

# List what currently sits in the bronze folder and print the raw FileInfo list.
bronze_dir = f"abfss://{container_name}@{dfs_endpoint}/couches/bronze/"
files = dbutils.fs.ls(bronze_dir)
print(files)

[FileInfo(path='abfss://azuredatabrick@storagelauradataengineer.dfs.core.windows.net/couches/bronze/matches-19302010.csv', name='matches-19302010.csv', size=824541, modificationTime=1732202994000), FileInfo(path='abfss://azuredatabrick@storagelauradataengineer.dfs.core.windows.net/couches/bronze/products.csv', name='products.csv', size=14372, modificationTime=1732182241000), FileInfo(path='abfss://azuredatabrick@storagelauradataengineer.dfs.core.windows.net/couches/bronze/sfo_customer_survey.csv/', name='sfo_customer_survey.csv/', size=0, modificationTime=1732269258000)]


In [0]:
from pyspark.sql.functions import col, to_date, to_timestamp, trim, upper
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType, StructField

# The read below uses an abfss:// URI, which goes through the dfs endpoint, so
# the account key has to be registered under the `dfs.core.windows.net` config
# name. The `blob.core.windows.net` name only serves wasbs:// paths and is
# already set by the bronze write cell.
spark.conf.set(f"fs.azure.account.key.{storage_name}.dfs.core.windows.net", storage_key)

# Location of the bronze copy of the survey (a directory of CSV part files).
bronze_path = f"abfss://{container_name}@{storage_name}.dfs.core.windows.net/couches/bronze/sfo_customer_survey.csv"

# Load the bronze CSV with its header row and let Spark infer the column types.
raw_data = spark.read.option("header", True).option("inferSchema", True).csv(bronze_path)
raw_data.printSchema()

root
 |-- RESPNUM: string (nullable = true)
 |-- CCGID: string (nullable = true)
 |-- RUN: string (nullable = true)
 |-- INTDATE: integer (nullable = true)
 |-- GATE: integer (nullable = true)
 |-- STRATA: integer (nullable = true)
 |-- PEAK: integer (nullable = true)
 |-- METHOD: integer (nullable = true)
 |-- AIRLINE: integer (nullable = true)
 |-- FLIGHT: integer (nullable = true)
 |-- DEST: integer (nullable = true)
 |-- DESTGEO: integer (nullable = true)
 |-- DESTMARK: integer (nullable = true)
 |-- ARRTIME: string (nullable = true)
 |-- DEPTIME: string (nullable = true)
 |-- Q2PURP1: integer (nullable = true)
 |-- Q2PURP2: integer (nullable = true)
 |-- Q2PURP3: integer (nullable = true)
 |-- Q2PURP4: integer (nullable = true)
 |-- Q2PURP5: integer (nullable = true)
 |-- Q2PURP6: string (nullable = true)
 |-- Q3GETTO1: integer (nullable = true)
 |-- Q3GETTO2: integer (nullable = true)
 |-- Q3GETTO3: integer (nullable = true)
 |-- Q3GETTO4: integer (nullable = true)
 |-- Q3GETTO5:

In [0]:

# Explicit schema for the survey: one nullable field per column, in file order.
# Every column is IntegerType unless it is listed in one of the override sets.
# NOTE(review): no read visible in this notebook passes `schema`; `raw_data` is
# loaded with inferSchema instead -- confirm whether this schema should be applied.
schema_field_order = [
    "RESPNUM", "CCGID", "RUN", "INTDATE", "GATE", "STRATA", "PEAK", "METHOD",
    "AIRLINE", "FLIGHT", "DEST", "DESTGEO", "DESTMARK", "ARRTIME", "DEPTIME",
    "Q2PURP1", "Q2PURP2", "Q2PURP3", "Q2PURP4", "Q2PURP5", "Q2PURP6",
    "Q3GETTO1", "Q3GETTO2", "Q3GETTO3", "Q3GETTO4", "Q3GETTO5", "Q3GETTO6",
    "Q3PARK", "Q4BAGS", "Q4BUY", "Q4FOOD", "Q4WIFI", "Q5FLYPERYR", "Q6TENURE",
    "SAQ",
    "Q7A_ART", "Q7B_FOOD", "Q7C_SHOPS", "Q7D_SIGNS", "Q7E_WALK", "Q7F_SCREENS",
    "Q7G_INFOARR", "Q7H_INFODEP", "Q7I_WIFI", "Q7J_ROAD", "Q7K_PARK",
    "Q7L_AIRTRAIN", "Q7M_LTPARK", "Q7N_RENTAL", "Q7O_WHOLE",
    "Q8COM1", "Q8COM2", "Q8COM3",
    "Q9A_CLNBOARD", "Q9B_CLNAIRTRAIN", "Q9C_CLNRENT", "Q9D_CLNFOOD",
    "Q9E_CLNBATH", "Q9F_CLNWHOLE",
    "Q9COM1", "Q9COM2", "Q9COM3",
    "Q10SAFE", "Q10COM1", "Q10COM2", "Q10COM3",
    "Q11A_USEWEB", "Q11B_USESFOAPP", "Q11C_USEOTHAPP", "Q11D_USESOCMED",
    "Q11E_USEWIFI",
    "Q12COM1", "Q12COM2", "Q12COM3",
    "Q13_WHEREDEPART", "Q13_RATEGETTO", "Q14A_FIND", "Q14B_SECURITY",
    "Q15_PROBLEMS", "Q16_REGION", "Q17_CITY", "Q17_ZIP", "Q17_COUNTRY",
    "HOME", "Q18_AGE", "Q19_SEX", "Q20_INCOME", "Q21_HIFLYER",
    "Q22A_USESJC", "Q22B_USEOAK", "LANG", "WEIGHT",
]

# Columns declared as StringType.
schema_string_fields = {
    "INTDATE", "DEST", "ARRTIME", "DEPTIME",
    "Q8COM1", "Q8COM2", "Q8COM3",
    "Q9COM1", "Q9COM2", "Q9COM3",
    "Q10COM1", "Q10COM2", "Q10COM3",
    "Q12COM1", "Q12COM2", "Q12COM3",
    "Q13_WHEREDEPART", "Q15_PROBLEMS", "Q16_REGION", "Q17_CITY", "Q17_COUNTRY",
}

# Columns declared as FloatType.
schema_float_fields = {"WEIGHT"}


def _schema_field_type(field_name):
    """Return the Spark data type declared for `field_name`."""
    if field_name in schema_string_fields:
        return StringType()
    if field_name in schema_float_fields:
        return FloatType()
    return IntegerType()


schema = StructType(
    [StructField(field_name, _schema_field_type(field_name), True) for field_name in schema_field_order]
)


In [0]:
# Cleaning Data
# Starts from the inferred-schema bronze read (`raw_data`) and produces
# `cleaned_data`, which is what gets written to the Silver layer.
#
# INTDATE is deliberately left untouched: inferSchema types it as integer and
# the sample rows hold small codes such as 2, so parsing it with a
# "MM/dd/yyyy" pattern could never match and would null the whole column.
#
# ARRTIME / DEPTIME hold 12-hour clock strings such as "8:34 AM", so they are
# parsed with "h:mm a". The former "HH:mm" pattern does not account for the
# AM/PM marker. Only the time-of-day part of the resulting timestamp is
# meaningful.
cleaned_data = (
    raw_data
    .replace("", None)  # treat empty strings as missing values
    .withColumn("ARRTIME", to_timestamp(col("ARRTIME"), "h:mm a"))
    .withColumn("DEPTIME", to_timestamp(col("DEPTIME"), "h:mm a"))
)

# Normalize strings: strip surrounding whitespace and upper-case free-text fields
string_columns = ["DEST", "Q16_REGION", "Q17_CITY", "Q17_COUNTRY", "Q13_WHEREDEPART", "Q15_PROBLEMS"]
for col_name in string_columns:
    cleaned_data = cleaned_data.withColumn(col_name, upper(trim(col(col_name))))

# Convert numerical columns explicitly: WEIGHT is fractional, the others are integer codes
numerical_columns = ["Q18_AGE", "Q20_INCOME", "WEIGHT"]
for col_name in numerical_columns:
    target_type = FloatType() if col_name == "WEIGHT" else IntegerType()
    cleaned_data = cleaned_data.withColumn(col_name, col(col_name).cast(target_type))

# Remove exact duplicate rows (no other row filtering is applied here)
cleaned_data = cleaned_data.dropDuplicates()





In [0]:
# Bronze location of the survey data, addressed through the dfs (abfss) endpoint.
bronze_path = (
    f"abfss://{container_name}@{storage_name}.dfs.core.windows.net"
    "/couches/bronze/sfo_customer_survey.csv"
)


In [0]:
# Persist the cleaned DataFrame to the Silver layer as Parquet, replacing any
# output left by a previous run.
# NOTE(review): the target name ends in ".csv" although the data is written as
# Parquet; renaming it would affect anything that reads this path -- confirm.
silver_path = (
    f"abfss://{container_name}@{storage_name}.dfs.core.windows.net"
    "/couches/silver/sfo_customer_survey_silver.csv"
)
silver_writer = cleaned_data.write.mode("overwrite")
silver_writer.parquet(silver_path)

print("Data cleaning and transformation completed. Stored in Silver layer.")

Data cleaning and transformation completed. Stored in Silver layer.


## Create the Star Schema  

In [0]:
# Dimension tables of the star schema.
# NOTE(review): every dimension is derived from `raw_data` (the bronze read),
# not from `cleaned_data` -- confirm that bypassing the silver cleaning is intended.


def _select_renamed(frame, column_pairs):
    """Select the (source, target) column pairs from `frame` under their target names."""
    return frame.select([col(source).alias(target) for source, target in column_pairs])


# 1. Dimension: Airline -- one row per distinct airline code.
# airline_name is a placeholder that simply repeats the code.
dim_airline = (
    _select_renamed(raw_data, [("AIRLINE", "airline_code")])
    .distinct()
    .withColumn("airline_name", col("airline_code"))
)

# 2. Dimension: Demographics -- respondent attributes, with age cast to int.
demographic_columns = [
    ("RESPNUM", "response_id"),
    ("Q18_AGE", "age"),
    ("Q19_SEX", "gender"),
    ("Q20_INCOME", "income_level"),
    ("Q17_COUNTRY", "country"),
    ("HOME", "home_flag"),
]
dim_demographics = (
    _select_renamed(raw_data, demographic_columns)
    .withColumn("age", col("age").cast("int"))
    .distinct()
)

# 3. Dimension: Survey Questions -- RESPNUM plus every column whose name starts with "Q".
survey_columns = list(filter(lambda column_name: column_name.startswith("Q"), raw_data.columns))
dim_survey = raw_data.select(["RESPNUM"] + survey_columns).distinct()

# 4. Dimension: Flight Details
flight_columns = [
    ("RESPNUM", "response_id"),
    ("FLIGHT", "flight_number"),
    ("DEST", "destination"),
    ("DESTMARK", "destination_mark"),
    ("ARRTIME", "arrival_time"),
    ("DEPTIME", "departure_time"),
]
dim_flight = _select_renamed(raw_data, flight_columns).distinct()



In [0]:
# Fact table: a straight projection of `raw_data` (no de-duplication), with the
# source survey columns renamed to descriptive names.
fact_column_map = [
    ("RESPNUM", "response_id"),
    ("AIRLINE", "airline_code"),
    ("FLIGHT", "flight_number"),
    ("DEST", "destination"),
    ("DESTGEO", "destination_geo"),
    ("GATE", "gate"),
    ("ARRTIME", "arrival_time"),
    ("DEPTIME", "departure_time"),
    ("Q2PURP1", "purpose_1"),
    ("Q7A_ART", "art_rating"),
    ("Q7B_FOOD", "food_rating"),
    ("Q7I_WIFI", "wifi_rating"),
    ("Q9A_CLNBOARD", "cleanliness_boarding"),
    ("Q10SAFE", "safety_rating"),
    ("Q17_CITY", "city"),
    ("Q18_AGE", "age"),
    ("Q19_SEX", "gender"),
    ("INTDATE", "interview_date"),
]
fact_survey_responses = raw_data.select(
    [col(source).alias(target) for source, target in fact_column_map]
)


In [0]:
# Bring the first 10 fact rows to the driver as a pandas DataFrame for a quick look.
fact_preview = fact_survey_responses.limit(10)
fact_preview.toPandas()


Unnamed: 0,response_id,airline_code,flight_number,destination,destination_geo,gate,arrival_time,departure_time,purpose_1,art_rating,food_rating,wifi_rating,cleanliness_boarding,safety_rating,city,age,gender,interview_date
0,1,21,1437,49,1,12,8:34 AM,9:25 AM,1,3,4,3,3,5,SAN FRANCISCO,2,1,2
1,2,21,1437,49,1,12,8:00 AM,9:25 AM,1,4,4,6,4,5,CONCORD,6,1,2
2,3,21,1437,49,1,12,7:00 AM,9:25 AM,1,3,4,3,3,3,SAN FRANCISCO,4,2,2
3,4,21,1437,49,1,12,7:30 AM,9:25 AM,1,3,3,4,5,5,,4,1,2
4,5,21,1437,49,1,12,6:30 AM,9:25 AM,1,3,3,5,5,5,HUNTINGTON BEACH,3,1,2
5,6,21,1437,49,1,12,8:00 AM,9:25 AM,2,4,3,1,3,5,SAN FRANCISCO,5,1,2
6,7,18,144,25,1,43,8:25 AM,10:35 AM,6,3,3,6,5,4,RODEO,9,1,2
7,8,18,144,25,1,43,9:25 AM,10:35 AM,2,5,4,5,5,5,SAN BRUNO,5,2,2
8,9,18,144,25,1,43,8:30 AM,10:35 AM,6,5,5,5,5,4,NEWARK,5,1,2
9,10,18,144,25,1,43,9:30 AM,10:35 AM,1,3,3,5,4,3,VALLEJO,3,2,2


In [0]:
# Target locations in the Gold layer: one folder per star-schema table.
gold_path = f"abfss://{container_name}@{storage_name}.dfs.core.windows.net/couches/gold/"
dim_airline_path = f"{gold_path}dim_airline"
dim_demographics_path = f"{gold_path}dim_demographics"
dim_survey_path = f"{gold_path}dim_survey"
dim_flight_path = f"{gold_path}dim_flight"
fact_survey_responses_path = f"{gold_path}fact_survey_responses"

# Save each dimension and the fact table as Parquet, overwriting earlier runs.
# The dimensions are written first, then the fact table.
gold_tables = [
    (dim_airline, dim_airline_path),
    (dim_demographics, dim_demographics_path),
    (dim_survey, dim_survey_path),
    (dim_flight, dim_flight_path),
    (fact_survey_responses, fact_survey_responses_path),
]
for gold_table, gold_table_path in gold_tables:
    gold_table.write.mode("overwrite").parquet(gold_table_path)


In [0]:
# Show a 10-row pandas sample of the in-memory fact table after the gold write.
fact_sample = fact_survey_responses.limit(10)
fact_sample.toPandas()

Unnamed: 0,response_id,airline_code,flight_number,destination,destination_geo,gate,arrival_time,departure_time,purpose_1,art_rating,food_rating,wifi_rating,cleanliness_boarding,safety_rating,city,age,gender,interview_date
0,1,21,1437,49,1,12,8:34 AM,9:25 AM,1,3,4,3,3,5,SAN FRANCISCO,2,1,2
1,2,21,1437,49,1,12,8:00 AM,9:25 AM,1,4,4,6,4,5,CONCORD,6,1,2
2,3,21,1437,49,1,12,7:00 AM,9:25 AM,1,3,4,3,3,3,SAN FRANCISCO,4,2,2
3,4,21,1437,49,1,12,7:30 AM,9:25 AM,1,3,3,4,5,5,,4,1,2
4,5,21,1437,49,1,12,6:30 AM,9:25 AM,1,3,3,5,5,5,HUNTINGTON BEACH,3,1,2
5,6,21,1437,49,1,12,8:00 AM,9:25 AM,2,4,3,1,3,5,SAN FRANCISCO,5,1,2
6,7,18,144,25,1,43,8:25 AM,10:35 AM,6,3,3,6,5,4,RODEO,9,1,2
7,8,18,144,25,1,43,9:25 AM,10:35 AM,2,5,4,5,5,5,SAN BRUNO,5,2,2
8,9,18,144,25,1,43,8:30 AM,10:35 AM,6,5,5,5,5,4,NEWARK,5,1,2
9,10,18,144,25,1,43,9:30 AM,10:35 AM,1,3,3,5,4,3,VALLEJO,3,2,2


## 