In [0]:
%pip install mlflow
# NOTE(review): mlflow is installed without a version pin, so the resolved
# version may differ between runs — consider pinning it.
%load_ext autoreload
# autoreload mode 2: re-import modified modules before each cell execution,
# so edits to imported project modules are picked up without a restart.
%autoreload 2

In [0]:
# Notebook parameters, declared as Databricks widgets.
# Dropdown entries are (name, default, choices, label); text entries are
# (name, default, label). Widgets are created in the order listed.
_true_false = ["True", "False"]
_dropdown_widgets = [
    (
        "env_stage",
        "dev",
        ["dev", "prod", "qa", "dev_synxis_2_0", "prod_synxis_2_0", "qa_synxis_2_0"],
        "Pipeline stage",
    ),
    ("source_catalog", "phg_data", ["dev_data", "qa_data", "phg_data"], "Source Catalog"),
    ("exclude_pms", "False", list(_true_false), "Exclude PMS"),
    ("target_type", "REVENUE", ["REVENUE", "ROOMS"], "Target Type"),
    ("is_usd_currency", "True", list(_true_false), "Use USD currency"),
]
_text_widgets = [
    ("selected_hotels", "", "Hotels"),
    ("lag_numbers", "1,7,14,28", "Lag Numbers"),
    ("model_start_date", "2018-10-01", "Model Start Date"),
]

for _name, _default, _choices, _label in _dropdown_widgets:
    dbutils.widgets.dropdown(_name, _default, _choices, _label)

for _name, _default, _label in _text_widgets:
    dbutils.widgets.text(_name, _default, _label)

In [0]:
import pandas as pd
import numpy as np
from typing import List
import holidays
import matplotlib
import seaborn as sns
from datetime import datetime, timezone
from pathlib import Path
import pickle
import os
from sys import version_info
import cloudpickle
from autogluon.core.utils.loaders import load_pkl
import logging
import shutil
import mlflow
from mlflow import MlflowException
import mlflow.pyfunc
import time
import warnings
import pyspark.sql.functions as F
import sys

# Install a global "ignore" filter so warnings are not shown in cell output.
# NOTE(review): this also hides deprecation warnings from the libraries used below.
warnings.filterwarnings("ignore")
# Reference point for the elapsed-time report emitted by the last cell.
start_time = time.perf_counter()

In [0]:
from phgml.data.processing_distr_ca import filter_hotels, filter_test_partition
from phgml.data.processing_distr_spark import (
    filter_data,
    preprocess_data,
    get_lags,
    compile_hotel_tables,
)
from phgml.data.data_types import inference_output_schema
from phgml.reporting.logging import get_logging_path,get_logging_filename
from phgml.data.config import ForecastingHotelConfigProvider,EnvironmentConfig
from phgml.utilities.task_utilities import str_to_lst, str_to_bool

In [0]:
# Disable adaptive query execution (AQE).
# AQE groups smaller tasks together into larger tasks. This may limit
# parallelism if the query optimizer deems the parallel inference tasks
# to be too small.
# We disable AQE here to circumvent this limitation on parallelism.
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [0]:
# Gather every run setting into a single dictionary that later cells read.
# Widget values are fetched with dbutils.widgets.get, the supported accessor,
# instead of the legacy global getArgument() helper.
params = {}
params["ENV"] = dbutils.widgets.get("env_stage")
params["SOURCE_CATALOG"] = dbutils.widgets.get("source_catalog")
# NOTE(review): the revenue column is fixed to the USD column regardless of
# the is_usd_currency widget — confirm this is intended.
params["REVENUE_COL"] = "_reservationRevenuePerRoomUSD"
params["ROOMS_COL"] = "_rooms"
params["PIPELINE"] = "INFERENCE"
params["WITHOUT_PMS"] = str_to_bool(dbutils.widgets.get("exclude_pms"))
params["IS_USD_CURRENCY"] = str_to_bool(dbutils.widgets.get("is_usd_currency"))
params["TARGET_TYPE"] = dbutils.widgets.get("target_type")
params["SELECTED_HOTELS"] = str_to_lst(dbutils.widgets.get("selected_hotels"))
# Lag numbers arrive as a comma-separated string and are converted to ints.
params["LAG_NUMBERS"] = [int(lag) for lag in str_to_lst(dbutils.widgets.get("lag_numbers"))]
params["COVID_START_DATE"] = pd.to_datetime("2020-03-01")
params["COVID_END_DATE"] = pd.to_datetime("2021-08-01")
params["MODEL_START_DATE"] = pd.to_datetime(dbutils.widgets.get("model_start_date"))
params["CALC_UNCERTAINTY"] = False
# SynXis 2.0 stages write their logs under a separate root directory.
params['LOG_ROOT'] = (
    '/dbfs/mnt/extractionlogs/synxis_2_0'
    if "synxis_2_0" in params["ENV"]
    else '/dbfs/mnt/extractionlogs/synxis'
)

