# Loading Data from Delta Table into DataFrame
This section loads data from the `hotelbook_silver` Delta table into a PySpark DataFrame and displays the first five rows for inspection.

In [1]:
# Loading the table data into the dataframe
df = spark.read.table("Hotel.hotelbook_silver")
df.limit(2).toPandas()

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 3, Finished, Available, Finished)

Unnamed: 0,Date,Month,Weekday,Season,Holiday,Marketing_Spend,Revenue,Room_Revenue,Occupancy_Rate,ADR,...,Occupancy_Managed_Guests,RevPAR_All,Occupancy_All,Room_Revenue_All,Total_Revenue,Operating_Expenses,Fixed_Costs,Variable_Costs,Total_Costs,Profit
0,2024-02-10,February,Saturday,Winter,No,65000.0,88000.0,49000.0,0.79,128.0,...,79.0,4.5,0.79,49000.0,55000.0,12000.0,6000.0,73000.0,15000.0,40000.0
1,2024-02-11,February,Sunday,Winter,No,70000.0,91000.0,50000.0,0.81,134.0,...,81.0,4.7,0.81,50000.0,57000.0,11500.0,7000.0,75500.0,15500.0,41500.0


### Delta Table Creation and Upsert for Date Dimension Table
This section performs the following operations for the `dimdategold` Delta table:
- Creates the `dimdategold` Delta table with a specified schema if it does not already exist.
- Prepares the `dfdimDateGold` DataFrame by selecting and processing date-related columns, removing duplicates, and adding a `Year` column.
- Performs an upsert operation:
  - **Update** existing records in the Delta table based on the `Date` column (currently empty in the provided code).
  - **Insert** new records from the DataFrame if they do not already exist in the table.

In [2]:
from pyspark.sql.types import*
from delta.tables import*

# Define the Schema for the dimDateGold table
DeltaTable.createIfNotExists(spark) \
     .tableName("Hotel.dimdategold") \
     .addColumn("Date", DateType()) \
     .addColumn("Month", StringType()) \
     .addColumn("Weekday", StringType()) \
     .addColumn("Year", IntegerType()) \
     .addColumn("Season", StringType()) \
     .addColumn("Holiday", StringType()) \
     .execute()


StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 4, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7e3b2bb27cd0>

In [3]:
from pyspark.sql.functions import col, date_add, date_format, year

# Create the dimDateGold dataframe

dfdimDateGold = df.dropDuplicates(["Date", "Month", "Weekday", "Season", "Holiday"]).select(col("Date"), col("Month"), col("Weekday"), col("Season"), col("Holiday"), \
        year("Date").alias("Year"), 
        ).orderBy("Date")
display(dfdimDateGold.head(5))

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b53b6e18-813a-4cb1-9ecb-96705198b4fa)

In [4]:
dfdimdateGold_temp = spark.read.table("Hotel.dimdategold")

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 6, Finished, Available, Finished)

In [5]:
from delta.tables import *

deltatable = DeltaTable.forPath(spark, "Tables/dimdategold")

dfUpdate = dfdimDateGold

deltatable.alias('gold') \
    .merge(
        dfUpdate.alias('updates'),
        'gold.Date = updates.Date'
    ) \
    .whenMatchedUpdate(set={
    
    }) \
    .whenNotMatchedInsert(values={
        "Date": "updates.Date",
        "Month": "updates.Month",
        "Weekday": "updates.Weekday",
        "Year": "updates.Year",
        "Season": "updates.Season",
        "Holiday": "updates.Holiday"
    }) \
    .execute()


StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 7, Finished, Available, Finished)

