### 1. Chuẩn bị dữ liệu

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

spark = (
    SparkSession.builder
    .appName("RegressionModels")
    .getOrCreate()
)

25/10/17 13:46:37 WARN Utils: Your hostname, soaz resolves to a loopback address: 127.0.1.1; using 192.168.100.246 instead (on interface wlp0s20f3)
25/10/17 13:46:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/17 13:46:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.functions import col

df = spark.read.csv('../../data/cleaned_data/data.csv', inferSchema= True, header= True)

print("📊 Dữ liệu ban đầu:")
df.show(5, truncate=False)
df.printSchema()


📊 Dữ liệu ban đầu:
+--------+---------------------+------------+------+---+----+----+------+
|image_id|street               |citi        |n_citi|bed|bath|sqft|price |
+--------+---------------------+------------+------+---+----+----+------+
|1       |124 C Street W       |Brawley, CA |48    |3  |2   |713 |228500|
|2       |2304 Clark Road      |Imperial, CA|152   |3  |1   |800 |273950|
|3       |755 Brawley Avenue   |Brawley, CA |48    |3  |1   |1082|350000|
|4       |2207 R Carrillo Court|Calexico, CA|55    |4  |3   |2547|385100|
|6       |1100 CAMILIA Street  |Calexico, CA|55    |4  |3   |2769|415000|
+--------+---------------------+------------+------+---+----+----+------+
only showing top 5 rows

root
 |-- image_id: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- citi: string (nullable = true)
 |-- n_citi: integer (nullable = true)
 |-- bed: integer (nullable = true)
 |-- bath: integer (nullable = true)
 |-- sqft: integer (nullable = true)
 |-- price: integer (

### 2. Tiền xử lý dữ liệu và chuẩn bị features

In [4]:
from pyspark.sql.functions import regexp_replace

df = df.withColumn("street", regexp_replace("street", "^[^ ]+ ", ""))

In [5]:
df.groupBy("street").count().orderBy(col("count").desc())


DataFrame[street: string, count: bigint]

In [6]:
from pyspark.sql.functions import countDistinct

df.select(countDistinct("street")).show()

+----------------------+
|count(DISTINCT street)|
+----------------------+
|                 10378|
+----------------------+



In [7]:

# B1: Tính tần suất xuất hiện của mỗi street
street_freq = df.groupBy("street").count().withColumnRenamed("count", "street_frequency")

# B2: Join trở lại với DataFrame gốc
df = df.join(street_freq, on="street", how="left")

# B3: Thay cột street bằng tần suất
df = df.withColumn("street", col("street_frequency")).drop("street_frequency")

In [8]:
df.groupBy("citi").count().orderBy(col("count").desc())

DataFrame[citi: string, count: bigint]

In [9]:
df.select(countDistinct("citi")).show()

+--------------------+
|count(DISTINCT citi)|
+--------------------+
|                 415|
+--------------------+



In [10]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import when, col

def spark_get_dummies(df: DataFrame, column: str) -> DataFrame:
    unique_values = [row[0] for row in df.select(column).distinct().collect()]
    for val in unique_values:
        df = df.withColumn(f"{column}_{val}", when(col(column) == val, 1).otherwise(0))
    return df.drop(column)


In [11]:
df = spark_get_dummies(df, "citi")


In [None]:
cols = [c for c in df.columns if c != "price"] + ["price"]
df = df.select(*cols)  
                           

In [13]:
df.show(5, truncate=False)
df.printSchema()

25/10/17 13:47:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------+--------+------+---+----+----+---------------+---------------+----------------+----------------+----------------------+------------------------+--------------+---------------------+--------------------+-----------------+---------------------+-----------------+---------------------+--------------------+------------------+-----------------------+-------------------+------------------------+-------------------+------------------+---------------------+------------------------+----------------+---------------+----------------+-------------------+-------------------+--------------------+------------------------+-------------------+--------------+----------------+------------------+---------------------+-------------------+-----------------+------------------+-----------------+----------------+--------------------+---------------+-----------------+----------------------+-------------------+---------------------+-------------------------+----------------+-----------------------------+-

In [None]:
from pyspark.ml.feature import VectorAssembler

# Giả sử df là Spark DataFrame
columns = df.columns
feature_cols = columns[:-1]  # tất cả cột trừ cột cuối
label_col = columns[-1]      # cột cuối cùng là nhãn

# Gộp các cột đặc trưng thành một vector duy nhất
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_transformed = assembler.transform(df)

df_transformed.show(5, truncate=False)


+---------------------------------------------------------+------+
|features                                                 |price |
+---------------------------------------------------------+------+
|(421,[0,1,2,3,4,5,116],[1.0,1.0,48.0,3.0,2.0,713.0,1.0]) |228500|
|(421,[0,1,2,3,4,5,362],[1.0,2.0,152.0,3.0,1.0,800.0,1.0])|273950|
|(421,[0,1,2,3,4,5,116],[1.0,3.0,48.0,3.0,1.0,1082.0,1.0])|350000|
|(421,[0,1,2,3,4,5,129],[1.0,4.0,55.0,4.0,3.0,2547.0,1.0])|385100|
|(421,[0,1,2,3,4,5,129],[1.0,6.0,55.0,4.0,3.0,2769.0,1.0])|415000|
+---------------------------------------------------------+------+
only showing top 5 rows



In [28]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from xgboost.spark import SparkXGBRegressor
from pyspark.sql import SparkSession, functions as F

# --- 2. Chia train / valid / test ---
train_data, valid_data, test_data = df_transformed.randomSplit([0.7, 0.15, 0.15], seed=42)

# --- 3. Hàm đánh giá ---
def evaluate(predictions, label_col):
    evaluator_rmse = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
    evaluator_r2 = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="r2")
    rmse = evaluator_rmse.evaluate(predictions)
    r2 = evaluator_r2.evaluate(predictions)
    print(f"✅ RMSE = {rmse:.4f} | R² = {r2:.4f}")
    return rmse, r2

