# Projekt BGD 
## Wykorzystanie Sparka w ML

1. Problem: Predykcja czasu przejazdu taksówką na podstawie danych o godzinie i dniu wyjazdu, kierunku i długości trasy.
2. Dane: https://www.kaggle.com/datasets/kentonnlp/2014-new-york-city-taxi-trips
3. Model: Gradient Boosted Trees Regressor.
4. Ewauacja: R^2, MAE, RMSE. 

### Wykorzystywane moduły

In [194]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

from pyspark.sql.functions import count, unix_timestamp, udf, col, lit, dayofweek, hour, to_date, date_format, atan2

from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

import os

### Zainicjowanie sesji sparkowej

In [2]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

conf = SparkConf().setAppName("Big Data Project") \
             .setMaster("local[*]") \
            .set("spark.executor.memory", "16g") \
            .set("spark.executor.cores", "8") \
            .set("spark.executor.instances", "4") \
            .set("spark.driver.memory", "4g") \
            .set("spark.driver.maxResultSize", "1M") 

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
spark

### Pobranie danych

In [195]:
import os

path_nyc_data = f'{os.getcwd()}/NYCT/nyc_taxi_data_2014.csv'
df_nyc = spark.read.option('header',True).csv(path_nyc_data)

### Wstepne przetwarzanie i analiza danych 

In [307]:
import os

path_nyc_data = f'{os.getcwd()}/NYCT/nyc_taxi_data_2014.csv'
df_nyc = spark.read.option('header',True).csv(path_nyc_data)


df = df_nyc.select('trip_distance','pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
                   'dropoff_latitude','dropoff_datetime','pickup_datetime')

In [339]:
df = df.withColumn('pickup_longitude', col('pickup_longitude').cast('float'))\
        .withColumn('pickup_latitude', col('pickup_latitude').cast('float'))\
        .withColumn('dropoff_longitude', col('dropoff_longitude').cast('float'))\
        .withColumn('dropoff_latitude', col('dropoff_latitude').cast('float'))
# Funkcja atan2 jest funkcją matematyczną, która oblicza arcustangens z dwóch podanych argumentów.
# W przypadku zastosowań geograficznych, taka funkcja może być wykorzystywana do obliczenia kąta kierunku 
# pomiędzy dwoma punktami na płaszczyźnie.
df = df.withColumn("direction", atan2(col("dropoff_latitude") - col("pickup_latitude"), col("dropoff_longitude") - col("pickup_longitude")))

df = df.withColumn('week_day', dayofweek(col('pickup_datetime')).cast('integer'))
df = df.withColumn('hour', hour(col('pickup_datetime')))

df = df.withColumn('pickup_datetime', unix_timestamp(col('pickup_datetime'), "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn('dropoff_datetime', unix_timestamp(col('dropoff_datetime'), "yyyy-MM-dd HH:mm:ss"))

df = df.withColumn('trip_distance', col('trip_distance'))

df = df.withColumn('travel_time', (col('dropoff_datetime')-col('pickup_datetime'))/60)

# vectorassembler potrzebuje na wejsciu double, nie jest kompatybilny z floatem
df = df.select([col(c).cast("double") for c in df.columns]) 

df = df.select('trip_distance', 'direction', 'week_day', 'hour', 'travel_time')

df = df.filter(col("travel_time") > 0)
df = df.na.drop()

df.printSchema()

root
 |-- trip_distance: double (nullable = true)
 |-- direction: double (nullable = true)
 |-- week_day: double (nullable = true)
 |-- hour: double (nullable = true)
 |-- travel_time: double (nullable = true)



In [344]:
df.describe().show()

+-------+------------------+--------------------+------------------+------------------+--------------------+
|summary|     trip_distance|           direction|          week_day|              hour|         travel_time|
+-------+------------------+--------------------+------------------+------------------+--------------------+
|  count|          14957348|            14957348|          14957348|          14957348|            14957348|
|   mean|2.7992152205057192|-0.22357213133880377| 4.127336543884651|13.573190447932348|    12.3708147549507|
| stddev| 3.326338806698982|  1.7269145057393407|1.9477535472357383| 6.435122011311211|  11.447761103243032|
|    min|               0.0| -3.1415907789759636|               1.0|               0.0|0.016666666666666666|
|    max|             100.0|   3.141592653589793|               7.0|              23.0|             19517.3|
+-------+------------------+--------------------+------------------+------------------+--------------------+



### Normalizacja min-max

In [340]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

input_cols = [c for c in df.columns if c != 'travel_time']

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
assembled_df = assembler.transform(df)

# inicjalizacja scalera jako min-max
scaler = MinMaxScaler(inputCol="features", outputCol="normalized_features")

# dopasowanie scalera do danych
scaler_model = scaler.fit(assembled_df)

# normalizacja 
normalized_df = scaler_model.transform(assembled_df)

### Podzial danych na zb treningowy i testowy

In [341]:
final_df = normalized_df.select('normalized_features', 'travel_time')
train_df, test_df = final_df.randomSplit([0.8,0.2], seed=96)

print('Train dataset: ', train_df.count())
print('Test dataset : ', test_df.count())

Train dataset:  11963986
Test dataset :  2993362


### Modelowanie GBTRegressor (Gradient Boosted Trees Regressor)

In [342]:
from pyspark.ml.regression import GBTRegressor

# obiekt GBTRegressor
gbm = GBTRegressor(featuresCol='normalized_features', labelCol='travel_time')

# dopasowanie modelu do danych treningowych
gbm_model = gbm.fit(train_df)

# predykcja na zb testowym
y_pred = gbm_model.transform(test_df)

# porownanie kilku pierwszych wartosci z predykcjami
y_pred.select('travel_time', 'prediction').show(5)

+------------------+------------------+
|       travel_time|        prediction|
+------------------+------------------+
|1.2833333333333334|3.9726587138471023|
|              4.55|3.7001944230797004|
|1.1166666666666667| 3.157236820521912|
| 4.366666666666666|3.3622604332798227|
|             13.05| 3.927278857335151|
+------------------+------------------+
only showing top 5 rows



### Ewaluacja modelu

In [343]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='travel_time')

print('R2 SCORE : ', evaluator.evaluate(y_pred, {evaluator.metricName: 'r2'}))
print('MAE      : ', evaluator.evaluate(y_pred, {evaluator.metricName: 'mae'}))
print('RMSE     : ', evaluator.evaluate(y_pred, {evaluator.metricName: 'rmse'}))


R2 SCORE :  0.6718378665445541
MAE      :  3.107738758060092
RMSE     :  5.461139887592322


### Wylaczenie sesji sparkowej

In [345]:
spark.stop()

### Wnioski

R^2 mówi o dopasowaniu modelu do danych, im bliżej 1 tym model jest bardziej dopasowany. \
W tym przypadku 67% wariancji atrybucie predykowanym travel_time wyjaniane jest przez model.

MAE (Mean Absolute Error) na poziomie 3.11 daje informację o średnim bezwzględnym błędzie predykcji.

RMSE (Root Mean Squared Error) równy 5.46 to pierwiastek kwadratowe z MSE (błędu średniokwadratowego).

Model ma zadowalającą zdolność predykcyjną, ale istnieje margines poprawy, szczególnie w redukcji błędu RMSE.