### Delta Table Creation and Upsert for Booking Channel Dimension
This section handles the creation and management of the `dimbookingchannelgold` Delta table:
- Creates the `dimbookingchannelgold` Delta table with columns for `Booking_Channel` and `Booking_ChannelKey` if it does not already exist.
- Prepares the `dfdimBookingChnlSilver` DataFrame by selecting distinct booking channels.
- Determines the maximum existing `Booking_ChannelKey` and assigns new keys to the booking channels that are not already in the Delta table.
- Performs an upsert operation:
  - **Update** existing records based on the `Booking_Channel` (currently empty in the provided code).
  - **Insert** new records into the Delta table, including the new `Booking_ChannelKey` values.


In [6]:
from pyspark.sql.types import *
from delta.tables import *
    
# Create BookingChannel_gold dimension delta table
DeltaTable.createIfNotExists(spark) \
     .tableName("Hotel.dimbookingchannelgold") \
     .addColumn("Booking_Channel", StringType()) \
     .addColumn("Booking_ChannelKey", LongType()) \
     .execute()

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 8, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7e3b2bb27520>

In [7]:
from pyspark.sql.functions import col
# Load data into the df 
dfdimBookingChnlSilver = df.dropDuplicates(["Booking_Channel"]).select(col("Booking_Channel"))
display(dfdimBookingChnlSilver.head(5))

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 29869be7-3a37-408f-a7c1-86686c36fa86)

In [8]:
from pyspark.sql.functions import monotonically_increasing_id, col, when, coalesce, max, lit

if spark.read.table("Hotel.dimbookingchannelgold").count() == 0:
    dfdimBookingGold = dfdimBookingChnlSilver.withColumn(
        "Booking_ChannelKey", monotonically_increasing_id() + 1
    )
else:
    #left_anti join logic
    dfdimBooking_temp = spark.read.table("Hotel.dimbookingchannelgold")
    MAXBookingID = dfdimBooking_temp.select(coalesce(max(col("Booking_ChannelKey")),lit(0)).alias("MAXBookingID")).first()[0]
    dfdimBookingGold = dfdimBookingChnlSilver.join(dfdimBooking_temp,(dfdimBookingChnlSilver.Booking_Channel == dfdimBooking_temp.Booking_Channel) , "left_anti")
    dfdimBookingGold = dfdimBookingGold.withColumn("Booking_ChannelKey",monotonically_increasing_id() + MAXBookingID + 1)


StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 10, Finished, Available, Finished)

In [9]:
# Previewing new data if available
if dfdimBookingGold.isEmpty():
    print("No new data to populate")
else:
    display(dfdimBookingGold.limit(2).toPandas())

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 11, Finished, Available, Finished)

No new data to populate


In [10]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, 'abfss://edcd96a3-3877-4553-b100-54aeef9c4401@onelake.dfs.fabric.microsoft.com/3f561343-f018-4ab6-a09f-74d0dd933efb/Tables/dimbookingchannelgold')
    
dfUpdates = dfdimBookingGold
    
deltaTable.alias('silver') \
  .merge(
    dfUpdates.alias('updates'),
    'silver.Booking_Channel = updates.Booking_Channel'
  ) \
   .whenMatchedUpdate(set =
    {
          
    }
  ) \
 .whenNotMatchedInsert(values =
    {
      "Booking_Channel": "updates.Booking_Channel",
      "Booking_ChannelKey": "updates.Booking_ChannelKey"
    }
  ) \
  .execute()

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 12, Finished, Available, Finished)

In [11]:
# Loading the table data into the dataframe
dfc = spark.read.table("Hotel.dimbookingchannelgold")
dfc.limit(2).toPandas()

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 13, Finished, Available, Finished)

Unnamed: 0,Booking_Channel,Booking_ChannelKey
0,OTA,1
1,Direct,2


### Delta Table Creation and Upsert for Guest Dimension
This section manages the creation and upsert operations for the `dimguestgold` Delta table:
- Creates the `dimguestgold` Delta table with columns for `Guest_Type`, `Guest_Country`, and `GuestKey` if it does not already exist.
- Prepares the `dfdimGuestSilver` DataFrame by selecting distinct combinations of guest type and country.
- Determines the maximum existing `GuestKey` and assigns new keys to the guest records that are not already present in the Delta table.
- Performs an upsert operation:
  - **Update** existing records based on matching `Guest_Type` and `Guest_Country` (currently empty in the provided code).
  - **Insert** new records into the Delta table, including the newly assigned `GuestKey` values.

