In [1]:
from pyspark.sql import SparkSession, functions as fn
import merging
import importlib
importlib.reload(merging)

# ANSI SQL mode is switched off because the customer data contains an invalid
# calendar date; with ANSI mode on, Spark refuses to process it.
session_builder = SparkSession.builder.config("spark.sql.ansi.enabled", "false")
spark = session_builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/03 16:17:07 WARN Utils: Your hostname, MacBook-Air-123.local, resolves to a loopback address: 127.0.0.1; using 10.13.65.202 instead (on interface en0)
25/10/03 16:17:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/03 16:17:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading Data

In [2]:
# Both sources are CSV files with a header row; column types are inferred by Spark.
customer_df, hotel_df = (
    spark.read.csv(csv_path, header=True, inferSchema=True)
    for csv_path in (
        "../data/customer-reservations.csv",
        "../data/hotel-booking.csv",
    )
)

In [3]:
# Preview the first three rows of the customer reservations data.
customer_df.show(3)

+----------+-----------------------+--------------------+---------+------------+-------------+------------+-------------------+------------------+--------------+
|Booking_ID|stays_in_weekend_nights|stays_in_week_nights|lead_time|arrival_year|arrival_month|arrival_date|market_segment_type|avg_price_per_room|booking_status|
+----------+-----------------------+--------------------+---------+------------+-------------+------------+-------------------+------------------+--------------+
|  INN00001|                      1|                   2|      224|        2017|           10|           2|            Offline|              65.0|  Not_Canceled|
|  INN00002|                      2|                   3|        5|        2018|           11|           6|             Online|            106.68|  Not_Canceled|
|  INN00003|                      2|                   1|        1|        2018|            2|          28|             Online|              60.0|      Canceled|
+----------+----------------

In [4]:
# Preview the first three rows of the hotel booking data.
hotel_df.show(3)

+------------+--------------+---------+------------+-------------+------------------------+-------------------------+-----------------------+--------------------+-------------------+-------+------------------+--------------------+
|       hotel|booking_status|lead_time|arrival_year|arrival_month|arrival_date_week_number|arrival_date_day_of_month|stays_in_weekend_nights|stays_in_week_nights|market_segment_type|country|avg_price_per_room|               email|
+------------+--------------+---------+------------+-------------+------------------------+-------------------------+-----------------------+--------------------+-------------------+-------+------------------+--------------------+
|Resort Hotel|             0|      342|        2015|         July|                      27|                        1|                      0|                   0|             Direct|    PRT|               0.0|Ernest.Barnes31@o...|
|Resort Hotel|             0|      737|        2015|         July|          

## Finding Commonalities

In [5]:
# Column names (and their types) that the helper reports as shared by both frames.
common_columns, common_types = merging.get_common_columns(customer_df, hotel_df)
print([(column, dtype) for column, dtype in zip(common_columns, common_types)])

[('avg_price_per_room', 'double'), ('lead_time', 'int'), ('market_segment_type', 'string'), ('arrival_year', 'int'), ('stays_in_weekend_nights', 'int'), ('stays_in_week_nights', 'int')]


We see that

```
['arrival_year', 'avg_price_per_room', 'stays_in_weekend_nights', 'lead_time', 'stays_in_week_nights', 'market_segment_type']
```

are shared between both datasets with the same data types and therefore could easily be present in a combined dataset. 




## Finding Differences

In [6]:
# Columns the helper reports for customer_df relative to hotel_df, with their types.
customer_unique_columns, cuc_types = merging.get_left_unique_columns(customer_df, hotel_df)
print([(column, dtype) for column, dtype in zip(customer_unique_columns, cuc_types)])

[('Booking_ID', 'string'), ('arrival_date', 'int'), ('booking_status', 'string'), ('arrival_month', 'int')]


In [7]:
# Same comparison with the arguments swapped: columns reported for hotel_df
# relative to customer_df, with their types.
hotel_unique_columns, huc_types = merging.get_left_unique_columns(hotel_df, customer_df)
print([(column, dtype) for column, dtype in zip(hotel_unique_columns, huc_types)])

[('arrival_date_day_of_month', 'int'), ('arrival_month', 'string'), ('hotel', 'string'), ('country', 'string'), ('arrival_date_week_number', 'int'), ('booking_status', 'int'), ('email', 'string')]


## Processing the Data

Before merging the two data sources into one, they must be normalized into the same schema.


### Columns that can be dropped
Of those unique to `customer_df`,
- `Booking_ID` can be dropped since it has no meaning for our purposes.

Of those unique to `hotel_df`,
- `arrival_date_week_number` can be dropped because it is redundant with the arrival month, date, and year.