cluster_name = spark.conf.get("spark.databricks.clusterUsageTags.clusterName")

In [0]:
# Resolve environment-specific table names and the target column for this run.
env_config = EnvironmentConfig(env=params["ENV"], target=params["TARGET_TYPE"], spark=spark, is_usd_currency=params["IS_USD_CURRENCY"])
forecasting_config_provider = ForecastingHotelConfigProvider(spark=spark,env=params["ENV"])
params["TARGET_COLUMN"] = env_config.target_column

# Remove any handlers already attached to the root logger, then set its level.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.root.setLevel(logging.INFO)

processing_timestamp = datetime.now()

# exist_ok avoids the race between checking for the directory and creating it.
logfile_path = get_logging_path(params['LOG_ROOT'],processing_timestamp)
os.makedirs(logfile_path, exist_ok=True)

pms = "NOPMS" if params['WITHOUT_PMS'] else "PMS"

log_file_name = get_logging_filename(
    logfile_path,
    "PREPROCESS",
    params['TARGET_TYPE'],
    pms,
    processing_timestamp)

logger = logging.getLogger(f"preprocess-{params['TARGET_TYPE']}-{pms}")

# logging.getLogger returns the same logger object for the same name, so
# re-running this cell would otherwise stack one more FileHandler per run
# (duplicating every record into the older files and leaking file handles).
for stale_handler in logger.handlers[:]:
    logger.removeHandler(stale_handler)
    stale_handler.close()

file_handler = logging.FileHandler(log_file_name)
file_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_format)

logger.addHandler(file_handler)

In [0]:
# Workaround for bug PHG-2157: the partition date is taken to be the latest
# confirmationDate present in the source data table.
partition_date_query = f"select max(confirmationDate) from {env_config.source_data_table}"
partition_date_rows = spark.sql(partition_date_query).collect()
params["PARTITION_DATE"] = partition_date_rows[0][0]

print(f"Partition date: {params['PARTITION_DATE']}")

In [0]:
# Fetch both maxima from the forecasting config table in one query instead of
# running two separate aggregate scans over the same table.
config_maxima = spark.sql(
    "select max(inference_prediction_length), max(lead_window) "
    f"from {forecasting_config_provider.config_table_name}"
).collect()[0]
max_inference_length = config_maxima[0]
max_lead_window = config_maxima[1]

# An empty config table (or an all-null column) yields None here, which would
# otherwise produce an invalid partition end date further down.
if max_inference_length is None:
    raise ValueError(
        "No inference_prediction_length found in "
        f"{forecasting_config_provider.config_table_name}; cannot derive the test partition end."
    )

# The key spelling ("PARTIITON") is kept as-is because later cells read it.
params["TEST_PARTIITON_END"] = params["PARTITION_DATE"] + pd.Timedelta(max_inference_length, "D")

print(f"Partition end date: {params['TEST_PARTIITON_END']}")

In [0]:
# Summary of the run configuration, echoed to the cell output first and then
# written to the log file, in the same order.
run_summary = [
    f"Executing pipeline stage: {params['ENV']}",
    f"Processing data for target type: {params['TARGET_TYPE']} : {params['TARGET_COLUMN']}",
    f"Intermediate inference results table name: {env_config.inference_intermediate_table }",
    f"Writing inference results to table: {env_config.inference_output_table } with blob {env_config.inference_output_table_blob}",
    f"Excluding PMS data? {params['WITHOUT_PMS']}",
]

for summary_line in run_summary:
    print(summary_line)

for summary_line in run_summary:
    logger.info(summary_line)

In [0]:
logger.info("Selecting hotels.")

# Pull the hotel dimension from the selected source catalog into pandas.
hotel_details_query = f"select HotelID,HotelName,PMSStartDate,Country,State from {params['SOURCE_CATALOG']}.dim_hotels_data"
hotel_details = spark.sql(hotel_details_query).toPandas()

# State information is only kept for hotels in the US and Canada; every other
# hotel gets the placeholder "N/A" for the date features.
outside_us_ca = ~hotel_details["Country"].isin(['US','CA'])
hotel_details.loc[outside_us_ca, "State"] = "N/A"

# Drop rows that have no hotel identifier.
hotel_details = hotel_details[hotel_details["HotelID"].notna()]

# Narrow the hotel list down to the ones this run should process.
correct_hotel_ids = filter_hotels(
    hotel_details,
    params["SELECTED_HOTELS"],
    params["WITHOUT_PMS"],
    forecasting_config_provider,
)

In [0]:
logger.info("Loading data")
# Read the transaction data together with its cancellation date.
if "synxis_2_0" not in params['ENV']:
    # Legacy stages: the cancellation date comes from the transaction table,
    # left-joined on TransactionID.
    dfsp_src = spark.sql(
        f"select a.TransactionID,a.HotelID,a._StayDates,a.confirmationDate,a.departureDate,a.channel,a.status,a.cancellationNumber,a._reservationRevenuePerRoomUSD,a._rooms,b.cancellationDate from {env_config.source_data_table} as a left join {env_config.transaction_data_table} as b on a.TransactionID=b.TransactionID"
    )