In [12]:
from pyspark.sql.types import *
from delta.tables import *
    
# Create Guest_gold dimension delta table
DeltaTable.createIfNotExists(spark) \
     .tableName("Hotel.dimguestgold") \
     .addColumn("Guest_Type", StringType()) \
     .addColumn("Guest_Country", StringType()) \
     .addColumn("GuestKey", LongType()) \
     .execute()

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 14, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7e3b2bb255a0>

In [13]:
# Loading the data into the dataframe

dfdimGuestSilver = df.dropDuplicates(['Guest_Type' ,'Guest_Country']).select(col("Guest_Type") ,col("Guest_Country"))
display(dfdimGuestSilver.head(10))

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 185713b6-b2a6-4da9-9632-e0ed0d87a2b0)

In [14]:
from pyspark.sql.functions import monotonically_increasing_id, col, lit, max, coalesce
    
if spark.read.table("Hotel.dimguestgold").count() == 0:
    dfdimGuestGold = dfdimGuestSilver.withColumn(
        "GuestKey", monotonically_increasing_id() + 1)

else:
    dfdimGuest_temp = spark.read.table("Hotel.dimguestgold")
    MAXGuestID = dfdimGuest_temp.select(coalesce(max(col("GuestKey")),lit(0)).alias("MAXItemID")).first()[0]
    dfdimGuestGold = dfdimGuestSilver.join(dfdimGuest_temp,(dfdimGuestSilver.Guest_Type == dfdimGuest_temp.Guest_Type) & (dfdimGuestSilver.Guest_Country == dfdimGuest_temp.Guest_Country), "left_anti")
    dfdimGuestGold = dfdimGuestGold.withColumn("GuestKey",monotonically_increasing_id() + MAXGuestID + 1)

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 16, Finished, Available, Finished)

In [15]:
# Previewing new data if available
if dfdimGuestGold.isEmpty():
    print("No new data to populate")
else:
    display(dfdimGuestGold.limit(2).toPandas())

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 17, Finished, Available, Finished)

No new data to populate


In [16]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, 'abfss://edcd96a3-3877-4553-b100-54aeef9c4401@onelake.dfs.fabric.microsoft.com/3f561343-f018-4ab6-a09f-74d0dd933efb/Tables/dimguestgold')
    
dfUpdates = dfdimGuestGold
    
deltaTable.alias('silver') \
  .merge(
    dfUpdates.alias('updates'),
    'silver.Guest_Type = updates.Guest_Type AND silver.Guest_Country = updates.Guest_Country'
  ) \
   .whenMatchedUpdate(set =
    {
          
    }
  ) \
 .whenNotMatchedInsert(values =
    {
      "Guest_Type": "updates.Guest_Type",
      "Guest_Country": "updates.Guest_Country",
      "GuestKey": "updates.GuestKey"
      
    }
  ) \
  .execute()


StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 18, Finished, Available, Finished)

### Delta Table Creation and Upsert for Fact Reservations

This section handles the creation and upsert operations for the `factreservationsgold` Delta table:

1. **Delta Table Creation**:
   - Creates the `factreservationsgold` Delta table with columns for various booking and revenue metrics, including foreign keys (`GuestKey` and `Booking_ChannelKey`).

2. **Data Preparation**:
   - Loads and joins the dimension tables (`dimbookingchannelgold` and `dimguestgold`) to enrich the `df` DataFrame with dimension keys (`Booking_ChannelKey` and `GuestKey`).
   - The resulting DataFrame `dffactReservation_gold` is prepared by selecting and renaming columns, followed by sorting the data.