In [8]:
# Booking_ID carries no meaning for this analysis, so it is removed.
customer_df2 = customer_df.drop("Booking_ID")
# The week number is redundant with the arrival year, month and day.
hotel_df2 = hotel_df.drop("arrival_date_week_number")

### Columns that need transformation before merger

#### Booking status
`booking_status`, as revealed by the EDA, appears to carry the same information in both datasets, except that the values are stored under different conventions, as shown in the following table:

| customer_df  | hotel_df |
| -----------  | -------- |
| Not_Canceled |     0    |
|  Canceled    |     1    |

I inferred this mapping from the proportions of each value: in `customer_df`, about one third of the reservations were canceled.
Likewise, about one third of the reservations in `hotel_df` have the value `1` and the other two thirds have `0`,
which led me to assume the above mapping.

In the merged data, I will convert these into a column called `canceled`, where `0` is "not canceled" and `1` is "canceled".

In [9]:
# customer_df stores the status as text; translate it to the 0/1 convention.
# There is no otherwise() branch, so any other status value becomes NULL.
# NOTE: this cell rebinds customer_df2 / hotel_df2 and removes booking_status,
# so re-running it requires re-running the previous cell first.
booking_status = fn.col("booking_status")
canceled_flag = (
    fn.when(booking_status == "Not_Canceled", 0)
    .when(booking_status == "Canceled", 1)
)
customer_df2 = customer_df2.withColumn("canceled", canceled_flag).drop("booking_status")

# hotel_df already stores the status as 0/1, so only the column name changes.
hotel_df2 = hotel_df2.withColumnRenamed("booking_status", "canceled")

#### Arrival Time
The arrival time is present in both, but with some name and storage type differences.

- `arrival_date` in `customer_df` is `arrival_date_day_of_month` in `hotel_df`. 
- `arrival_month` is stored as an integer 1-12 in `customer_df` but as the full month's name in `hotel_df`.

In the merged data, arrival time will be encoded as a single column called `arrival_time` that will store the date of arrival as a `DateType` in PySpark. This will replace the current month, day, and year columns present in both dataframes.

In [10]:
# Combine the integer year / month / day columns into a single date column,
# then discard the three source columns.
customer_date_parts = ("arrival_year", "arrival_month", "arrival_date")
customer_df2 = (
    customer_df2
    .withColumn("arrival_time", fn.make_date(*map(fn.col, customer_date_parts)))
    .drop(*customer_date_parts)
)


def map_month_name_to_number(column_name: str):
    """Build a Spark column expression that maps English month names to 1-12.

    Args:
        column_name: Name of the column holding full English month names
            ("January" through "December"), compared by exact equality.

    Returns:
        A chained ``when`` column expression yielding the month number for a
        recognised name. No ``otherwise`` branch is attached, so any value
        that is not one of the twelve names evaluates to NULL.
    """
    # Ordered so that position + 1 is the month number.
    month_names = (
        "January",
        "February",
        "March",
        "April",
        "May",
        "June",
        "July",
        "August",
        "September",
        "October",
        "November",
        "December",
    )
    month = fn.col(column_name)
    # The chain has to be started with fn.when(); later branches hang off it.
    mapping = fn.when(month == month_names[0], 1)
    for number, name in enumerate(month_names[1:], start=2):
        mapping = mapping.when(month == name, number)
    return mapping


# Backward-compatible alias: the function was originally defined under this
# misspelled name, which existing cells still call.
map_mongth_name_to_number = map_month_name_to_number

# hotel_df stores the month as a name, so convert it to a number first, then
# build the date column and discard the three source columns.
hotel_date_parts = ("arrival_year", "arrival_month", "arrival_date_day_of_month")
hotel_df2 = (
    hotel_df2
    .withColumn("arrival_month", map_mongth_name_to_number("arrival_month"))
    .withColumn("arrival_time", fn.make_date(*map(fn.col, hotel_date_parts)))
    .drop(*hotel_date_parts)
)

### Filling in Missing Data

The `country`, `hotel`, and `email` columns are present in `hotel_df` but not in `customer_df`.
These columns may be interesting for our analysis, so I will keep them in the merged data.
For the rows that come from `customer_df`, I will insert Null values for these three columns
in the merged data.

In [11]:
# No code is needed here: the merge step calls unionByName with
# allowMissingColumns=True, which inserts Null values into these columns
# for the rows that come from customer_df.

## Concerning New Null Values

The built-in data integrity protections of Spark prevented me from processing all of the above operations because they detected a date of February 29th, 2018, which is invalid because 2018 was not a leap year. To permit the operations to continue, I disabled these protections by setting `spark.sql.ansi.enabled` to false. After doing this, 37 Null values appeared in the processed version of `customer_df`, which is called `customer_df2`. The code below verifies that all 37 of these nulls result from rows that improperly have
February 29th, 2018 as the arrival date.

