# **Regression with Decision Trees**

In [1]:
# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/baobu/spark-3.5.1-bin-hadoop3"

## Create a SparkSession in Python

In [2]:
# start pyspark
# !pip install findspark
import findspark
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local")\
          .appName("Decision Tree")\
          .config("spark.some.config.option", "some-value")\
          .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
25/05/10 18:14:36 WARN Utils: Your hostname, LAPTOP-2K5E710T resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/10 18:14:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/10 18:14:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Structured API Implementation (High-level)

### Import

In [4]:
from pyspark.sql import functions as f
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

### 1. Read data

In [5]:
data = spark.read.csv("nyc-taxi-trip-duration/train/train.csv", header=True, inferSchema=True)
data.show(5)

                                                                                

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

In [6]:
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)



### 2. Data preprocessing:

- Kiểm tra giá trị NULL

In [7]:
data.select([f.sum(f.col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()



+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+
| id|vendor_id|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|store_and_fwd_flag|trip_duration|
+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+
|  0|        0|              0|               0|              0|               0|              0|                0|               0|                 0|            0|
+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+



                                                                                

$\Rightarrow$ Tập data không tồn tại giá trị Null.

- Kiểm tra và xóa các dòng trùng lặp.

In [8]:
unique_rows = data.distinct().count()
if unique_rows == data.count():
    print("Không có dòng nào trùng lặp")
else:
    print(f"Có {data.count() - unique_rows} dòng trùng lặp")
    data = data.dropDuplicates()
    print(f"Số lượng dòng sau khi xóa trùng là: {data.count()}")



Không có dòng nào trùng lặp


                                                                                

- Chuyển các cột categorical thành cột số.
    - `pickup_datetime`: tách ra tháng, ngày trong tuần và giờ cụ thể (vì 3 mốc thời gian này đa số sẽ có 1 đặc trưng cụ thể).
    - `store_and_fwd_flag`: chuyển thành numeric bằng LabelEncoder.

In [9]:
data = data.withColumn("pickup_month", f.month("pickup_datetime"))\
            .withColumn("pickup_hour", f.hour("pickup_datetime"))\
            .withColumn("pickup_weekday", f.dayofweek("pickup_datetime"))


In [10]:
indexer = StringIndexer(inputCol="store_and_fwd_flag", outputCol="store_and_fwd_flag_index")
data = indexer.fit(data).transform(data)

                                                                                

- Phân phối các giá trị kết quả.

In [11]:
numeric_col = "trip_duration"
data.select(
    f.count(numeric_col).alias("count"),
    f.mean(numeric_col).alias("mean"),
    f.stddev(numeric_col).alias("stddev"),
    f.min(numeric_col).alias("min"),
    f.max(numeric_col).alias("max"),
    f.percentile_approx(numeric_col, 0.5).alias("median"),
    f.percentile_approx(numeric_col, 0.25).alias("25th_percentile"),
    f.percentile_approx(numeric_col, 0.75).alias("75th_percentile"),
    f.percentile_approx(numeric_col, 0.95).alias("95th_percentile")
).show()



+-------+-----------------+-----------------+---+-------+------+---------------+---------------+---------------+
|  count|             mean|           stddev|min|    max|median|25th_percentile|75th_percentile|95th_percentile|
+-------+-----------------+-----------------+---+-------+------+---------------+---------------+---------------+
|1458644|959.4922729603659|5237.431724497642|  1|3526282|   662|            397|           1075|           2105|
+-------+-----------------+-----------------+---+-------+------+---------------+---------------+---------------+



                                                                                

- Một vài giá trị trip_duration vô lý: 
    - $min = 1s$ (Vì ko thể có chuyến taxi nào chỉ 1s)
    - $max = 3,526,282s \approx 40 ngày$: không thể có chuyến taxi kéo dài hơn cả tháng.
- Ngoài ra, các các giá trị phân vị thứ 1 và thứ 3 cách rất xa giá trị min và max, chứng tỏ có nhiều giá trị cực thấp/cực thấp bất thường.

$\Rightarrow$ Ước lượng khoảng giá trị hợp lý cho 1 cuốc taxi: 60s (1 phút) đến 3,600s (1 giờ) $\rightarrow$ Loại bỏ các giá trị nằm ngoài khoảng.

In [12]:
print(f"Số lượng mẫu ban đầu: {data.count()}")

Số lượng mẫu ban đầu: 1458644


In [13]:
data = data.filter((data.trip_duration >= 60) & (data.trip_duration <= 3600))

In [14]:
print(f"Số lượng mẫu còn lại sau khi loại bỏ outliers: {data.count()}")



Số lượng mẫu còn lại sau khi loại bỏ outliers: 1437732


                                                                                

In [None]:
# Tính hệ số skewness (độ lệch)
skewness_value = data.select(f.skewness("trip_duration")).collect()[0][0]
print(f"Skewness coefficient: {skewness_value}")



Skewness coefficient: 1.564247922314853


                                                                                

Dữ liệu bị lệch phải (skewness > 0) $\Rightarrow$ Em dùng Log Tranformation để giảm outliers.

In [16]:
data = data.withColumn("trip_duration", f.log(f.col("trip_duration")))

In [None]:
skewness_value = data.select(f.skewness("trip_duration")).collect()[0][0]
print(f"Skewness coefficient: {skewness_value}")



Skewness coefficient: -0.22609500524188406


                                                                                

Dữ liệu hơi lệch trái (skewness < 0) nhưng vẫn tạm chấp nhận.

In [18]:
data.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+------------------+------------+-----------+--------------+------------------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|     trip_duration|pickup_month|pickup_hour|pickup_weekday|store_and_fwd_flag_index|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+------------------+------------+-----------+--------------+------------------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|  6.12029741895095|           3|         17|             

### 3. Vector Assembler.

In [19]:
input_cols = ["vendor_id","pickup_longitude", "pickup_latitude", "dropoff_longitude",
              "dropoff_latitude", "pickup_month", "pickup_hour", "pickup_weekday",
              "passenger_count", "store_and_fwd_flag_index"]

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
assembled_data = assembler.transform(data).select("features", "trip_duration")

### 4. Train-test split:

In [20]:
train, validation = assembled_data.randomSplit([0.8, 0.2], seed=42)

### 5. Model

In [None]:
dt = DecisionTreeRegressor(featuresCol="features", labelCol="trip_duration")

evaluator = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="rmse")

paramGrid = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [2, 3, 5, 10])\
    .addGrid(dt.impurity, ["variance"])\
    .build()

crossval = CrossValidator(estimator=dt,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           numFolds=3)  

cv_model = crossval.fit(train)

best_model = cv_model.bestModel
print(f"Best Model Max Depth: {best_model._java_obj.getMaxDepth()}")
print(f"Best Model Impurity: {best_model._java_obj.getImpurity()}")

predictions = best_model.transform(validation)


                                                                                

Best Model Max Depth: 10
Best Model Impurity: variance


In [22]:
# đánh giá mô hình
evaluator_rmse = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)
print(f"RMSE: {rmse}")
print(f"R2: {r2}")