3. **Upsert Operation**:
   - Reads the existing Delta table (`factreservationsgold`) into `deltaTable`.
   - Performs a merge operation where:
     - **Matched Records**: Existing records are updated if necessary (currently no update operations are specified).
     - **Non-Matched Records**: New records are inserted with the updated values from the DataFrame `dffactReservation_gold`.


In [17]:
from pyspark.sql.types import *
from delta.tables import *
    
DeltaTable.createIfNotExists(spark) \
    .tableName("Hotel.factreservationsgold") \
    .addColumn("GuestKey", LongType()) \
    .addColumn("Booking_ChannelKey", LongType()) \
    .addColumn("Date", DateType()) \
    .addColumn("Marketing_Spend", FloatType()) \
    .addColumn("Revenue", FloatType()) \
    .addColumn("Room_Revenue", FloatType()) \
    .addColumn("Occupancy_Rate", FloatType()) \
    .addColumn("ADR", FloatType()) \
    .addColumn("RevPAR", FloatType()) \
    .addColumn("Available_Rooms", IntegerType()) \
    .addColumn("Reserved_Rooms", IntegerType()) \
    .addColumn("Complaints", IntegerType()) \
    .addColumn("Compliment", IntegerType()) \
    .addColumn("Bookings", IntegerType()) \
    .addColumn("No_Shows", IntegerType()) \
    .addColumn("Cancellations", IntegerType()) \
    .addColumn("Market_Segment", FloatType()) \
    .addColumn("Checkouts", IntegerType()) \
    .addColumn("New_Bookings", IntegerType()) \
    .addColumn("Checkins", IntegerType()) \
    .addColumn("Average_Review_Score", FloatType()) \
    .addColumn("Revenue_Managed_Guests", FloatType()) \
    .addColumn("RevPAR_Managed_Guests", FloatType()) \
    .addColumn("Occupancy_Managed_Guests", FloatType()) \
    .addColumn("RevPAR_All", FloatType()) \
    .addColumn("Occupancy_All", FloatType()) \
    .addColumn("Room_Revenue_All", FloatType()) \
    .addColumn("Total_Revenue", FloatType()) \
    .addColumn("Operating_Expenses", FloatType()) \
    .addColumn("Fixed_Costs", FloatType()) \
    .addColumn("Variable_Costs", FloatType()) \
    .addColumn("Total_Costs", FloatType()) \
    .addColumn("Profit", FloatType()) \
    .execute()

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 19, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7e3b30be40d0>

In [18]:
from pyspark.sql.functions import col
    
dfdimBookingChannel_temp = spark.read.table("Hotel.dimbookingchannelgold")
dfdimGuestCountry_temp = spark.read.table("Hotel.dimguestgold")
        
# Create Sales_gold dataframe
    
dffactReservation_gold = df.alias("df1").join(dfdimBookingChannel_temp.alias("df2"),(df.Booking_Channel == dfdimBookingChannel_temp.Booking_Channel), "left") \
        .join(dfdimGuestCountry_temp.alias("df3"),(df.Guest_Type == dfdimGuestCountry_temp.Guest_Type) & (df.Guest_Country == dfdimGuestCountry_temp.Guest_Country), "left") \
    .select(col("df2.Booking_ChannelKey") \
        , col("df3.GuestKey") \
        , col("df1.Date") \
        , col("df1.Marketing_Spend") \
        , col("df1.Revenue") \
        , col("df1.Room_Revenue") \
        , col("df1.Occupancy_Rate") \
        , col("df1.ADR") \
        , col("df1.RevPAR") \
        , col("df1.Available_Rooms") \
        , col("df1.Reserved_Rooms") \
        , col("df1.Complaints") \
        , col("df1.Compliment") \
        , col("df1.Bookings") \
        , col("df1.No_Shows") \
        , col("df1.Cancellations") \
        , col("df1.Market_Segment") \
        , col("df1.Checkouts") \
        , col("df1.New_Bookings") \
        , col("df1.Checkins") \
        , col("df1.Average_Review_Score") \
        , col("df1.Revenue_Managed_Guests") \
        , col("df1.RevPAR_Managed_Guests") \
        , col("df1.Occupancy_Managed_Guests") \
        , col("df1.RevPAR_All") \
        , col("df1.Occupancy_All") \
        , col("df1.Room_Revenue_All") \
        , col("df1.Total_Revenue") \
        , col("df1.Operating_Expenses") \
        , col("df1.Fixed_Costs") \
        , col("df1.Variable_Costs") \
        , col("df1.Total_Costs") \
        , col("df1.Profit")
    ).orderBy(col("df1.Date"), col("df2.Booking_ChannelKey"), col("df3.GuestKey"))
    
