In [1]:
import findspark
# Initialise findspark with the directory of the local Spark installation.
# NOTE(review): this path is machine-specific — adjust it when running elsewhere.
findspark.init(spark_home="C:/spark/spark-3.5.1-bin-hadoop3")

# Spark Context & Spark Session

In [2]:
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from datetime import datetime

from pyspark.sql import types
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import OneHotEncoder, StringIndexer

from pyspark.sql.functions import *

In [3]:
# A few things we need to do before Spark can accept the data!
# It needs to be in the form of two columns
# ("label","features")

# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [4]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Set a Spark system property; this runs before the SparkContext is created in the next cell.
# NOTE(review): presumably makes the HDFS client address datanodes by hostname —
# confirm it is still required, since the data is later read from a local CSV file.
SparkContext.setSystemProperty('spark.hadoop.dfs.client.use.datanode.hostname', 'true')

In [5]:
# Create the local SparkContext, or reuse the one that is already running.
# Calling the SparkContext constructor a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once", so re-executing this cell used
# to fail; getOrCreate keeps the cell re-runnable with the same master/app name.
from pyspark import SparkConf

sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("New Spark Context")
)

In [6]:
# Bare expression: the notebook displays `sc` as this cell's output.
sc

In [7]:
from pyspark.sql import SparkSession

# Obtain the SparkSession used for the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("motorbike_price_prediction")
    .getOrCreate()
)

# Read the CSV file (change the path so it points at your copy of the data).
path = r"D:/DATA_SCIENCE_KHTN/mon_7/project_1/motorbike_cleaned.csv"
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(path)
)

# Inspect the inferred column types and the first five rows.
data.printSchema()
data.show(n=5)

root
 |-- title: string (nullable = true)
 |-- price: double (nullable = true)
 |-- price_min: double (nullable = true)
 |-- price_max: double (nullable = true)
 |-- location: string (nullable = true)
 |-- description: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- category: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- warranty: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- href: string (nullable = true)
 |-- years_used: string (nullable = true)

+--------------------+---------+---------+--------------------+--------------------+--------------------+-------+---------+------+-------+----------+--------+---------------+-------------+-------------+-------+--------------------+----------+
|               title|    price|price_min

In [8]:
from pyspark.sql.functions import col, when

# Add two columns in one chained transformation:
#   mid_price   - midpoint of [price_min, price_max]
#   price_fixed - the listed price, replaced by mid_price when it falls
#                 below price_min or above price_max
data = (
    data
    .withColumn("mid_price", (col("price_min") + col("price_max")) / 2)
    .withColumn(
        "price_fixed",
        when(
            (col("price") < col("price_min")) | (col("price") > col("price_max")),
            col("mid_price"),
        ).otherwise(col("price")),
    )
)

# Check the result.
data.select(
    ["brand", "model", "price_min", "price", "price_max", "price_fixed"]
).show(n=10, truncate=False)

+--------+---------+---------+---------+--------------------+-----------+
|brand   |model    |price_min|price    |price_max           |price_fixed|
+--------+---------+---------+---------+--------------------+-----------+
|Honda   |Vision   |2.8E7    |3.7E7    |3.286E7             |3.043E7    |
|Piaggio |Vespa    |4.31E7   |4.5E7    |5.06E7              |4.5E7      |
|Yamaha  |Latte    |1.702E7  |2.3E7    |1.998E7             |1.85E7     |
|SYM     |Elegant  |7730000.0|7500000.0|9080000.0           |8405000.0  |
|Honda   |Air Blade|1.383E7  |1.6E7    |1.6239999999999998E7|1.6E7      |
|Honda   |Wave     |1.008E7  |7300000.0|1.183E7             |1.0955E7   |
|Yamaha  |Sirius   |6970000.0|7500000.0|8189999.999999999   |7500000.0  |
|Honda   |Lead     |2.745E7  |3.3E7    |3.222E7             |2.9835E7   |
|Kawasaki|D√≤ng kh√°c|2.208E7  |1.99E7   |2.592E7             |2.4E7      |
|SYM     |Wolf     |1.362E7  |1.48E7   |1.598E7             |1.48E7     |
+--------+---------+---------+------

