## Spark session configuration
This cell sets Spark session settings to enable Verti-Parquet and Optimize on Write. For more details about Verti-Parquet and Optimize on Write, see the tutorial document.

In [1]:
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# Session-level Spark settings, applied in order:
#   - turn on the Parquet "vorder" writer option
#   - turn on Delta optimize-write
#   - set the optimize-write bin size to 1073741824 (= 1024**3; presumably bytes,
#     i.e. 1 GiB — confirm against the platform documentation)
for conf_key, conf_value in (
    ("spark.sql.parquet.vorder.enabled", "true"),
    ("spark.microsoft.delta.optimizeWrite.enabled", "true"),
    ("spark.microsoft.delta.optimizeWrite.binSize", "1073741824"),
):
    spark.conf.set(conf_key, conf_value)

StatementMeta(, a1d124d9-0fc1-4f2e-bb8a-efac616518c9, 3, Finished, Available, Finished)

## Load your DataFrame

In [2]:
# Read the Parquet data under "Files" into a DataFrame, then print its
# one-line schema summary.
df = spark.read.format("parquet").load("Files")
print(df)

StatementMeta(, a1d124d9-0fc1-4f2e-bb8a-efac616518c9, 4, Finished, Available, Finished)

DataFrame[vendorID: int, lpepPickupDatetime: timestamp, lpepDropoffDatetime: timestamp, passengerCount: int, tripDistance: double, puLocationId: string, doLocationId: string, pickupLongitude: double, pickupLatitude: double, dropoffLongitude: double, dropoffLatitude: double, rateCodeID: int, storeAndFwdFlag: string, paymentType: int, fareAmount: double, extra: double, mtaTax: double, improvementSurcharge: string, tipAmount: double, tollsAmount: double, ehailFee: double, totalAmount: double, tripType: int, puYear: int, puMonth: int]


## Fact Table (`fact_trip`) with Year/Quarter/Month Columns

In [3]:
from pyspark.sql.functions import col, year, month, quarter

table_name = 'fact_trip'

# Derive the calendar columns from the drop-off timestamp in one chained expression.
df = (
    df.withColumn('Year', year(col("lpepDropoffDatetime")))
      .withColumn('Quarter', quarter(col("lpepDropoffDatetime")))
      .withColumn('Month', month(col("lpepDropoffDatetime")))
)

# Overwrite the Delta table under Tables/<table_name>, partitioned by Year then Quarter.
(
    df.write
      .mode("overwrite")
      .format("delta")
      .partitionBy("Year", "Quarter")
      .save("Tables/" + table_name)
)
     

StatementMeta(, a1d124d9-0fc1-4f2e-bb8a-efac616518c9, 5, Finished, Available, Finished)

In [7]:
# Print the DataFrame's one-line schema summary.
# NOTE(review): the recorded output for this cell lacks the Year/Quarter/Month columns
# that the fact_trip cell adds to df, and the execution counter jumps from 3 to 7, so
# df appears to have been re-read in between — confirm the notebook still behaves as
# intended under Restart & Run All.
print(df)

StatementMeta(, a1d124d9-0fc1-4f2e-bb8a-efac616518c9, 9, Finished, Available, Finished)

DataFrame[vendorID: int, lpepPickupDatetime: timestamp, lpepDropoffDatetime: timestamp, passengerCount: int, tripDistance: double, puLocationId: string, doLocationId: string, pickupLongitude: double, pickupLatitude: double, dropoffLongitude: double, dropoffLatitude: double, rateCodeID: int, storeAndFwdFlag: string, paymentType: int, fareAmount: double, extra: double, mtaTax: double, improvementSurcharge: string, tipAmount: double, tollsAmount: double, ehailFee: double, totalAmount: double, tripType: int, puYear: int, puMonth: int]


## **Dimensions**
The cells below build the dimension tables. The raw trip data is read once from the Files section of the lakehouse. A helper function then takes that DataFrame together with a table name and a list of columns, applies any transformations defined for the dimension, keeps only the listed columns, removes duplicate rows, and writes the result as a Delta table. Next, a list of dimension tables is defined. Finally, a for loop goes through that list and calls the helper function once for each table.

In [9]:
# Re-read the raw Parquet data from 'Files' into df.
df = spark.read.load('Files', format="parquet")

StatementMeta(, a1d124d9-0fc1-4f2e-bb8a-efac616518c9, 11, Finished, Available, Finished)

In [11]:
# Project only the pickup timestamp column and print the resulting schema summary.
print(df.select(["lpepPickupDatetime"]))

StatementMeta(, a1d124d9-0fc1-4f2e-bb8a-efac616518c9, 13, Finished, Available, Finished)

DataFrame[lpepPickupDatetime: timestamp]


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    date_format,
    dayofmonth,
    dayofweek,
    hour,
    minute,
    month,
    second,
    to_date,
    when,
    year,
)