In [32]:
print("\n🚀 Training Linear Regression ...")
lr = LinearRegression(featuresCol="features", labelCol=label_col,regParam=0.5, 
    elasticNetParam=0.8, maxIter=500)
lr_model = lr.fit(train_data) 
pred_lr = lr_model.transform(test_data)
evaluate(pred_lr, label_col)



🚀 Training Linear Regression ...
✅ RMSE = 209060.2386 | R² = 0.7092


(209060.23864855382, 0.7092113160232457)

In [17]:
print("\n🚀 Training Decision Tree ...")
dt = DecisionTreeRegressor(featuresCol="features", labelCol=label_col, maxDepth=8)
dt_model = dt.fit(train_data)
pred_dt = dt_model.transform(valid_data)
evaluate(pred_dt, label_col)



🚀 Training Decision Tree ...
✅ RMSE = 202760.3626 | R² = 0.7259


(202760.3625626809, 0.7259415832248424)

In [None]:
print("\n🚀 Training Random Forest ...")
rf = RandomForestRegressor(featuresCol="features", labelCol=label_col, numTrees=200, maxDepth=10)
rf_model = rf.fit(train_data)
pred_rf = rf_model.transform(valid_data)
evaluate(pred_rf, label_col)



🚀 Training Random Forest ...


25/10/17 14:50:35 WARN DAGScheduler: Broadcasting large task binary with size 1194.4 KiB
25/10/17 14:50:38 WARN DAGScheduler: Broadcasting large task binary with size 2013.1 KiB
25/10/17 14:50:42 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
25/10/17 14:50:49 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
25/10/17 14:50:56 WARN DAGScheduler: Broadcasting large task binary with size 10.0 MiB
25/10/17 14:51:03 WARN DAGScheduler: Broadcasting large task binary with size 1066.1 KiB
25/10/17 14:51:05 WARN DAGScheduler: Broadcasting large task binary with size 15.7 MiB
25/10/17 14:51:12 WARN DAGScheduler: Broadcasting large task binary with size 1619.5 KiB
25/10/17 14:51:14 WARN DAGScheduler: Broadcasting large task binary with size 23.5 MiB
25/10/17 14:51:22 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/10/17 14:51:24 WARN DAGScheduler: Broadcasting large task binary with size 33.6 MiB
25/10/17 14:51:35 WARN DAGScheduler: B

✅ RMSE = 216525.1084 | R² = 0.6875


(216525.1083640161, 0.6874686733394906)

In [None]:
from xgboost.spark import SparkXGBRegressor

xgb = SparkXGBRegressor(
    features_col="features",
    label_col=label_col,
    objective="reg:squarederror",
    eval_metric="rmse",
    num_workers=1,
    num_boost_round=200,
    max_depth=6,
    eta=0.1
)
xgb_model = xgb.fit(train_data)
pred_xgb = xgb_model.transform(valid_data)   
evaluate(pred_xgb, label_col)       



2025-10-17 14:53:00,992 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'max_depth': 6, 'eta': 0.1, 'eval_metric': 'rmse', 'nthread': 1}
	train_call_kwargs_params: {'num_boost_round': 100, 'verbose_eval': True}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[14:53:04] task 0 got new rank 0                                    (0 + 1) / 1]
2025-10-17 14:53:06,014 INFO XGBoost-PySpark: _fit Finished xgboost training!   
INFO:XGBoost-PySpark:Do the inference on the CPUs


✅ RMSE = 121334.2835 | R² = 0.9019


INFO:XGBoost-PySpark:Do the inference on the CPUs


(121334.28352576273, 0.9018604445907641)

In [35]:
pred_xgb = xgb_model.transform(test_data)
evaluate(pred_xgb, label_col)


INFO:XGBoost-PySpark:Do the inference on the CPUs


✅ RMSE = 118313.9392 | R² = 0.9069


INFO:XGBoost-PySpark:Do the inference on the CPUs


(118313.93920201344, 0.9068663879489318)