In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ml_taxi").getOrCreate()

23/02/09 19:02:51 WARN Utils: Your hostname, imhaneul-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.75 instead (on interface en0)
23/02/09 19:02:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/09 19:02:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import os

trip_file = f"{os.getcwd()}/data/trips"
trip_directory = spark.read.parquet(f"{trip_file}/*")
trip_directory.createOrReplaceTempView("trips")

                                                                                

In [3]:
trip_directory.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [4]:
qs = """
SELECT 
    YEAR(pickup_date) as year,
    count(*) as y
FROM 
    (
        SELECT 
            split(tpep_pickup_datetime, " ")[0] as pickup_date
        FROM 
            trips
    )
GROUP BY 
    year
ORDER BY 
    year
"""
spark.sql(qs).show()



+----+--------+
|year|       y|
+----+--------+
|2003|       2|
|2004|       1|
|2009|     137|
|2020|       1|
|2021|15000794|
|2029|       1|
+----+--------+



                                                                                

In [5]:
qs = """
SELECT 
    passenger_count, 
    tpep_pickup_datetime as pickup_datetime,
    trip_distance,
    total_amount
FROM 
    trips
WHERE 
    TO_DATE(tpep_pickup_datetime) >= "2021-01-01"
    AND TO_DATE(tpep_pickup_datetime) < "2021-08-01" 
"""
spark.sql(qs).describe().show()



+-------+------------------+-----------------+------------------+
|summary|   passenger_count|    trip_distance|      total_amount|
+-------+------------------+-----------------+------------------+
|  count|          14119430|         14951326|          14951326|
|   mean|1.4249466161169395|6.622507772889028|18.751955821150126|
| stddev|1.0441240925988922|671.6468121191299|145.98221322318227|
|    min|               0.0|              0.0|            -647.8|
|    max|               9.0|        332541.19|          398469.2|
+-------+------------------+-----------------+------------------+



                                                                                

In [6]:
qs = """
SELECT 
    HOUR(tpep_pickup_datetime) as pickup_datetime,
    DATE_FORMAT(tpep_pickup_datetime, "EEEE") as day_of_week,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    passenger_count, 
    trip_distance,
    total_amount
FROM 
    trips
WHERE
    passenger_count BETWEEN 1 AND 4
    AND trip_distance BETWEEN 3 AND 500
    AND total_amount BETWEEN 1 AND 500
    AND TO_DATE(tpep_pickup_datetime) >= "2021-01-01"
    AND TO_DATE(tpep_pickup_datetime) < "2021-08-01" 
"""
data = spark.sql(qs)
data.createOrReplaceTempView("data")

In [7]:
data.describe().show()



+-------+------------------+-----------+------------------+-------------------+------------------+-----------------+------------------+
|summary|   pickup_datetime|day_of_week|pickup_location_id|dropoff_location_id|   passenger_count|    trip_distance|      total_amount|
+-------+------------------+-----------+------------------+-------------------+------------------+-----------------+------------------+
|  count|           3304180|    3304180|           3304180|            3304180|           3304180|          3304180|           3304180|
|   mean|10.491263490487807|       null|157.06760013074347|  151.8498998238595|1.3031553971030634|7.083868442397225| 32.27237469821481|
| stddev| 8.030924492452023|       null| 64.88764715005858|  77.59881161292482|0.6323994705499705|5.704137940945662|18.072610395017485|
|    min|                 0|     Friday|                 1|                  1|               1.0|              3.0|              1.25|
|    max|                23|  Wednesday|        

                                                                                

In [8]:
train_df, test_df = data.randomSplit([0.8, 0.2], seed=2023)

In [9]:
print(f"train data count => {train_df.count()}")
print(f"test data count => {test_df.count()}")

                                                                                

train data count => 2644102




test data count => 660078


                                                                                

In [10]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

categorical_feature = [
    'pickup_location_id',
    'dropoff_location_id',
    'day_of_week'
]
stages = []

for c in categorical_feature:
    categori_indexer = StringIndexer(inputCol=c, outputCol=c+"_idx").setHandleInvalid("keep")
    onehot = OneHotEncoder(inputCols=[categori_indexer.getOutputCol()], outputCols=[c+"_onehot"])
    stages += [categori_indexer, onehot]

