In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import os

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Hotel Bookings EDA") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.driver.host", "localhost") \
    .config("spark.ui.enabled", "false") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("\nLoading datasets...")

# Load the two CSV files
df1 = spark.read.csv("hotel-booking.csv", header=True, inferSchema=True)
df2 = spark.read.csv("customer-reservations.csv", header=True, inferSchema=True)

25/10/07 18:46:19 WARN Utils: Your hostname, Nickos-Mac.local resolves to a loopback address: 127.0.0.1; using 2600:6c88:9b40:54:0:0:0:12f2 instead (on interface en0)
25/10/07 18:46:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/07 18:46:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable



Loading datasets...


In [2]:
print(f"Hotel Booking loaded: {df1.count()} rows")
print(f"Customer Reservations loaded: {df2.count()} rows")
print("\n--- Hotel Booking Schema ---")
df1.printSchema()
print("\n--- Customer Reservations Schema ---")
df2.printSchema()
# Compare column names
cols1 = set(df1.columns)
cols2 = set(df2.columns)

print("\n--- Column Comparison ---")
print(f"Hotel Booking has {len(cols1)} columns")
print(f"Customer Reservations has {len(cols2)} columns")
print(f"Overlapping columns: {len(cols1.intersection(cols2))}")
# FIND UNIQUE COLUMNS
unique_to_1 = cols1 - cols2
unique_to_2 = cols2 - cols1

if unique_to_1:
    print(f"\nUnique to Hotel Booking ({len(unique_to_1)} columns):")
    for column in sorted(unique_to_1):
        print(f"  - {column}")

if unique_to_2:
    print(f"\nUnique to Customer Reservations ({len(unique_to_2)} columns):")
    for column in sorted(unique_to_2):
        print(f"  - {column}")

Hotel Booking loaded: 78703 rows
Customer Reservations loaded: 36275 rows

--- Hotel Booking Schema ---
root
 |-- hotel: string (nullable = true)
 |-- booking_status: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: string (nullable = true)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- market_segment_type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- avg_price_per_room: double (nullable = true)
 |-- email: string (nullable = true)