# Read the raw Parquet data, then keep every column except one named 'Photo'
# (column order is preserved).
df = spark.read.load('Files', format="parquet")
df = df.select(*(name for name in df.columns if name != 'Photo'))

# Function to create and save dimension tables
def create_and_save_dimension(df, table_name, columns, transformations=None):
    """Build one dimension table from ``df`` and write it as a Delta table.

    Args:
        df: Source DataFrame.
        table_name: Name of the target table; the data is saved to
            ``"Tables/" + table_name``.
        columns: Column names to keep in the dimension.
        transformations: Optional iterable of callables, each taking a DataFrame
            and returning a DataFrame. They are applied in order before the
            column selection. ``None`` (the default) or an empty iterable means
            no transformation is applied.

    The selected columns are de-duplicated with ``distinct()`` and the result
    overwrites whatever is already stored at the target path.
    """
    # A None default avoids sharing one mutable list object between calls.
    dimension_df = df
    for transformation in (transformations or ()):
        dimension_df = transformation(dimension_df)
    dimension_df = dimension_df.select(columns).distinct()
    dimension_df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

# Define the transformations
def date_transformations(df):
    """Return ``df`` with calendar columns derived from ``lpepPickupDatetime``.

    Added columns: ``Day`` (day of month), ``Month``, ``Year``, ``DayOfWeek`` and
    ``IsWeekend`` ("Yes" when ``DayOfWeek`` is 7 or 1, otherwise "No").
    """
    pickup_ts = col("lpepPickupDatetime")
    with_calendar = (
        df.withColumn("Day", dayofmonth(pickup_ts))
          .withColumn("Month", month(pickup_ts))
          .withColumn("Year", year(pickup_ts))
          .withColumn("DayOfWeek", dayofweek(pickup_ts))
    )
    # Spark's dayofweek numbers the week 1 = Sunday ... 7 = Saturday.
    weekend_flag = when(col("DayOfWeek").isin([7, 1]), "Yes").otherwise("No")
    return with_calendar.withColumn("IsWeekend", weekend_flag)

def time_transformations(df):
    """Return ``df`` with time-of-day columns derived from ``lpepPickupDatetime``.

    Added columns: ``Hour``, ``Minute``, ``Second`` and ``PeriodOfDay``, where
    ``PeriodOfDay`` is "Night" for Hour < 6, "Morning" for 6 <= Hour < 12,
    "Afternoon" for 12 <= Hour < 18 and "Evening" in every other case.
    """
    pickup_ts = col("lpepPickupDatetime")
    with_clock = (
        df.withColumn("Hour", hour(pickup_ts))
          .withColumn("Minute", minute(pickup_ts))
          .withColumn("Second", second(pickup_ts))
    )

    hour_col = col("Hour")
    period_of_day = (
        when(hour_col < 6, "Night")
        .when((hour_col >= 6) & (hour_col < 12), "Morning")
        .when((hour_col >= 12) & (hour_col < 18), "Afternoon")
        .otherwise("Evening")
    )
    return with_clock.withColumn("PeriodOfDay", period_of_day)

# Dimension definitions consumed by the loop below. Each entry gives the target
# table name, the columns to keep, and (optionally) the transformation functions
# that derive those columns.
dimensions = [
    # Calendar attributes produced by date_transformations.
    {
        "table_name": "dimension_date",
        "columns": ["Date", "Day", "Month", "Year", "DayOfWeek", "IsWeekend"],
        "transformations": [date_transformations],
    },
    # Time-of-day attributes produced by time_transformations.
    {
        "table_name": "dimension_time",
        "columns": ["Time", "Hour", "Minute", "Second", "PeriodOfDay"],
        "transformations": [time_transformations],
    },
    # Vendor ids are taken straight from the source data; no transformation needed.
    {
        "table_name": "dimension_vendor",
        "columns": ["vendorID"],
    },
]

# Create and save each dimension table.
#
# "Date" and "Time" are the key columns of dimension_date and dimension_time, so
# they must hold only the calendar date and only the time of day of the pickup
# timestamp. Copying the full timestamp into both columns would leave one row per
# distinct pickup instant after distinct(), instead of one row per date or per
# time of day.
#
# The derived DataFrame does not depend on the dimension being processed, so it is
# built once before the loop.
df_transformed = (
    df.withColumn("Date", to_date(col("lpepPickupDatetime")))
      .withColumn("Time", date_format(col("lpepPickupDatetime"), "HH:mm:ss"))
)

for dimension in dimensions:
    if 'transformations' in dimension:
        # Dimensions with transformations also select the derived Date/Time columns.
        create_and_save_dimension(df_transformed, dimension['table_name'], dimension['columns'], dimension['transformations'])
    else:
        create_and_save_dimension(df, dimension['table_name'], dimension['columns'])


StatementMeta(, a1d124d9-0fc1-4f2e-bb8a-efac616518c9, 15, Submitted, Running, Running)