In [9]:
# Overwrite `price` itself: a listed price that is not inside
# [price_min, price_max] is replaced by the midpoint of that range;
# every other row keeps its original price.
data = data.withColumn(
    "price",
    when(
        ~col("price").between(col("price_min"), col("price_max")),
        (col("price_min") + col("price_max")) / 2,
    ).otherwise(col("price")),
)

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, LongType, FloatType, ShortType, DecimalType, StringType

# Spark SQL types that count as numeric for the split below.
numeric_types = (DoubleType, IntegerType, LongType, FloatType, ShortType, DecimalType)

# Walk the schema once and sort every field name into the matching bucket.
numeric_cols = []
string_cols = []
for field in data.schema.fields:
    if isinstance(field.dataType, numeric_types):
        numeric_cols.append(field.name)
    if isinstance(field.dataType, StringType):
        string_cols.append(field.name)

print("Numeric cols:", numeric_cols)
print("String  cols:", string_cols)

Numeric cols: ['price', 'price_min', 'price_max', 'mid_price', 'price_fixed']
String  cols: ['title', 'location', 'description', 'brand', 'model', 'year', 'mileage', 'condition', 'category', 'engine_capacity', 'origin', 'warranty', 'weight', 'href', 'years_used']


In [11]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.
(
    data
    .select(numeric_cols)
    .describe()
    .show(truncate=False)
)

+-------+------------------+--------------------+--------------------+--------------------+------------------+
|summary|price             |price_min           |price_max           |mid_price           |price_fixed       |
+-------+------------------+--------------------+--------------------+--------------------+------------------+
|count  |5807              |5807                |5807                |5807                |5807              |
|mean   |1.85821084422249E7|1.707582917168934E7 |2.005503271913208E7 |1.856543094541071E7 |1.85821084422249E7|
|stddev |1.30708173654532E7|1.1993024759570112E7|1.4090642381696567E7|1.3038490015451137E7|1.30708173654532E7|
|min    |1335000.0         |1230000.0           |1440000.0           |1335000.0           |1335000.0         |
|max    |6.8E7             |5.873E7             |6.895E7             |6.384E7             |6.8E7             |
+-------+------------------+--------------------+--------------------+--------------------+------------------+



In [12]:
from pyspark.ml.feature import StringIndexer

cat_cols = ["model", "category"]

# 1) Collect *_idx columns left over from a previous run of this cell and
#    drop them, so that re-running the cell does not fail.
to_drop = []
for c in cat_cols:
    stale = f"{c}_idx"
    if stale in data.columns:
        to_drop.append(stale)
if to_drop:
    data = data.drop(*to_drop)

# 2) Index the columns one after another (a plain loop, no Pipeline).
for c in cat_cols:
    out_col = f"{c}_idx"
    # Extra safety: skip a column whose index already exists.
    if out_col in data.columns:
        continue
    indexer = StringIndexer(handleInvalid="keep", inputCol=c, outputCol=out_col)
    fitted_indexer = indexer.fit(data)
    data = fitted_indexer.transform(data)

data.select(
    ["model", "model_idx", "category", "category_idx"]
).show(n=5, truncate=False)


+---------+---------+--------+------------+
|model    |model_idx|category|category_idx|
+---------+---------+--------+------------+
|Vision   |4.0      |Tay ga  |0.0         |
|Vespa    |10.0     |Tay ga  |0.0         |
|Latte    |135.0    |Tay ga  |0.0         |
|Elegant  |46.0     |Xe s·ªë   |1.0         |
|Air Blade|1.0      |Tay ga  |0.0         |
+---------+---------+--------+------------+
only showing top 5 rows



In [13]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode every indexed categorical column: <col>_idx -> <col>_vec.
encoder = OneHotEncoder(
    dropLast=True,
    inputCols=[c + "_idx" for c in cat_cols],
    outputCols=[c + "_vec" for c in cat_cols],
)

# Fit on the indexed frame, then apply the fitted model to the same frame.
encoder_model = encoder.fit(data)
data_encoded = encoder_model.transform(data)