RMSE: 0.5137227182074415
R2: 0.4829861483757828


                                                                                

Nhận xét:
- $RMSE = 0.513$: Dự đoán thường sai lệch khoảng $e^{0.513} \approx 1.67$ lần so với thực tế.
- $R^2 = 0.483$: mô hình chỉ giải thích được 48.3% sự biến động của `trip_duration`.

In [23]:
print(best_model.toDebugString)  # In cấu trúc cây
print("Feature Importances:", best_model.featureImportances)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_8525e495816b, depth=10, numNodes=2005, numFeatures=10
  If (feature 1 <= -73.93056869506836)
   If (feature 3 <= -73.92200469970703)
    If (feature 4 <= 40.72033500671387)
     If (feature 2 <= 40.736825942993164)
      If (feature 4 <= 40.687726974487305)
       If (feature 2 <= 40.70436477661133)
        If (feature 1 <= -74.00333786010742)
         If (feature 3 <= -73.98743057250977)
          If (feature 6 <= 16.5)
           If (feature 6 <= 1.5)
            Predict: 6.713786442230492
           Else (feature 6 > 1.5)
            Predict: 6.224312777689657
          Else (feature 6 > 16.5)
           If (feature 5 <= 3.5)
            Predict: 6.444607111530939
           Else (feature 5 > 3.5)
            Predict: 6.788493069687817
         Else (feature 3 > -73.98743057250977)
          If (feature 3 <= -73.9724235534668)
           If (feature 1 <= -74.00604248046875)
            Predict: 6.988423264649287
           Else 

