In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, mean
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, log1p
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StringIndexer
from pyspark.sql.functions import pandas_udf
import pandas as pd
from scipy.stats import boxcox

In [0]:
accesskey = "****"
storage_account_name = "airbnbetlstorage"
container_name = "airbnbcontainer"

spark.conf.set("fs.azure.account.key." + storage_account_name + ".blob.core.windows.net", accesskey)

In [0]:
listing=spark.read.format("csv").option("header", "true").load("wasbs://"+container_name+"@"+storage_account_name+".blob.core.windows.net/listings.csv", inferSchema=True)
calendar=spark.read.format("csv").option("header", "true").load("wasbs://"+container_name+"@"+storage_account_name+".blob.core.windows.net/calendar.csv", inferSchema=True)
reviews=spark.read.format("csv").option("header", "true").load("wasbs://"+container_name+"@"+storage_account_name+".blob.core.windows.net/reviews.csv" , inferSchema=True)

In [0]:
listing.printSchema()

root
 |-- id: double (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: double (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: integer (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_has_profile_pic:

In [0]:
listing.show()

+-----------+--------------------+---------+------------+-----------+--------------------+----+----+--------------------+--------+--------------------+---------+----------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+--------------------+-------------------+-------------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+---------+----------+--------------------+---------------+------------+---------+----------------+--------+----+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+----------------+---------------------+-----------------+---------------------+----------------------+------------+-----------+--------------------+-------

In [0]:
calendar.show()

+----------+----------+---------+------+--------------+--------------+--------------+
|listing_id|      date|available| price|adjusted_price|minimum_nights|maximum_nights|
+----------+----------+---------+------+--------------+--------------+--------------+
|   2992450|2024-09-05|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-06|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-07|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-08|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-09|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-10|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-11|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-12|        f|$70.00|          NULL|            28|          1125|
|   2992450|2024-09-13|        f|$70.00|          NULL

In [0]:
reviews.show()

+--------------------+--------------------+--------------------+-----------+---------------+--------------------+
|          listing_id|                  id|                date|reviewer_id|  reviewer_name|            comments|
+--------------------+--------------------+--------------------+-----------+---------------+--------------------+
|             2992450|            15066586|          2014-07-01|   16827297|        Kristen|Large apartment; ...|
|             2992450|            21810844|          2014-10-24|   22648856|    Christopher|This may be a lit...|
|<br/>The apartmen...| clean with many ...| although like th...|       NULL|           NULL|                NULL|
|              <br/>"|                NULL|                NULL|       NULL|           NULL|                NULL|
|             2992450|            27434334|          2015-03-04|      45406|          Altay|The apartment was...|
|             2992450|            28524578|          2015-03-25|    5485362|           J

In [0]:
listing.select("price").show()

+-------+
|  price|
+-------+
| $70.00|
|$116.00|
| $75.00|
|$116.00|
|$305.00|
|$199.00|
| $86.00|
| $65.00|
| $50.00|
|$214.00|
| $55.00|
| $45.00|
| $50.00|
|$219.00|
| $45.00|
|$243.00|
| $60.00|
| $70.00|
| $53.00|
| $45.00|
+-------+
only showing top 20 rows



In [0]:
# Convert 'available' column
calendar = calendar.withColumn("available", when(col("available") == 't', 1).otherwise(0))



In [0]:
calendar.filter(col("price").isNull()).show()


+----------+----+---------+-----+--------------+--------------+--------------+
|listing_id|date|available|price|adjusted_price|minimum_nights|maximum_nights|
+----------+----+---------+-----+--------------+--------------+--------------+
+----------+----+---------+-----+--------------+--------------+--------------+



In [0]:

# Drop 'adjusted_price' column
calendar = calendar.drop("adjusted_price")

from pyspark.sql.functions import regexp_replace, col

# Remove the '$' symbol and convert the price column to double
calendar = calendar.withColumn("price", regexp_replace(col("price"), r"[$,]", "").cast("double"))
listing = listing.withColumn("price", regexp_replace(col("price"), r"[$,]", "").cast("double"))


In [0]:
calendar.select("price").show()

+-----+
|price|
+-----+
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
| 70.0|
+-----+
only showing top 20 rows



In [0]:
# Aggregate calendar data
calendar_summary = calendar.groupBy("listing_id").agg(
    mean("price").alias("avg_price"),
    mean("available").alias("availability_rate")
)

In [0]:
# Select necessary columns from listings
desired_cols = ["id", "price", "review_scores_communication", "review_scores_location", 
                "review_scores_value", "calculated_host_listings_count", "instant_bookable", "reviews_per_month"]
listings_data = listing.select(*desired_cols)

In [0]:
# Convert review score columns to DoubleType
for col_name in ["review_scores_communication", "review_scores_location", "review_scores_value"]:
    listings_data = listings_data.withColumn(col_name, col(col_name).cast("double"))

In [0]:
# Merge calendar_summary with listings_data
merged_data = calendar_summary.join(listings_data, calendar_summary.listing_id == listings_data.id, "left").drop("id")


In [0]:
merged_data.show()

+------------------+---------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-----------------+
|        listing_id|avg_price|  availability_rate|price|review_scores_communication|review_scores_location|review_scores_value|calculated_host_listings_count|instant_bookable|reviews_per_month|
+------------------+---------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-----------------+
|          26139592|    110.0| 0.9342465753424658| 98.0|                       4.98|                  4.96|                4.8|                             1|               f|             1.59|
|          54369238|    125.0|0.40273972602739727| 99.0|                        5.0|                  4.76|               4.84|                             9|               f|             1.56|
|756752556396798800|     50.0|

In [0]:
# Fill missing values
median_review_scores = {
    "review_scores_communication": merged_data.approxQuantile("review_scores_communication", [0.5], 0)[0],
    "review_scores_location": merged_data.approxQuantile("review_scores_location", [0.5], 0)[0],
    "review_scores_value": merged_data.approxQuantile("review_scores_value", [0.5], 0)[0]
}

In [0]:
calendar_summary.select("listing_id", "avg_price").show()


+------------------+---------+
|        listing_id|avg_price|
+------------------+---------+
|          26139592|    110.0|
|          54369238|    125.0|
|756752556396798800|     50.0|
|          50088935|    100.0|
|          44180048|    109.0|
|          45024912|    120.0|
|787023821243442039|    209.0|
|904392960639597224|    144.0|
|          20519885|    275.0|
|636902252641496970|     50.0|
|          28519423|     60.0|
|          32450400|    119.0|
|          52611613|    165.0|
|922634062451094710|    120.0|
|          48052353|    179.0|
|          51461480|     85.0|
|          36443605|    125.0|
|921076441920995839|     80.0|
|          44450012|    150.0|
|667644674529935702|    117.0|
+------------------+---------+
only showing top 20 rows



In [0]:
merged_data = merged_data.fillna({
    "price": merged_data.select(col("avg_price")).first()[0],
    "reviews_per_month": 0,
    **median_review_scores
})


In [0]:
merged_data.show()

+------------------+---------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-----------------+
|        listing_id|avg_price|  availability_rate|price|review_scores_communication|review_scores_location|review_scores_value|calculated_host_listings_count|instant_bookable|reviews_per_month|
+------------------+---------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-----------------+
|          26139592|    110.0| 0.9342465753424658| 98.0|                       4.98|                  4.96|                4.8|                             1|               f|             1.59|
|          54369238|    125.0|0.40273972602739727| 99.0|                        5.0|                  4.76|               4.84|                             9|               f|             1.56|
|756752556396798800|     50.0|

In [0]:

# Convert 'instant_bookable' to numeric
merged_data = merged_data.withColumn("instant_bookable", when(col("instant_bookable") == 't', 1).otherwise(0))

In [0]:
merged_data.show()

+------------------+---------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-----------------+
|        listing_id|avg_price|  availability_rate|price|review_scores_communication|review_scores_location|review_scores_value|calculated_host_listings_count|instant_bookable|reviews_per_month|
+------------------+---------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-----------------+
|          26139592|    110.0| 0.9342465753424658| 98.0|                       4.98|                  4.96|                4.8|                             1|               0|             1.59|
|          54369238|    125.0|0.40273972602739727| 99.0|                        5.0|                  4.76|               4.84|                             9|               0|             1.56|
|756752556396798800|     50.0|

In [0]:
# Apply log transformation for skewed data
skew_col = ["avg_price", "review_scores_communication", "review_scores_location", "review_scores_value", "reviews_per_month"]
for col_name in skew_col:
    merged_data = merged_data.withColumn(col_name, log1p(col(col_name)))

In [0]:
# List to store columns with invalid values
invalid_columns = []

# Iterate through columns and check for invalid values (zero or negative)
for column in merged_data.columns:
    if merged_data.filter(col(column) <= 0).count() > 0:
        invalid_columns.append(column)

# Print the list of columns with invalid values
print("Columns with invalid values (zero or negative):", invalid_columns)




Columns with invalid values (zero or negative): ['availability_rate', 'instant_bookable', 'reviews_per_month']


In [0]:
for column in invalid_columns:
    merged_data = merged_data.withColumn(column, when(col(column) <= 0, col(column) + 1).otherwise(col(column)))

# Show the updated data
merged_data.show()

+------------------+------------------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-------------------+
|        listing_id|         avg_price|  availability_rate|price|review_scores_communication|review_scores_location|review_scores_value|calculated_host_listings_count|instant_bookable|  reviews_per_month|
+------------------+------------------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-------------------+
|          26139592| 4.709530201312334| 0.9342465753424658| 98.0|         1.7884205679625405|    1.7850704810772584| 1.7578579175523736|                             1|               1| 0.9516578757114464|
|          54369238| 4.836281906951478|0.40273972602739727| 99.0|          1.791759469228055|    1.7509374747077997| 1.7647307968401358|                             9|             

In [0]:
merged_data.show()

+------------------+------------------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-------------------+
|        listing_id|         avg_price|  availability_rate|price|review_scores_communication|review_scores_location|review_scores_value|calculated_host_listings_count|instant_bookable|  reviews_per_month|
+------------------+------------------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-------------------+
|          26139592| 4.709530201312334| 0.9342465753424658| 98.0|         1.7884205679625405|    1.7850704810772584| 1.7578579175523736|                             1|               1| 0.9516578757114464|
|          54369238| 4.836281906951478|0.40273972602739727| 99.0|          1.791759469228055|    1.7509374747077997| 1.7647307968401358|                             9|             

In [0]:

# Label encoding categorical columns
categorical_cols = ['neighbourhood', 'room_type', 'property_type']
for col_name in categorical_cols:
    if col_name in merged_data.columns:  # Ensure the column exists
        indexer = StringIndexer(inputCol=str(col_name), outputCol=col_name + "_index", handleInvalid="keep")
        merged_data = indexer.fit(merged_data).transform(merged_data).drop(col_name)



In [0]:
%sh pip install textblob

Collecting textblob
  Obtaining dependency information for textblob from https://files.pythonhosted.org/packages/1e/d6/40aa5aead775582ea0cf35870e5a3f16fab4b967f1ad2debe675f673f923/textblob-0.19.0-py3-none-any.whl.metadata
  Downloading textblob-0.19.0-py3-none-any.whl.metadata (4.4 kB)
Collecting nltk>=3.9 (from textblob)
  Obtaining dependency information for nltk>=3.9 from https://files.pythonhosted.org/packages/4d/66/7d9e26593edda06e8cb531874633f7c2372279c3b0f46235539fe546df8b/nltk-3.9.1-py3-none-any.whl.metadata
  Downloading nltk-3.9.1-py3-none-any.whl.metadata (2.9 kB)
Collecting regex>=2021.8.3 (from nltk>=3.9->textblob)
  Obtaining dependency information for regex>=2021.8.3 from https://files.pythonhosted.org/packages/bf/ce/0d0e61429f603bac433910d99ef1a02ce45a8967ffbe3cbee48599e62d88/regex-2024.11.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading regex-2024.11.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (40 kB)
[?25l 


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [0]:
from textblob import TextBlob

In [0]:
def get_sentiment(text):
    if pd.isna(text):
        return 0
    return TextBlob(text).sentiment.polarity


In [0]:
@pandas_udf("double")
def sentiment_udf(comments: pd.Series) -> pd.Series:
    return comments.apply(get_sentiment)

In [0]:
reviews = reviews.withColumn("sentiment_score", sentiment_udf(col("comments")))
sentiment_df = reviews.groupBy("listing_id").agg(mean("sentiment_score").alias("sentiment_score"))

In [0]:
# Merge sentiment scores
merged_data = merged_data.join(sentiment_df, "listing_id", "left")
merged_data = merged_data.fillna({"sentiment_score": 0})


In [0]:
merged_data.show()

+------------------+------------------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-------------------+-------------------+
|        listing_id|         avg_price|  availability_rate|price|review_scores_communication|review_scores_location|review_scores_value|calculated_host_listings_count|instant_bookable|  reviews_per_month|    sentiment_score|
+------------------+------------------+-------------------+-----+---------------------------+----------------------+-------------------+------------------------------+----------------+-------------------+-------------------+
|          26139592| 4.709530201312334| 0.9342465753424658| 98.0|         1.7884205679625405|    1.7850704810772584| 1.7578579175523736|                             1|               1| 0.9516578757114464| 0.4591199415442383|
|          54369238| 4.836281906951478|0.40273972602739727| 99.0|          1.791759469228055|    1.7

In [0]:
merged_data=merged_data.drop("calculated_host_listings_count")

In [0]:
from pyspark.sql.functions import col, sum

merged_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_data.columns]).show()


+----------+---------+-----------------+-----+---------------------------+----------------------+-------------------+----------------+-----------------+---------------+
|listing_id|avg_price|availability_rate|price|review_scores_communication|review_scores_location|review_scores_value|instant_bookable|reviews_per_month|sentiment_score|
+----------+---------+-----------------+-----+---------------------------+----------------------+-------------------+----------------+-----------------+---------------+
|         0|        0|                0|    0|                          0|                     0|                  0|               0|                0|              0|
+----------+---------+-----------------+-----+---------------------------+----------------------+-------------------+----------------+-----------------+---------------+



In [0]:
merged_data.printSchema()


root
 |-- listing_id: long (nullable = true)
 |-- avg_price: double (nullable = true)
 |-- availability_rate: double (nullable = true)
 |-- price: double (nullable = false)
 |-- review_scores_communication: double (nullable = true)
 |-- review_scores_location: double (nullable = true)
 |-- review_scores_value: double (nullable = true)
 |-- instant_bookable: integer (nullable = false)
 |-- reviews_per_month: double (nullable = true)
 |-- sentiment_score: double (nullable = false)



**_Loading Data _**

In [0]:
# Define output path in Azure Blob Storage
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/processed_data/"


merged_data.write.mode("overwrite").parquet(output_path)

print("Data Loaded successfully")


Data Loaded successfully


In [0]:
!pip install xgboost

Collecting xgboost
  Obtaining dependency information for xgboost from https://files.pythonhosted.org/packages/e4/3c/e3a93bfa7e8693c825df5ec02a40f7ff5f0950e02198b1e85da9315a8d47/xgboost-2.1.4-py3-none-manylinux_2_28_x86_64.whl.metadata
  Downloading xgboost-2.1.4-py3-none-manylinux_2_28_x86_64.whl.metadata (2.1 kB)
Collecting nvidia-nccl-cu12 (from xgboost)
  Obtaining dependency information for nvidia-nccl-cu12 from https://files.pythonhosted.org/packages/11/0c/8c78b7603f4e685624a3ea944940f1e75f36d71bd6504330511f4a0e1557/nvidia_nccl_cu12-2.25.1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata
  Downloading nvidia_nccl_cu12-2.25.1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.8 kB)
Downloading xgboost-2.1.4-py3-none-manylinux_2_28_x86_64.whl (223.6 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/223.6 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/223.6 MB[0m [31m1.

In [0]:
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion
from pyspark.ml.evaluation import RegressionEvaluator
from xgboost import XGBRegressor
from sklearn.svm import SVR
from sklearn.linear_model import LinearRegression
import numpy as np
import pandas as pd



In [0]:
df=spark.read.parquet(output_path)

In [0]:
x = [
    "avg_price", "availability_rate", "review_scores_communication", 
    "review_scores_location", "review_scores_value", "reviews_per_month", 
    "sentiment_score", "instant_bookable"
]

y = "price"

In [0]:
# Split data into train and test sets
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

In [0]:
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion, StandardScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [0]:
vector_assembler = VectorAssembler(inputCols=x, outputCol="assembled_features")

In [0]:
# Step 1: Fine-Tune Polynomial Degree
print("Finding Best Polynomial Degree...")
degree_grid = [2, 3, 4]
best_r2 = float("-inf")
best_degree = None

for degree in degree_grid:
    polyExpansion = PolynomialExpansion(inputCol="assembled_features", outputCol="poly_features", degree=degree)
    scaler = StandardScaler(inputCol="poly_features", outputCol="scaled_features")
    
    pipeline = Pipeline(stages=[vector_assembler, polyExpansion, scaler, LinearRegression(featuresCol="scaled_features", labelCol=y)])
    paramGrid = ParamGridBuilder().addGrid(polyExpansion.degree, [degree]).build()
    
    evaluator = RegressionEvaluator(labelCol=y, predictionCol="prediction", metricName="r2")
    crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
    
    cvModel = crossval.fit(train_data)
    r2 = evaluator.evaluate(cvModel.transform(test_data))
    
    print(f"Polynomial Degree {degree}, R²: {r2}")
    if r2 > best_r2:
        best_r2 = r2
        best_degree = degree

print(f"Best Polynomial Degree: {best_degree} with R²: {best_r2}")


Finding Best Polynomial Degree...
Polynomial Degree 2, R²: 0.38897361159898813
Polynomial Degree 3, R²: 0.5601576807154383
Polynomial Degree 4, R²: 0.7234357064288208
Best Polynomial Degree: 4 with R²: 0.7234357064288208


In [0]:
print("Tuning Model Hyperparameters...")
polyExpansion = PolynomialExpansion(inputCol="assembled_features", outputCol="poly_features", degree=best_degree)
scaler = StandardScaler(inputCol="poly_features", outputCol="scaled_features")

# Model Selection
models = {
    "Linear Regression": LinearRegression(featuresCol="scaled_features", labelCol=y),
    "Lasso Regression": LinearRegression(featuresCol="scaled_features", labelCol=y, elasticNetParam=1.0),
    "Ridge Regression": LinearRegression(featuresCol="scaled_features", labelCol=y, elasticNetParam=0.0),
    "Random Forest": RandomForestRegressor(featuresCol="scaled_features", labelCol=y),
    "XGBoost": SparkXGBRegressor(features_col="scaled_features", label_col=y)
}

# Grid Search
param_grid = {
    "alpha": [0.1, 0.01, 0.2],
    "maxIter": [100, 200, 300],
    "numTrees": [20, 50],
    "maxDepth": [5, 10],
    "learning_rate": [0.1, 0.3]
}

best_model_name = None
best_model_r2 = float("-inf")

for model_name, model in models.items():
    print(f"Tuning {model_name}...")
    
    pipeline = Pipeline(stages=[vector_assembler, polyExpansion, scaler, model])
    
    paramGrid = ParamGridBuilder()
    if model_name in ["Linear Regression", "Lasso Regression", "Ridge Regression"]:
        paramGrid.addGrid(model.regParam, param_grid["alpha"]).addGrid(model.maxIter, param_grid["maxIter"])
    elif model_name == "Random Forest":
        paramGrid.addGrid(model.numTrees, param_grid["numTrees"]).addGrid(model.maxDepth, param_grid["maxDepth"])
    elif model_name == "XGBoost":
        paramGrid.addGrid(model.max_depth, param_grid["maxDepth"]).addGrid(model.learning_rate, param_grid["learning_rate"])
    paramGrid = paramGrid.build()
    
    crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
    cvModel = crossval.fit(train_data)
    r2 = evaluator.evaluate(cvModel.transform(test_data))
    
    print(f"Model: {model_name}, R²: {r2}")
    
    if r2 > best_model_r2:
        best_model_r2 = r2
        best_model_name = model_name

print(f"Best Model: {best_model_name} with R²: {best_model_r2}, using Polynomial Degree: {best_degree}")


Tuning Model Hyperparameters...
Tuning Linear Regression...
Model: Linear Regression, R²: 0.7073333096015096
Tuning Lasso Regression...
Model: Lasso Regression, R²: 0.5797365752853769
Tuning Ridge Regression...
Model: Ridge Regression, R²: 0.7073333096015096
Tuning Random Forest...
Model: Random Forest, R²: 0.550509780401078
Tuning XGBoost...


INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 5, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.3, 'max_depth': 5, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 10, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing':

Model: XGBoost, R²: 0.7151688420809756
Best Model: XGBoost with R²: 0.7151688420809756, using Polynomial Degree: 4


In [0]:
best_xgb_model = cvModel.bestModel.stages[-1]  # Last stage is XGBoost

# Extract best hyperparameters
best_params = {
    "max_depth": best_xgb_model.getOrDefault("max_depth"),
    "learning_rate": best_xgb_model.getOrDefault("learning_rate")
    
}

print(f"Best XGBoost Parameters: {best_params}")


Best XGBoost Parameters: {'max_depth': 5, 'learning_rate': 0.3}


In [0]:
xg_model = SparkXGBRegressor(features_col="scaled_features", label_col= y , max_depth=5, learning_rate=0.3)

In [0]:
best_pipeline = Pipeline(stages=[vector_assembler, polyExpansion, scaler, xg_model])

In [0]:
best_model_trained = best_pipeline.fit(merged_data)

INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'learning_rate': 0.3, 'max_depth': 5, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


In [0]:
import xgboost as xgb

# Extract the trained XGBoost model from the PySpark pipeline
xgb_model = best_model_trained.stages[-1]  # Get the last stage (XGBoost model)

# Get the native XGBoost booster object
native_xgb = xgb_model.get_booster()

# Save in XGBoost’s native format
native_xgb.save_model("xgboost_model.json")  

print("XGBoost model successfully saved in native format!")


XGBoost model successfully saved in native format!