# Display the first 10 rows of the dataframe to preview your data
    
display(dffactReservation_gold.head(10))

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5ddcc0cf-505d-4b44-b719-19b103a7424c)

In [19]:
dffactReservation_temp = spark.read.table("Hotel.factreservationsgold")

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 21, Finished, Available, Finished)

In [20]:
from delta.tables import *
    
deltaTable = DeltaTable.forPath(spark, 'abfss://edcd96a3-3877-4553-b100-54aeef9c4401@onelake.dfs.fabric.microsoft.com/3f561343-f018-4ab6-a09f-74d0dd933efb/Tables/factreservationsgold')
    
dfUpdates = dffactReservation_gold
    
deltaTable.alias('silver') \
   .merge(
     dfUpdates.alias('updates'),
     'silver.Booking_ChannelKey = updates.Booking_ChannelKey AND silver.GuestKey = updates.GuestKey'
   ) \
    .whenMatchedUpdate(set =
     {
          
     }
   ) \
  .whenNotMatchedInsert(values =
     {
       "Date": "updates.Date",
       "Booking_ChannelKey": "updates.Booking_ChannelKey",
       "GuestKey": "updates.GuestKey",
       "Marketing_Spend": "updates.Marketing_Spend",
       "Revenue": "updates.Revenue",
       "Room_Revenue": "updates.Room_Revenue",
       "Occupancy_Rate": "updates.Occupancy_Rate",
       "ADR": "updates.ADR",
       "RevPAR": "updates.RevPAR",
       "Available_Rooms": "updates.Available_Rooms",
       "Reserved_Rooms": "updates.Reserved_Rooms",
       "Complaints": "updates.Complaints",
       "Compliment": "updates.Compliment",
       "Bookings": "updates.Bookings",
       "No_Shows": "updates.No_Shows",
       "Cancellations": "updates.Cancellations",
       "Market_Segment": "updates.Market_Segment",
       "Checkouts": "updates.Checkouts",
       "New_Bookings": "updates.New_Bookings",
       "Checkins": "updates.Checkins",
       "Average_Review_Score": "updates.Average_Review_Score",
       "Revenue_Managed_Guests": "updates.Revenue_Managed_Guests",
       "RevPAR_Managed_Guests": "updates.RevPAR_Managed_Guests",
       "Occupancy_Managed_Guests": "updates.Occupancy_Managed_Guests",
       "RevPAR_All": "updates.RevPAR_All",
       "Occupancy_All": "updates.Occupancy_All",
       "Room_Revenue_All": "updates.Room_Revenue_All",
       "Total_Revenue": "updates.Total_Revenue",
       "Operating_Expenses": "updates.Operating_Expenses",
       "Fixed_Costs": "updates.Fixed_Costs",
       "Variable_Costs": "updates.Variable_Costs",
       "Total_Costs": "updates.Total_Costs",
       "Profit": "updates.Profit"
     }
   ) \
   .execute()   

StatementMeta(, de5fd1f5-e0a8-4eb7-bb20-f889b47460b3, 22, Finished, Available, Finished)