Tên các feature:

| feature | name |
|---|---|
| 0 | vendor_id | 
| 1 | pickup_longitude |
| 2 | pickup_latitude | 
| 3 | dropoff_longitude | 
| 4 | dropoff_latitude | 
| 5 | pickup_month |
| 6 | pickup_hour | 
| 7 | pickup_weekday | 
| 8 | passenger_count | 
| 9 | store_and_fwd_flag_index|

- Các feature đặc trưng là: `pickup_longitude`, `pickup_latitude`, `dropoff_longitude`, `dropoff_latitude`

### 6. Dự đoán kết quả và in ra file cho tập test:

- Đọc tập test

In [24]:
test = spark.read.csv("nyc-taxi-trip-duration/test/test.csv", header=True, inferSchema=True)
test.show(5)

[Stage 247:>                                                        (0 + 1) / 1]

+---------+---------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+
|id3004672|        1|2016-06-30 23:59:58|              1|-73.98812866210938|40.73202896118164|-73.99017333984375| 40.75667953491211|                 N|
|id3505355|        1|2016-06-30 23:59:53|              1|-73.96420288085938|40.67999267578125|-73.95980834960938| 40.65540313720703|                 N|
|id1217141|        1|2016-06-30 23:59:47|              1| -73.9974365234375|40.73758316040039|-73.98616027832031|40.729522705078125|                 N|
|id2150126|        2|2016-06-30 23:59:41|              1|-73.95606994628906|40.771900177

                                                                                

- Biến đổi tập test để áp dụng mô hình.

In [None]:
test = test.withColumn("pickup_month", f.month("pickup_datetime"))\
            .withColumn("pickup_hour", f.hour("pickup_datetime"))\
            .withColumn("pickup_weekday", f.dayofweek("pickup_datetime"))

indexer = StringIndexer(inputCol="store_and_fwd_flag", outputCol="store_and_fwd_flag_index")
test = indexer.fit(test).transform(test)

input_cols = ["vendor_id","pickup_longitude", "pickup_latitude", "dropoff_longitude",
              "dropoff_latitude", "pickup_month", "pickup_hour", "pickup_weekday",
              "passenger_count", "store_and_fwd_flag_index"]

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
assembled_test = assembler.transform(test).select("id","features")


# Dự đoán trên tập test
predictions_test = best_model.transform(assembled_test)
predictions_test = predictions_test.withColumn("pred_trip_duration", f.exp("prediction"))


                                                                                

- In ra file output

In [26]:
predictions_test.show(5)

+---------+--------------------+------------------+------------------+
|       id|            features|        prediction|pred_trip_duration|
+---------+--------------------+------------------+------------------+
|id3004672|[1.0,-73.98812866...| 6.850411234136562| 944.2691425437013|
|id3505355|[1.0,-73.96420288...|6.2921268681627165| 540.3012574485131|
|id1217141|[1.0,-73.99743652...| 6.157933465798907|472.45072973278855|
|id2150126|[2.0,-73.95606994...|6.9822437341096295|1077.3329057794135|
|id1598245|[1.0,-73.97021484...| 6.315138111790941| 552.8784143467475|
+---------+--------------------+------------------+------------------+
only showing top 5 rows



In [27]:
predictions_test.select("id", "pred_trip_duration").write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("output/decision_tree_predictions_api.csv")


                                                                                