--- Customer Reservations Schema ---
root
 |-- Booking_ID: string (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_year: integer (nullable 

In [3]:
# NULL VALUE ANALYSIS
print("\nNull Value Analysis...")

def analyze_nulls(df, name):
    """Analyze null values in the dataset"""
    print(f"\n--- {name} Null Analysis ---")
    
    # Calculate null counts for all columns
    null_counts = []
    for c in df.columns:
        null_count = df.filter(col(c).isNull()).count()
        null_counts.append((c, null_count))
    
    # Filter and sort columns with nulls
    columns_with_nulls = [(c, n) for c, n in null_counts if n > 0]
    columns_with_nulls.sort(key=lambda x: x[1], reverse=True)
    
    total_rows = df.count()
    
    if columns_with_nulls:
        print(f"Columns with null values: {len(columns_with_nulls)}/{len(df.columns)}")
        print(f"\n{'Column':<30} {'Null Count':>12} {'Percentage':>12}")
        print("-" * 56)
        for col_name, null_count in columns_with_nulls:
            pct = (null_count / total_rows) * 100
            print(f"{col_name:<30} {null_count:>12,} {pct:>11.2f}%")
    else:
        print("No null values found!")

analyze_nulls(df1, "Hotel Booking")
analyze_nulls(df2, "Customer Reservations")


Null Value Analysis...

--- Hotel Booking Null Analysis ---
Columns with null values: 1/13

Column                           Null Count   Percentage
--------------------------------------------------------
country                                 405        0.51%

--- Customer Reservations Null Analysis ---
No null values found!


In [4]:
# DISTINCT VALUE ANALYSIS
print("\nDistinct Value Analysis...")

def analyze_distinct_values(df, name):
    """Analyze distinct values for all columns"""
    print(f"\n--- {name} Distinct Values ---")
    print(f"\n{'Column':<30} {'Distinct Values':>18} {'Uniqueness %':>15}")
    print("-" * 65)
    
    total_rows = df.count()
    
    for c in df.columns:
        distinct_count = df.select(c).distinct().count()
        uniqueness = (distinct_count / total_rows) * 100
        print(f"{c:<30} {distinct_count:>18,} {uniqueness:>14.2f}%")

analyze_distinct_values(df1, "Hotel Booking")
analyze_distinct_values(df2, "Customer Reservations")


Distinct Value Analysis...

--- Hotel Booking Distinct Values ---

Column                            Distinct Values    Uniqueness %
-----------------------------------------------------------------
hotel                                           2           0.00%
booking_status                                  2           0.00%
lead_time                                     439           0.56%
arrival_year                                    2           0.00%
arrival_month                                  12           0.02%
arrival_date_week_number                       53           0.07%
arrival_date_day_of_month                      31           0.04%
stays_in_weekend_nights                        17           0.02%
stays_in_week_nights                           32           0.04%
market_segment_type                             8           0.01%
country                                       160           0.20%
avg_price_per_room                          6,985           8.88%
email   

In [5]:
# DATA TYPE ANALYSIS
print("\nData Type Analysis...")

def analyze_data_types(df, name):
    """Show data types distribution"""
    print(f"\n--- {name} Data Types ---")
    
    type_counts = {}
    for col_name, col_type in df.dtypes:
        type_counts[col_type] = type_counts.get(col_type, 0) + 1
    
    print(f"\n{'Data Type':<20} {'Count':>10}")
    print("-" * 32)
    for dtype, count in sorted(type_counts.items(), key=lambda x: x[1], reverse=True):
        print(f"{dtype:<20} {count:>10}")

analyze_data_types(df1, "Hotel Bookings")
analyze_data_types(df2, "Customer Reservations")



Data Type Analysis...

--- Hotel Bookings Data Types ---

Data Type                 Count
--------------------------------
int                           7
string                        5
double                        1

--- Customer Reservations Data Types ---

Data Type                 Count
--------------------------------
int                           6
string                        3
double                        1


In [8]:
# DATA TYPE MISMATCHES
print("\nChecking for Data Type Mismatches...")

def compare_column_types(df1, df2):
    """Compare data types of common columns"""
    common_cols = sorted(set(df1.columns).intersection(set(df2.columns)))
        
    df1_types = dict(df1.dtypes)
    df2_types = dict(df2.dtypes)
    
    mismatches = []
    matches = []
    
    for col_name in common_cols:
        type1 = df1_types[col_name]
        type2 = df2_types[col_name]
        
        if type1 != type2:
            mismatches.append((col_name, type1, type2))
        else:
            matches.append(col_name)
    return mismatches

common_cols = sorted(set(df1.columns).intersection(set(df2.columns)))

print(f"\nComparing {len(common_cols)} common columns...")
type_mismatches = compare_column_types(df1, df2)

if type_mismatches:
    print(f"\nFound {len(type_mismatches)} type mismatches:")
    print(f"\n{'Column':<30} {'Hotel Bookings Type':<20} {'Customer Reservations Type':<20}")
    print("-" * 72)
    
    for col_name, type1, type2 in type_mismatches:
        print(f"{col_name:<30} {type1:<20} {type2:<20}")
else:
    print("\nAll common columns have matching types!")


Checking for Data Type Mismatches...

Comparing 8 common columns...

Found 2 type mismatches:

Column                         Hotel Bookings Type  Customer Reservations Type
------------------------------------------------------------------------
arrival_month                  string               int                 
booking_status                 int                  string              


## 1.3 Data Processing

Matching datatypes for overlapping columns.

In [None]:
# Standardize column types
for col_name, type1, type2 in compare_column_types(df1, df2):
    # Cast df2’s column to df1’s type (or vice versa)
    df2 = df2.withColumn(col_name, df2[col_name].cast(type1))

mismatches = compare_column_types(df1,df2)

if mismatches:
    print(f' mismatches: {len(mismatches)}')
else:
    print(f'Types all match. {len(mismatches)} mismatches')


Types all match. 0 mismatches


In [None]:
print(f'{len(df1.columns)}')
print(f'{len(df2.columns)}')

print(len(df1.columns) + len(df2.columns))


13
10
23


Renaming overlapping columns, to avoid collisions.

In [18]:
overlapping_columns = sorted(set(df1.columns).intersection(set(df2.columns)))
print(overlapping_columns)

['arrival_month', 'arrival_year', 'avg_price_per_room', 'booking_status', 'lead_time', 'market_segment_type', 'stays_in_week_nights', 'stays_in_weekend_nights']


In [20]:
hotel_columns = df1.columns
customer_columns = df2.columns

for c in overlapping_columns:
    df1 = df1.withColumnRenamed(c, "hotel_"+c)
    df2 = df2.withColumnRenamed(c, "customer"+c)

overlapping_columns = sorted(set(df1.columns).intersection(set(df2.columns)))
print(overlapping_columns)

[]


In [21]:
def union_fill_na(df1, df2):
    # Get all columns
    all_cols = set(df1.columns) | set(df2.columns)
    
    # Add missing columns to each DataFrame
    for col in all_cols - set(df1.columns):
        df1 = df1.withColumn(col, F.lit(None))
    for col in all_cols - set(df2.columns):
        df2 = df2.withColumn(col, F.lit(None))
    
    # Reorder columns to match
    df1 = df1.select(*all_cols)
    df2 = df2.select(*all_cols)
    
    return df1.unionByName(df2)

merged_df = union_fill_na(df1, df2)

In [22]:
merged_df

DataFrame[customerbooking_status: int, customerarrival_month: string, hotel: string, customerstays_in_weekend_nights: int, hotel_arrival_year: int, arrival_date_day_of_month: int, customerlead_time: int, customerstays_in_week_nights: int, arrival_date_week_number: int, hotel_market_segment_type: string, hotel_booking_status: int, hotel_avg_price_per_room: double, customerarrival_year: int, hotel_stays_in_weekend_nights: int, hotel_lead_time: int, hotel_stays_in_week_nights: int, email: string, Booking_ID: string, hotel_arrival_month: string, country: string, arrival_date: int, customermarket_segment_type: string, customeravg_price_per_room: double]