In [11]:
stages

[StringIndexer_527ae13b6bd3,
 OneHotEncoder_4370edbcb5e2,
 StringIndexer_e6993219d885,
 OneHotEncoder_554255c400ed,
 StringIndexer_81cf8a1b0afd,
 OneHotEncoder_e0e1ac1bf801]

In [12]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

num = [
    "passenger_count",
    "trip_distance",
    "pickup_datetime"
]
for n in num:
    num_vectorasem = VectorAssembler(inputCols=[n], outputCol=n+"_vector")
    num_scaler = StandardScaler(inputCol=num_vectorasem.getOutputCol(), outputCol=n+"_scaler")
    stages += [num_vectorasem, num_scaler]

In [13]:
stages

[StringIndexer_527ae13b6bd3,
 OneHotEncoder_4370edbcb5e2,
 StringIndexer_e6993219d885,
 OneHotEncoder_554255c400ed,
 StringIndexer_81cf8a1b0afd,
 OneHotEncoder_e0e1ac1bf801,
 VectorAssembler_04039e28de58,
 StandardScaler_bce6db924278,
 VectorAssembler_dfdffa0e0b47,
 StandardScaler_0cc0c36aad9f,
 VectorAssembler_ef20e6efad86,
 StandardScaler_a276e8d460d7]

In [14]:
featrue_assembler = [c+"_onehot" for c in categorical_feature] + [n+"_scaler" for n in num]
assem = VectorAssembler(inputCols=featrue_assembler, outputCol="features_vector")
stages += [assem]

In [15]:
stages

[StringIndexer_527ae13b6bd3,
 OneHotEncoder_4370edbcb5e2,
 StringIndexer_e6993219d885,
 OneHotEncoder_554255c400ed,
 StringIndexer_81cf8a1b0afd,
 OneHotEncoder_e0e1ac1bf801,
 VectorAssembler_04039e28de58,
 StandardScaler_bce6db924278,
 VectorAssembler_dfdffa0e0b47,
 StandardScaler_0cc0c36aad9f,
 VectorAssembler_ef20e6efad86,
 StandardScaler_a276e8d460d7,
 VectorAssembler_787e1d034f75]

In [16]:
from pyspark.ml.pipeline import Pipeline

pipleline = Pipeline(stages=stages)
fitted_tranformer = pipleline.fit(train_df)

                                                                                

In [17]:
vtrain_df = fitted_tranformer.transform(train_df)

In [18]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
  maxIter=50,
  regParam=0.01,
  labelCol="total_amount",
  featuresCol="features_vector"  
)

In [19]:
model = lr.fit(vtrain_df)



23/02/09 19:04:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/02/09 19:04:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/02/09 19:04:22 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

In [20]:
model.summary.r2

0.7542036698485332

In [21]:
model.summary.rootMeanSquaredError

8.944717741486938

In [22]:
vtest_df = fitted_tranformer.transform(test_df)

In [23]:
pred = model.transform(vtest_df)
pred.select("pickup_datetime", "day_of_week", "passenger_count", "trip_distance", "prediction", "total_amount").show()

[Stage 38:>                                                         (0 + 1) / 1]

+---------------+-----------+---------------+-------------+------------------+------------+
|pickup_datetime|day_of_week|passenger_count|trip_distance|        prediction|total_amount|
+---------------+-----------+---------------+-------------+------------------+------------+
|              0|     Friday|            4.0|         8.36|33.225443593531075|       40.56|
|              0|     Friday|            1.0|          6.2| 29.46179441133163|        29.0|
|              0|     Friday|            1.0|         3.32| 23.16769415528741|        17.3|
|              0|     Friday|            1.0|          3.0| 22.44842437838726|        17.3|
|              0|     Friday|            1.0|          3.0| 22.44842437838726|       21.36|
|              0|     Friday|            1.0|         4.44|25.585195881745527|        21.3|
|              0|     Friday|            1.0|          8.5| 35.99321470875731|        28.3|
|              0|     Friday|            1.0|          8.0| 40.93223985937816|  

                                                                                