In [5]:
# -------------------------------------------
# Airbnb London Price Prediction
# Data Exploration + Model Training
# -------------------------------------------

# 1. Import Required Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,regexp_replace, when, length
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# 2. Create Spark Session
spark = SparkSession.builder \
    .appName("Airbnb London Price Prediction") \
    .getOrCreate()

# 3. Load Dataset
file_path = "listingss.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# 4. Data Cleaning
df_clean = df.dropna(subset=["price", "latitude", "longitude", "room_type", "availability_365"])
df_clean = df_clean.filter(col("price") > 0)
df_clean = df_clean.withColumn("price", col("price").cast("double"))

# Feature numeric casting
feature_columns = [
    "latitude",
    "longitude",
    "minimum_nights",
    "number_of_reviews",
    "reviews_per_month",
    "calculated_host_listings_count",
    "availability_365"
]
for col_name in feature_columns:
    df_clean = df_clean.withColumn(
        col_name,
        regexp_replace(col(col_name), '[^0-9.]', '')
    )
    df_clean = df_clean.withColumn(
        col_name,
        when(length(col(col_name)) > 0, col(col_name).cast(DoubleType())).otherwise(None)
    )
    # Replace nulls with 0 to avoid errors
    df_clean = df_clean.fillna({col_name: 0})

# -------------------------------------------
# Exploratory Data Analysis (EDA) – 6 Key Plots
# -------------------------------------------

# Convert Spark DataFrame to Pandas
pandas_df = df_clean.select([
    'price', 'minimum_nights', 'number_of_reviews',
    'reviews_per_month', 'calculated_host_listings_count',
    'availability_365', 'latitude', 'longitude', 'room_type', 'neighbourhood'
]).toPandas()

# 1. Price Distribution
plt.figure(figsize=(10,6))
sns.histplot(pandas_df['price'], bins=50, color='crimson', kde=True)
plt.title('Price Distribution')
plt.xlabel('Price')
plt.ylabel('Count')
plt.xlim(0, 1000)
plt.savefig("price_distribution.png", dpi=300, bbox_inches='tight')
plt.show()

# 2. Room Type Counts
plt.figure(figsize=(8,5))
sns.countplot(data=pandas_df, x='room_type', hue='room_type', palette='Set2',
              order=pandas_df['room_type'].value_counts().index, legend=False)
plt.title('Room Type Distribution')
plt.savefig("room_type_distribution.png", dpi=300, bbox_inches='tight')
plt.show()

# 3. Top 10 Neighbourhoods
top_neigh = pandas_df['neighbourhood'].value_counts().nlargest(10).index
plt.figure(figsize=(12,6))
sns.countplot(data=pandas_df[pandas_df['neighbourhood'].isin(top_neigh)], 
              y='neighbourhood', hue='neighbourhood', legend=False, palette='tab20', order=top_neigh)
plt.title('Top 10 Neighbourhoods by Listings')
plt.savefig("neighbourhood_distribution.png", dpi=300, bbox_inches='tight')
plt.show()

# 4. Boxplot Price vs Room Type
plt.figure(figsize=(10,6))
sns.boxplot(data=pandas_df[pandas_df['price'] <= 500], x='room_type', y='price', palette='Set2', hue='room_type', legend=False)
plt.title('Price Distribution by Room Type (Price ≤ 500)')
plt.savefig("boxplot_price_roomtype.png", dpi=300, bbox_inches='tight')
plt.show()

# 5. Boxplot Price by Neighbourhood
plt.figure(figsize=(14,8))
sns.boxplot(data=pandas_df, y='neighbourhood', x='price', palette='tab20', hue='neighbourhood', legend=False)
plt.title('Price Distribution by Neighbourhood', fontsize=16, weight='bold')
plt.xlabel('Price', fontsize=14)
plt.ylabel('Neighbourhood', fontsize=14)
plt.savefig("boxplot_price_by_neighbourhood.png", dpi=300, bbox_inches='tight')
plt.show()

# 6. Correlation Heatmap
numeric_cols = [
    'price', 'minimum_nights', 'number_of_reviews',
    'reviews_per_month', 'calculated_host_listings_count',
    'availability_365', 'latitude', 'longitude'
]
corr = pandas_df[numeric_cols].corr()
plt.figure(figsize=(10,8))
sns.heatmap(corr, annot=True, cmap='coolwarm', fmt=".2f")
plt.title('Correlation Matrix')
plt.savefig("correlation_heatmap.png", dpi=300, bbox_inches='tight')
plt.show()

# -------------------------------------------
# Model Training & Evaluation
# -------------------------------------------

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_vector = assembler.transform(df_clean).select("features", "price")

train_data, test_data = df_vector.randomSplit([0.8, 0.2], seed=42)

evaluator_rmse = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
n = test_data.count()
p = len(feature_columns)

results = []

# Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)
lr_rmse = evaluator_rmse.evaluate(lr_predictions)
lr_r2 = evaluator_r2.evaluate(lr_predictions)
lr_adj_r2 = 1 - ((1 - lr_r2) * (n - 1) / (n - p - 1))
results.append({"Model": "Linear Regression", "RMSE": lr_rmse, "R2": lr_r2, "Adj_R2": lr_adj_r2})

# Random Forest Regression
rf = RandomForestRegressor(featuresCol="features", labelCol="price", maxBins=50)
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)
rf_rmse = evaluator_rmse.evaluate(rf_predictions)
rf_r2 = evaluator_r2.evaluate(rf_predictions)
rf_adj_r2 = 1 - ((1 - rf_r2) * (n - 1) / (n - p - 1))
results.append({"Model": "Random Forest Regression", "RMSE": rf_rmse, "R2": rf_r2, "Adj_R2": rf_adj_r2})

# Gradient Boosted Trees
gbt = GBTRegressor(featuresCol="features", labelCol="price", maxBins=100)
gbt_model = gbt.fit(train_data)
gbt_predictions = gbt_model.transform(test_data)
gbt_rmse = evaluator_rmse.evaluate(gbt_predictions)
gbt_r2 = evaluator_r2.evaluate(gbt_predictions)
gbt_adj_r2 = 1 - ((1 - gbt_r2) * (n - 1) / (n - p - 1))
results.append({"Model": "Gradient Boosted Trees", "RMSE": gbt_rmse, "R2": gbt_r2, "Adj_R2": gbt_adj_r2})

# Print Results
print("📌 Model Comparison Summary")
for res in results:
    print(res)

# Stop Spark Session
spark.stop()


25/08/13 12:37:03 ERROR Executor: Exception in task 2.0 in stage 6.0 (TID 14)
org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'Private room' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"__gt__" was called from
line 28 in cell [5]

	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:145)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toLongExact(UTF8StringUtils.scala:31)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils.toLongExact(UTF8StringUtils.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.ex

NumberFormatException: [CAST_INVALID_INPUT] The value 'Private room' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"__gt__" was called from
line 28 in cell [5]
