In [0]:
#### The code connects to an Azure SQL Database, reads specified tables, and ingests the data into a designated directory in Delta format, overwriting any existing data. The ingested data is part of the "Bronze layer," which typically represents raw or minimally processed data in a data lake or data warehouse architecture.

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("BronzeLayer").getOrCreate()

# Define connection properties for Azure SQL Database
jdbc_url = "jdbc:sqlserver://azureserverrakshitha.database.windows.net:1433;database=SqlDatabase"
sqlserver_properties = {
    "user": "rakshitha",
    "password": "Vasavi@06",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# List of tables to be ingested from Azure SQL Database
tables = ["SourceSalesTransactions", "SourceInventoryLogs", "SourceCustomerData"]  

# Base path for storing Bronze layer data
bronze_base_path = "/bronze"

# Function to read and write each table
def ingest_table(table_name):
    query = f"(SELECT * FROM {table_name}) AS {table_name}"
    df = spark.read.jdbc(url=jdbc_url, table=query, properties=sqlserver_properties)
    df.write.format("delta").mode("overwrite").save(bronze_base_path + table_name)

# Iterate over the list of tables and ingest each one
for table in tables:
    ingest_table(table)

print("Data ingestion to Bronze layer completed.")


Data ingestion to Bronze layer completed.


In [0]:
# Dictionary to store DataFrames
dataframes = {}

# Function to read each table into a DataFrame
def read_table_to_df(table_name):
    df = spark.read.format("delta").load(bronze_base_path + table_name)
    return df

# Iterate over the list of tables and read each one into a DataFrame
for table in tables:
    dataframes[table] = read_table_to_df(table)

# Now you can access the DataFrames from the dictionary
df_transaction = dataframes["SourceSalesTransactions"]
df_inventory = dataframes["SourceInventoryLogs"]
df_customerdata = dataframes["SourceCustomerData"]

# Display the DataFrames to verify the data
df_transaction.show()
df_inventory.show()
df_customerdata.show()


+-------------+----------+---------+--------+-------------------+
|TransactionID|CustomerID|ProductID|Quantity|          Timestamp|
+-------------+----------+---------+--------+-------------------+
|            1|       186|     P010|       9|2024-06-01 23:23:00|
|            2|       117|     P004|       6|2024-06-01 16:16:00|
|            3|       137|     P001|       5|2024-06-01 00:16:00|
|            4|       198|     P007|       6|2024-06-01 13:09:00|
|            5|       199|     P009|      10|2024-06-01 07:04:00|
|            6|       144|     P008|       6|2024-06-01 22:21:00|
|            7|       146|     P004|       2|2024-06-01 20:58:00|
|            8|       139|     P009|       8|2024-06-01 20:10:00|
|            9|       101|     P005|       8|2024-06-01 15:52:00|
|           10|       169|     P008|       5|2024-06-01 14:50:00|
|           11|       150|     P010|       5|2024-06-01 11:05:00|
|           12|       184|     P007|       3|2024-06-01 03:31:00|
|         

In [0]:
df_transaction.count()

700

In [0]:
df_inventory.count()

700

In [0]:
df_customerdata.count()

1500

In [0]:
from pyspark.sql.functions import col, split, explode, trim, sum, avg

# Explode PurchaseHistory and Preferences
df_customerdata = df_customerdata.withColumn("PurchaseHistory", explode(split(col("PurchaseHistory"), ", ")))
df_customerdata = df_customerdata.withColumn("Preferences", explode(split(col("Preferences"), ", ")))

# Example data cleansing: Trimming spaces from strings
df_customerdata = df_customerdata.withColumn("PurchaseHistory", trim(col("PurchaseHistory")))
df_customerdata = df_customerdata.withColumn("Preferences", trim(col("Preferences")))

# Join transaction data with customer data on CustomerID and drop duplicate CustomerID column
df_combined = df_transaction.join(df_customerdata, df_transaction.CustomerID == df_customerdata.CustomerID, "inner") \
                            .drop(df_customerdata.CustomerID)

# Selecting relevant columns for analysis
df_combined = df_combined.select(
    df_combined.TransactionID,
    df_combined.CustomerID,
    df_combined.Name,
    df_combined.Age,
    df_combined.Gender,
    df_combined.ProductID,
    df_combined.Quantity,
    df_combined.Timestamp,
    df_combined.PurchaseHistory,
    df_combined.Preferences
)


In [0]:
# Calculate total purchases and average purchase quantity per customer
df_features = df_combined.groupBy("CustomerID").agg(
    sum("Quantity").alias("TotalPurchases"),
    avg("Quantity").alias("AveragePurchaseQuantity")
)

# Join the features back with the combined DataFrame
df_combined = df_combined.join(df_features, "CustomerID", "inner")

#No missing values detected, Removing duplicates and Standardizing timestamp format

# Define schema for the data
schema = "CustomerID INT, TransactionID INT, Name STRING, Age INT, Gender STRING, ProductID STRING, Quantity INT, Timestamp TIMESTAMP, PurchaseHistory STRING, Preferences STRING, TotalPurchases INT, AveragePurchaseQuantity DOUBLE"

# Cleanse data
df_cleaned = df_combined.dropDuplicates()  # Remove duplicates
df_cleaned = df_cleaned.withColumn("Timestamp", col("Timestamp").cast("timestamp"))

# Show cleansed data
print("Cleaned Data:")
df_cleaned.show(truncate=False)

# Show the final DataFrame with enriched data and new features
df_combined.show(truncate=False)
print("Row count:", df_combined.count())

# Define the Silver layer base path
silver_base_path = "/silver/"

# Write the transformed DataFrame to the Silver layer in Delta format
df_combined.write.format("delta").mode("overwrite").save(silver_base_path + "enriched_data")

Cleaned Data:
+----------+-------------+-----------+---+------+---------+--------+-------------------+---------------+-----------+--------------+-----------------------+
|CustomerID|TransactionID|Name       |Age|Gender|ProductID|Quantity|Timestamp          |PurchaseHistory|Preferences|TotalPurchases|AveragePurchaseQuantity|
+----------+-------------+-----------+---+------+---------+--------+-------------------+---------------+-----------+--------------+-----------------------+
|148       |677          |Customer148|44 |Male  |P003     |2       |2024-06-01 05:28:00|ProductX       |CategoryH  |200           |5.0                    |
|148       |677          |Customer148|44 |Male  |P003     |2       |2024-06-01 05:28:00|ProductX       |CategoryT  |200           |5.0                    |
|148       |677          |Customer148|44 |Male  |P003     |2       |2024-06-01 05:28:00|ProductF       |CategoryH  |200           |5.0                    |
|148       |677          |Customer148|44 |Male  |P

In [0]:
from pyspark.sql.functions import sum, avg, month, year

# Path to the Silver layer
silver_base_path = "/silver/"

# Read the enriched data from the Silver layer
df_silver = spark.read.format("delta").load(silver_base_path + "enriched_data")

# Perform additional aggregations and transformations
df_gold = df_silver.groupBy(
    "CustomerID", 
    year("Timestamp").alias("Year"), 
    month("Timestamp").alias("Month")
).agg(
    sum("Quantity").alias("TotalPurchases"),
    avg("Quantity").alias("AveragePurchaseQuantity")
)

# Show the final DataFrame with aggregated data
display(df_gold)
print("Row count:", df_gold.count())

# Define the Gold layer base path
gold_base_path = "/gold/"

# Write the transformed DataFrame to the Gold layer in Delta format
df_gold.write.format("delta").mode("overwrite").save(gold_base_path + "monthly_customer_purchases")


CustomerID,Year,Month,TotalPurchases,AveragePurchaseQuantity
102,2024,6,496,5.166666666666667
146,2024,6,248,5.166666666666667
180,2024,6,280,4.375
111,2024,6,488,6.777777777777778
154,2024,6,112,4.666666666666667
195,2024,6,424,5.888888888888889
122,2024,6,184,5.75
131,2024,6,496,5.166666666666667
124,2024,6,120,7.5
158,2024,6,384,6.857142857142857


Row count: 100


In [0]:

# Save the DataFrame as a Delta table in the specified database
df_gold.write.format("delta").mode("append").save("/gold/monthly_customer_purchases")

# Load the table from the database
df_gold = spark.sql("SELECT * FROM my_database.monthly_customer_purchases")

# Show the contents of the DataFrame
df_gold.count()

# Print the schema of the DataFrame
df_gold.printSchema()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-4199871686648304>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Save the DataFrame as a Delta table in the specified database[39;00m
[0;32m----> 2[0m df_gold[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m)[38;5;241m.[39msave([38;5;124m"[39m[38;5;124m/gold/monthly_customer_purchases[39m[38;5;124m"[39m)
[1;32m      4[0m [38;5;66;03m# Load the table from the database[39;00m
[1;32m      5[0m df_gold [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"[39m[38;5;124mSELECT * FROM my_database.monthly_customer_purchases[39m[38;5;124m"[39m)

[0;31mNameError[0m: name 'df_gold' is not defined

In [0]:
# Load the table from the database
df_gold = spark.sql("SELECT * FROM my_database.monthly_customer_purchases")

# Show the contents of the DataFrame
df_gold.count()

100