In [1]:
import os
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, round
import xgboost as xgb
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import LabelEncoder

In [2]:
class Config:
    TRAIN_PATH = '../data/train.parquet'
    TEST_PATH = '../data/test.parquet'
    LISTINGS_PATH = '../data/listings.parquet'
    MODEL_PATH = '../data/xgboost_model_baseline'

    # XGBoost Parameters
    N_ESTIMATORS = 100
    MAX_DEPTH = 6
    LEARNING_RATE = 0.1
    SUBSAMPLE = 0.8
    COL_SAMPLE_BY_TREE = 0.8
    RANDOM_STATE = 42


config = Config()

In [3]:
# Initialize Spark
spark = SparkSession.builder \
    .appName("AirbnbXGBoost_Baseline") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

print(f"Spark Session created. Version: {spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/27 14:44:36 WARN Utils: Your hostname, nnnnnn.local, resolves to a loopback address: 127.0.0.1; using 192.168.70.243 instead (on interface en0)
25/11/27 14:44:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/27 14:44:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/27 14:44:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/27 14:44:37 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/11/27 14:44:37 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


Spark Session created. Version: 4.0.1


In [4]:
print("Loading data...")

if not os.path.exists(config.TRAIN_PATH) or not os.path.exists(config.TEST_PATH):
    raise FileNotFoundError("Train/Test data not found. Please run the previous data prep step first.")

if not os.path.exists(config.LISTINGS_PATH):
    raise FileNotFoundError("Listings data not found. Please run the previous data prep step first.")

# Load Spark DataFrames
train_spark = spark.read.parquet(config.TRAIN_PATH)
test_spark = spark.read.parquet(config.TEST_PATH)
listings_spark = spark.read.parquet(config.LISTINGS_PATH)

# Cache for faster iteration
train_spark.cache()
test_spark.cache()
listings_spark.cache()

print(f"Train count: {train_spark.count():,}")
print(f"Test count:  {test_spark.count():,}")
print(f"Listings count: {listings_spark.count():,}")

Loading data...


                                                                                

Train count: 50,410
Test count:  12,603
Listings count: 12,004


In [5]:
# Feature Engineering
print("\nCreating features for XGBoost...")

# 1. User features from training data
print("  - Computing user statistics...")
user_stats = train_spark.groupBy("user_id").agg(
    avg("rating").alias("user_avg_rating"),
    count("item_id").alias("user_review_count")
).withColumnRenamed("user_id", "user_id_stats")

# 2. Item features from training data
print("  - Computing item statistics...")
item_stats = train_spark.groupBy("item_id").agg(
    avg("rating").alias("item_avg_rating"),
    count("user_id").alias("item_review_count")
).withColumnRenamed("item_id", "item_id_stats")

# 3. Join user and item stats to train data
print("  - Joining features to train data...")
train_with_features = train_spark \
    .join(user_stats, train_spark.user_id == user_stats.user_id_stats, "left") \
    .join(item_stats, train_spark.item_id == item_stats.item_id_stats, "left") \
    .drop("user_id_stats", "item_id_stats")

# 4. Join listings features
print("  - Joining listings metadata...")
# Map listing_id to item_id if needed, or join directly if listing_id exists
train_with_features = train_with_features \
    .join(listings_spark, train_with_features.listing_id == listings_spark.listing_id, "left")

# 5. Apply same feature engineering to test data
print("  - Applying features to test data...")
test_with_features = test_spark \
    .join(user_stats, test_spark.user_id == user_stats.user_id_stats, "left") \
    .join(item_stats, test_spark.item_id == item_stats.item_id_stats, "left") \
    .drop("user_id_stats", "item_id_stats") \
    .join(listings_spark, test_spark.listing_id == listings_spark.listing_id, "left")

print("✓ Feature engineering complete")


Creating features for XGBoost...
  - Computing user statistics...
  - Computing item statistics...
  - Joining features to train data...
  - Joining listings metadata...
  - Applying features to test data...
✓ Feature engineering complete


In [6]:
# Prepare data for XGBoost
print("\nPreparing data for XGBoost...")

# Convert to Pandas for XGBoost
print("  - Converting to Pandas DataFrames...")
train_df = train_with_features.toPandas()
test_df = test_with_features.toPandas()

print(f"  - Train shape: {train_df.shape}")
print(f"  - Test shape: {test_df.shape}")

# Select feature columns
feature_cols = [
    'user_id', 'item_id',
    'user_avg_rating', 'user_review_count',
    'item_avg_rating', 'item_review_count',
    'price', 'accommodates', 'bedrooms', 'beds',
    'minimum_nights', 'number_of_reviews',
    'review_scores_rating', 'review_scores_location', 'review_scores_value',
    'latitude', 'longitude'
]

# Handle categorical columns
categorical_cols = ['property_type', 'room_type', 'neighbourhood_cleansed']
label_encoders = {}

for col_name in categorical_cols:
    if col_name in train_df.columns:
        le = LabelEncoder()
        # Combine train and test for encoding
        combined = pd.concat([train_df[col_name].fillna('unknown'), 
                              test_df[col_name].fillna('unknown')])
        le.fit(combined)
        train_df[f'{col_name}_encoded'] = le.transform(train_df[col_name].fillna('unknown'))
        test_df[f'{col_name}_encoded'] = le.transform(test_df[col_name].fillna('unknown'))
        label_encoders[col_name] = le
        feature_cols.append(f'{col_name}_encoded')

# Handle boolean columns
bool_cols = ['host_is_superhost', 'instant_bookable']
for col_name in bool_cols:
    if col_name in train_df.columns:
        train_df[col_name] = train_df[col_name].astype(float).fillna(0)
        test_df[col_name] = test_df[col_name].astype(float).fillna(0)
        feature_cols.append(col_name)

# Select only available features
available_features = [f for f in feature_cols if f in train_df.columns]
print(f"  - Using {len(available_features)} features")

# Prepare X and y
X_train = train_df[available_features].fillna(0)
y_train = train_df['rating'].values
X_test = test_df[available_features].fillna(0)
y_test = test_df['rating'].values

print("✓ Data prepared for XGBoost")


Preparing data for XGBoost...
  - Converting to Pandas DataFrames...
  - Train shape: (50410, 29)
  - Test shape: (12603, 29)
  - Using 22 features
✓ Data prepared for XGBoost


In [7]:
# Train XGBoost Model
print("\nTraining XGBoost model...")

model = xgb.XGBRegressor(
    n_estimators=config.N_ESTIMATORS,
    max_depth=config.MAX_DEPTH,
    learning_rate=config.LEARNING_RATE,
    subsample=config.SUBSAMPLE,
    colsample_bytree=config.COL_SAMPLE_BY_TREE,
    random_state=config.RANDOM_STATE,
    objective='reg:squarederror',
    n_jobs=-1
)

model.fit(X_train, y_train)

print("✓ Model trained successfully")


Training XGBoost model...
✓ Model trained successfully


In [8]:
# Generate Predictions
print("\nGenerating predictions on test set...")
y_pred = model.predict(X_test)

# Clip predictions to valid rating range [1, 5]
y_pred = np.clip(y_pred, 1.0, 5.0)

# Create predictions DataFrame for display
predictions_df = pd.DataFrame({
    'user_id': test_df['user_id'].values,
    'item_id': test_df['item_id'].values,
    'rating': y_test,
    'prediction': y_pred
})

print("Sample Predictions:")
print(predictions_df.head(10).to_string(index=False))


Generating predictions on test set...
Sample Predictions:
 user_id  item_id  rating  prediction
   13497       22    4.49    4.331111
    7614     6588    4.62    4.790527
      29     2317    3.68    4.104735
   13131     1166    1.36    3.069667
   14631        9    4.40    2.786867
    4731     5110    3.90    4.362851
     825     5346    3.29    3.406664
   14301      618    4.39    2.658826
    2732     4448    3.05    3.136234
    3539      840    3.65    4.348840


In [9]:
# Calculate RMSE
print("\nCalculating RMSE...")

rmse = np.sqrt(mean_squared_error(y_test, y_pred))

print("------------------------------------------------")
print(f"Root Mean Square Error (RMSE): {rmse:.4f}")
print("------------------------------------------------")

# Contextual Interpretation
print(f"\nInterpretation:")
print(f"On average, the model's prediction is off by {rmse:.2f} stars.")
print(f"For a 5-star scale, an RMSE below 1.0 is generally considered acceptable for a baseline.")

# Feature Importance
print("\nTop 10 Most Important Features:")
feature_importance = pd.DataFrame({
    'feature': available_features,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)

print(feature_importance.head(10).to_string(index=False))



Calculating RMSE...
------------------------------------------------
Root Mean Square Error (RMSE): 1.0269
------------------------------------------------

Interpretation:
On average, the model's prediction is off by 1.03 stars.
For a 5-star scale, an RMSE below 1.0 is generally considered acceptable for a baseline.

Top 10 Most Important Features:
          feature  importance
  user_avg_rating    0.578336
  item_avg_rating    0.134606
   minimum_nights    0.088266
host_is_superhost    0.027515
room_type_encoded    0.023975
 instant_bookable    0.023321
             beds    0.015884
item_review_count    0.015334
            price    0.011463
          item_id    0.010985


In [10]:
# Save the model for future use
print(f"\nSaving model to {config.MODEL_PATH}...")
os.makedirs(config.MODEL_PATH, exist_ok=True)
model.save_model(f"{config.MODEL_PATH}/xgboost_model.json")

# Also save feature names and label encoders info
import json
model_info = {
    'feature_names': available_features,
    'categorical_columns': list(label_encoders.keys()),
    'rmse': float(rmse)
}

with open(f"{config.MODEL_PATH}/model_info.json", 'w') as f:
    json.dump(model_info, f, indent=2)

print("✓ Model saved successfully")



Saving model to ../data/xgboost_model_baseline...
✓ Model saved successfully