else:
    # SynXis 2.0 stages: read the source table as-is; rows with status
    # 'No-show' take their stay date as the cancellation date.
    dfsp_src = spark.sql(f"select * from {env_config.source_data_table}")
    cancellation_with_no_show = F.when(
        F.col('status') == 'No-show', F.col('_StayDates')
    ).otherwise(F.col('cancellationDate'))
    dfsp_src = dfsp_src.withColumn('cancellationDate', cancellation_with_no_show)

# Keep only the hotels selected for this run.
dfsp_src = dfsp_src.where(dfsp_src["HotelID"].isin(correct_hotel_ids))

In [0]:
# Read the booking-status lookup and split its statuses by scenario.
result = spark.sql("SELECT * FROM phg_data.bookings_status")

confirmed_rows = result.where(result["scenario"] == 'confirmed').collect()
confirmed_status_list = [status_row['status'] for status_row in confirmed_rows]

cancelled_rows = result.where(result["scenario"] == 'cancelled').collect()
cancelled_status_list = [status_row['status'] for status_row in cancelled_rows]

# Show both lists in the cell output.
print(f"Confirmed Booking Status List: {confirmed_status_list}")
print(f"Cancelled Booking Status List: {cancelled_status_list}")

In [0]:
# Columns carried forward into preprocessing: the fixed booking fields plus
# the configured revenue and rooms columns.
columns = ["HotelID", "_StayDates", "confirmationDate", "channel", "status"]
columns += [params["REVENUE_COL"], params["ROOMS_COL"]]

# Keep bookings whose status is a confirmed one and that have no cancellation date.
has_confirmed_status = F.col('status').isin(confirmed_status_list)
has_no_cancellation = dfsp_src.cancellationDate.isNull()
dfsp = dfsp_src.where(has_confirmed_status & has_no_cancellation).select(columns)

In [0]:
logger.info("Preprocessing data")
df = preprocess_data(
    dfsp,
    params["WITHOUT_PMS"],
    params["REVENUE_COL"],
    params["ROOMS_COL"],
    params["MODEL_START_DATE"],
    cancel_aware=False,
)

logger.info("Calculating date features")
partition_start_date = params["PARTITION_DATE"]
partition_end_date = params["TEST_PARTIITON_END"]
# Date features restricted to the test partition window, keyed by stay date
# and attached to each hotel through its Country/State pair.
dates = (
    spark.sql(f"select * from phg_data.date_features where date >= '{partition_start_date}' and date <= '{partition_end_date}'")
    .withColumn('date', F.to_date('date'))
    .withColumnRenamed("date","_StayDates")
    .join(
        spark.createDataFrame(hotel_details[["HotelID","Country","State"]]),
        on=['Country','State'],
        how="inner",
    )
)

logger.info("Calculating lag features")
# Lags are computed from the preprocessed frame, before it is cut down to the
# test partition below.
df_lags = get_lags(df, lag_numbers=params["LAG_NUMBERS"], target_col=params["TARGET_COLUMN"])

logger.info("Filtering test partition")
test_partition = filter_test_partition(
    df=df,
    partition_start=partition_start_date,
    partition_end=partition_end_date,
    revenue_col=params["REVENUE_COL"],
    rooms_col=params["ROOMS_COL"],
    cancel_aware=False,
)
df = test_partition.orderBy(["HotelID", "_StayDates","confirmationDate"])

In [0]:
logger.info("Compiling test data set")
# Arguments for compile_hotel_tables, gathered in one place: the test
# partition, the date and lag features, and the horizon/lead window maxima
# read from the forecasting config table.
compile_arguments = {
    "df": df,
    "target_type": params["TARGET_TYPE"],
    "target_column": params["TARGET_COLUMN"],
    "prediction_horizon": max_inference_length,
    "lead_window": max_lead_window,
    "selected_hotels": correct_hotel_ids,
    "dates": dates,
    "df_lags": df_lags,
    "spark": spark,
    "cancel_aware": False,
    "suffix": "",
}
output_df = compile_hotel_tables(**compile_arguments)

In [0]:
logger.info(f"Writing preprocess data to temporary table {env_config.preprocess_intermediate_table}")

# Replace the intermediate table, including its schema, with this run's output.
preprocess_writer = output_df.write.mode("overwrite").option("overwriteSchema", "true")
preprocess_writer.saveAsTable(env_config.preprocess_intermediate_table)

In [0]:
# Report the time elapsed since start_time was recorded, in seconds and in minutes.
elapsed_time = time.perf_counter() - start_time
minutes_message = f"Time elapsed in minutes {elapsed_time/60}"

logger.info(f"Time elapsed {elapsed_time}")
logger.info(minutes_message)
print(minutes_message)
logger.info("Preprocessing completed.")