In [14]:
# Show each categorical column next to its index and its one-hot vector.
preview_cols = [
    "model", "model_idx", "model_vec",
    "category", "category_idx", "category_vec",
]
data_encoded.select(*preview_cols).show(n=5, truncate=False)


+---------+---------+-----------------+--------+------------+-------------+
|model    |model_idx|model_vec        |category|category_idx|category_vec |
+---------+---------+-----------------+--------+------------+-------------+
|Vision   |4.0      |(162,[4],[1.0])  |Tay ga  |0.0         |(7,[0],[1.0])|
|Vespa    |10.0     |(162,[10],[1.0]) |Tay ga  |0.0         |(7,[0],[1.0])|
|Latte    |135.0    |(162,[135],[1.0])|Tay ga  |0.0         |(7,[0],[1.0])|
|Elegant  |46.0     |(162,[46],[1.0]) |Xe s·ªë   |1.0         |(7,[1],[1.0])|
|Air Blade|1.0      |(162,[1],[1.0])  |Tay ga  |0.0         |(7,[0],[1.0])|
+---------+---------+-----------------+--------+------------+-------------+
only showing top 5 rows



In [15]:
from pyspark.sql import functions as F

# Print the schema first to see the current state.
print("Schema BEFORE:")
data_encoded.printSchema()

# Force both columns to double: strip every character that is not a digit,
# a dot or a minus sign, then cast what remains.
for name in ("mileage", "years_used"):
    cleaned = F.regexp_replace(F.col(name).cast("string"), r"[^0-9\.\-]", "")
    data_encoded = data_encoded.withColumn(name, cleaned.cast("double"))

# (optional) Drop rows where either of the two columns is still null.
data_encoded = data_encoded.na.drop(subset=["mileage", "years_used"])

print("\nSchema AFTER:")
data_encoded.printSchema()
data_encoded.select(["mileage", "years_used"]).show(n=5)


Schema BEFORE:
root
 |-- title: string (nullable = true)
 |-- price: double (nullable = true)
 |-- price_min: double (nullable = true)
 |-- price_max: double (nullable = true)
 |-- location: string (nullable = true)
 |-- description: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- category: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- warranty: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- href: string (nullable = true)
 |-- years_used: string (nullable = true)
 |-- mid_price: double (nullable = true)
 |-- price_fixed: double (nullable = true)
 |-- model_idx: double (nullable = false)
 |-- category_idx: double (nullable = false)
 |-- model_vec: vector (nullable = true)
 |-- category_vec: vector (nullable = true)


Schema AFTER

In [16]:
from pyspark.ml.feature import VectorAssembler

# Combine the two numeric columns and the two one-hot vectors into a single
# `features` vector column (the model inputs).
feature_inputs = ["mileage", "years_used", "model_vec", "category_vec"]
assembler = VectorAssembler(outputCol="features", inputCols=feature_inputs)

data_pre = assembler.transform(data_encoded)

# Peek at the assembled vectors, then at the full rows.
data_pre.select(["features"]).show(n=2, truncate=False)
data_pre.show(n=2)


+----------------------------------------+
|features                                |
+----------------------------------------+
|(171,[0,1,6,164],[12000.0,2.0,1.0,1.0]) |
|(171,[0,1,12,164],[60000.0,6.0,1.0,1.0])|
+----------------------------------------+
only showing top 2 rows

+--------------------+-------+---------+---------+--------------------+--------------------+-------+------+------+-------+----------+--------+---------------+-------------+-------------+-------+--------------------+----------+---------+-----------+---------+------------+----------------+-------------+--------------------+
|               title|  price|price_min|price_max|            location|         description|  brand| model|  year|mileage| condition|category|engine_capacity|       origin|     warranty| weight|                href|years_used|mid_price|price_fixed|model_idx|category_idx|       model_vec| category_vec|            features|
+--------------------+-------+---------+---------+-------------------

In [17]:
from pyspark.sql import functions as F

# Keep only the feature vector and the target column.
final_data = data_pre.select("features", "price")