In [12]:
import eda
# Report the number of NULLs in each column of the transformed customer data.
eda.print_num_null_per_column(customer_df2)

Column                     Number of Nulls    
------------------------------------------
stays_in_weekend_nights    0                  
stays_in_week_nights       0                  
lead_time                  0                  
market_segment_type        0                  
avg_price_per_room         0                  
canceled                   0                  
arrival_time               37                 


In [13]:
# Count the raw customer rows dated 29 February 2018, to compare with the
# number of NULL arrival_time values reported above.
is_feb_29_2018 = (
    (fn.col("arrival_year") == 2018)
    & (fn.col("arrival_date") == 29)
    & (fn.col("arrival_month") == 2)
)
customer_df.where(is_feb_29_2018).count()

37

### Fixing the Data Integrity Issue

Since February 29th, 2018 does not exist, I will be fixing the data by replacing the arrival time for all such entries with March 1st, 2018, since that would be the correct day for that year.

In [14]:
# coalesce() fills *every* NULL arrival_time with 1 March 2018. That is only
# appropriate because the check above showed all NULLs come from the invalid
# 29 February 2018 rows.
replacement_date = fn.lit("2018-03-01").cast("date")
patched_arrival_time = fn.coalesce(fn.col("arrival_time"), replacement_date)
customer_df3 = customer_df2.withColumn("arrival_time", patched_arrival_time)

# Confirm that no NULLs remain.
eda.print_num_null_per_column(customer_df3)

Column                     Number of Nulls    
------------------------------------------
stays_in_weekend_nights    0                  
stays_in_week_nights       0                  
lead_time                  0                  
market_segment_type        0                  
avg_price_per_room         0                  
canceled                   0                  
arrival_time               0                  


## Merging the Data

In [15]:
# Stack the two normalised frames by column name. Columns that exist only in
# hotel_df2 are filled with NULL for the rows taken from customer_df3.
merged_data = hotel_df2.unionByName(customer_df3, allowMissingColumns=True)

summary_stats = merged_data.describe()
summary_stats.show()
merged_data.printSchema()

25/10/03 16:17:18 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------+-------------------+-----------------+-----------------------+--------------------+-------------------+-------+------------------+--------------------+
|summary|       hotel|           canceled|        lead_time|stays_in_weekend_nights|stays_in_week_nights|market_segment_type|country|avg_price_per_room|               email|
+-------+------------+-------------------+-----------------+-----------------------+--------------------+-------------------+-------+------------------+--------------------+
|  count|       78703|             114978|           114978|                 114978|              114978|             114978|  78298|            114978|               78703|
|   mean|        NULL| 0.3510584633582077|96.22974829967472|     0.8745499138965716|   2.371088382125276|               NULL|   NULL| 97.80159865365225|                NULL|
| stddev|        NULL|0.47730325797274625|100.5264041802159|     0.9546293077588424|  1.7431996147326994|               NULL|   NU

                                                                                

### Reformatting the Merged Data

Currently, the number of days in a stay is kept in two columns, `stays_in_weekend_nights` and `stays_in_week_nights`.
These should be merged into a single column, `stays_in_nights`, which is the sum of the two.
This will not lead to a loss of information because whether a stay was on a weeknight or not can be deduced from the
arrival date in conjunction with the number of nights stayed.

In [21]:
# Replace the two per-category night counts with their sum.
night_columns = ("stays_in_weekend_nights", "stays_in_week_nights")
total_nights = fn.col(night_columns[0]) + fn.col(night_columns[1])
merged_data2 = (
    merged_data
    .withColumn("stays_in_nights", total_nights)
    .drop(*night_columns)
)

### Saving the Merged Data

In [22]:
import shutil
import tempfile
import os
from pathlib import Path

# Spark writes a directory of part files rather than one CSV file, so the data
# is written to a scratch directory as a single partition and the resulting
# part file is copied to its final location.
with tempfile.TemporaryDirectory() as scratch_dir:
    spark_output_dir = Path(scratch_dir) / "unified"
    merged_data2.repartition(1).write.csv(str(spark_output_dir), header=True)

    # Fail with a clear error instead of a bare StopIteration if Spark
    # produced no CSV part file.
    csv_file = next(spark_output_dir.glob("*.csv"), None)
    if csv_file is None:
        raise FileNotFoundError(
            f"Spark wrote no CSV part file into {spark_output_dir}"
        )
    shutil.copy(csv_file, Path("../", "data", "unified.csv"))
    