# 70/30 train/test split with a fixed seed.
splits = final_data.randomSplit([0.7, 0.3], seed=42)
train_data, test_data = splits

# Summary statistics of each split.
for subset in (train_data, test_data):
    subset.describe().show()

+-------+--------------------+
|summary|               price|
+-------+--------------------+
|  count|                4148|
|   mean|1.8319493541465767E7|
| stddev| 1.276077357535034E7|
|    min|           2000000.0|
|    max|               6.7E7|
+-------+--------------------+

+-------+--------------------+
|summary|               price|
+-------+--------------------+
|  count|                1654|
|   mean|1.9261725824667472E7|
| stddev|1.3800570962255662E7|
|    min|           1335000.0|
|    max|               6.8E7|
+-------+--------------------+



In [18]:
from pyspark.ml.regression import GBTRegressor

# Hyper-parameters of the gradient-boosted tree regressor.
gbt_params = {
    "featuresCol": "features",
    "labelCol": "price",
    "predictionCol": "Predict_price",
    "maxIter": 150,          # 100–300 usually works well
    "maxDepth": 7,           # 5–9 depending on the data
    "stepSize": 0.1,         # learning rate
    "subsamplingRate": 1.0,
    "seed": 42,
}
gbt = GBTRegressor(**gbt_params)

In [19]:
# Fit the GBT regressor on the training split.
gbt_model = gbt.fit(train_data)
# Printed once fit() has returned.
print("‚úÖ GBT fitted")

‚úÖ GBT fitted


In [20]:
from pyspark.sql import functions as F

# Predict on the test split with the fitted GBT model.
gbt_pred = gbt_model.transform(test_data)

# residual   = actual - predicted
# pct_diff_% = (predicted - actual) / actual * 100, null when the actual price is 0
# Note: the two columns use opposite sign conventions.
gbt_pred = gbt_pred.withColumn("residual", F.col("price") - F.col("Predict_price"))
pct_diff = (F.col("Predict_price") - F.col("price")) / F.col("price") * 100
gbt_pred = gbt_pred.withColumn(
    "pct_diff_%",
    F.when(F.col("price") != 0, pct_diff).otherwise(F.lit(None)),
)

# Rounded copies for readability: <column>_round, with the given number of decimals.
rounding_plan = [
    ("Predict_price", 0),
    ("price", 0),
    ("residual", 0),
    ("pct_diff_%", 2),
]
for source_col, digits in rounding_plan:
    gbt_pred = gbt_pred.withColumn(f"{source_col}_round", F.round(source_col, digits))

# Display the result.
gbt_pred.select(
    ["Predict_price_round", "price_round", "residual_round", "pct_diff_%_round"]
).show(n=10, truncate=False)

+-------------------+-----------+--------------+----------------+
|Predict_price_round|price_round|residual_round|pct_diff_%_round|
+-------------------+-----------+--------------+----------------+
|7925322.0          |6585000.0  |-1340322.0    |20.35           |
|1.0719397E7        |1.13E7     |580603.0      |-5.14           |
|7389920.0          |7370000.0  |-19920.0      |0.27            |
|7389920.0          |7370000.0  |-19920.0      |0.27            |
|1.0716023E7        |1.0955E7   |238977.0      |-2.18           |
|9618645.0          |1.037E7    |751355.0      |-7.25           |
|1.1122744E7        |1.113E7    |7256.0        |-0.07           |
|7694862.0          |6455000.0  |-1239862.0    |19.21           |
|7837239.0          |7355000.0  |-482239.0     |6.56            |
|7694862.0          |6585000.0  |-1109862.0    |16.85           |
+-------------------+-----------+--------------+----------------+
only showing top 10 rows



In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

# One evaluator, pointed at each metric in turn, scored on the test predictions.
ev = RegressionEvaluator(predictionCol="Predict_price", labelCol="price")
for metric in ("rmse", "mse", "r2"):
    val = ev.setMetricName(metric).evaluate(gbt_pred)
    print(f"GBT {metric.upper()}: {val}")

GBT RMSE: 4560312.803944209
GBT MSE: 20796452869817.49
GBT R2: 0.8907408522171143
