In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, ln, hour,abs,col

from pyspark.ml.feature import Tokenizer, RegexTokenizer, NGram, HashingTF, ChiSqSelector, VectorAssembler

from pyspark.ml.classification import LogisticRegression

In [2]:
spark = SparkSession.builder.appName("NYC").master("local[2]").getOrCreate()
sc = spark.sparkContext

25/02/05 15:16:19 WARN Utils: Your hostname, Globals-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.20.10.3 instead (on interface en0)
25/02/05 15:16:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/05 15:16:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
train = spark.read.csv("train80.csv",header=True,inferSchema=True)\
                  .withColumn("ln_td", ln("trip_duration"))\
                  .withColumn("hh", hour("pickup_datetime"))\
                  .withColumn("l1_distance", abs(col("pickup_longitude")-col("dropoff_longitude")) + abs(col("pickup_latitude")-col("dropoff_latitude")) )\
                  .repartition(8)

train.show(3)

train.registerTempTable("train")



+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-----------------+---+--------------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|            ln_td| hh|         l1_distance|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-----------------+---+--------------------+
|id2556363|        1|2016-03-26 13:11:26|2016-03-26 13:15:38|              1|-73.97377014160156|40.753997802734375|-73.97081756591797| 40.74872970581055|                 N|          252|5.529429087511423| 13|0.008220672607421875|
|id1021173|        2|2016-03-05 21:26:54|2016-03-05 21:38:38|              1| -7

In [4]:
spark.sql("select mean(trip_duration),count(1) as med_duration from train").show()

[Stage 5:>                                                          (0 + 2) / 2]

+-------------------+------------+
|mean(trip_duration)|med_duration|
+-------------------+------------+
|   960.065212976095|     1166915|
+-------------------+------------+



                                                                                

In [7]:
test = spark.read.csv("test20.csv",header=True,inferSchema=True)\
                 .withColumn("hh", hour("pickup_datetime"))\
                 .withColumn("l1_distance", abs(col("pickup_longitude")-col("dropoff_longitude")) + abs(col("pickup_latitude")-col("dropoff_latitude")) )\
                 .repartition(8)

test.show(3)

test.registerTempTable("test")



+---------+---------+-------------------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+-------------+---+--------------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration| hh|         l1_distance|
+---------+---------+-------------------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+-------------+---+--------------------+
|id1375364|        2|2016-02-25 08:03:18|2016-02-25 08:15:13|              2|-73.97305297851562|40.74432373046875|  -73.968505859375| 40.76462173461914|                 N|          715|  8|0.024845123291015625|
|id3596185|        1|2016-05-30 18:29:15|2016-05-30 18:30:48|              1|-73.95536041259767|40.77701568603515|-73.95762634277344| 40.78302764892578|    

                                                                                

In [6]:
spark.sql("select avg(power(trip_duration - 960,2)) as error from test").show()

+-------------------+
|              error|
+-------------------+
|1.030098445489977E7|
+-------------------+



### Some Categorical Things

In [18]:
spark.sql("""
select vendor_id, hour(pickup_datetime) hh, mean(trip_duration) avg_dur, count(1) n 
from train 
group by 1,2
""").registerTempTable("summary_stats")



In [29]:
spark.sql("""
select a.trip_duration,b.avg_dur 
from test a 
join summary_stats b on (a.vendor_id = b.vendor_id and hour(a.pickup_datetime) = b.hh) 
order by abs(a.trip_duration - avg_dur) desc
""").show()

                                                                                

+-------------+------------------+
|trip_duration|           avg_dur|
+-------------+------------------+
|        86346| 766.5796019900497|
|        86286| 910.2591969020962|
|        86365| 989.7060029665823|
|        86357| 981.7420898329461|
|        86353| 981.7420898329461|
|        86348|  983.554221801336|
|        86392|1029.2058158856762|
|        86332| 984.1358433416688|
|        86332| 984.1358433416688|
|        86331| 984.1358433416688|
|        86334| 989.6070192782995|
|        86367|1024.2281807901861|
|        86323|  983.554221801336|
|        86317| 984.1358433416688|
|        86312|  983.554221801336|
|        86346|1024.2281807901861|
|        86303| 981.7420898329461|
|        86301|  983.554221801336|
|        86307| 989.7060029665823|
|        86304| 989.6070192782995|
+-------------+------------------+
only showing top 20 rows



In [30]:
spark.sql("""
select avg(pow(a.trip_duration - b.avg_dur ,2))
from test a 
join summary_stats b on (a.vendor_id = b.vendor_id and hour(a.pickup_datetime) = b.hh) 
""").show()

                                                                                

+--------------------------------------+
|avg(pow((trip_duration - avg_dur), 2))|
+--------------------------------------+
|                   1.028232263265131E7|
+--------------------------------------+



In [9]:
from pyspark.ml.regression import LinearRegression,RandomForestRegressor
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.pipeline import Pipeline

In [10]:
ppl = Pipeline(stages=[StringIndexer(inputCol="vendor_id", outputCol="vendor_id_idx")
                       ,VectorAssembler(inputCols=["l1_distance", "vendor_id_idx","hh","pickup_longitude","pickup_latitude","dropoff_longitude","dropoff_latitude"], outputCol="features"),
                     RandomForestRegressor(labelCol="ln_td")])

In [11]:
ppl_model = ppl.fit(train)

                                                                                

In [12]:
ppl_model.transform(test).select("trip_duration", "prediction").registerTempTable("first_ol")



In [17]:
spark.sql("select trip_duration,exp(prediction) from  first_ol order by abs(trip_duration-exp(prediction)) desc ").show(10)

[Stage 71:>                                                         (0 + 2) / 2]

+-------------+------------------+
|trip_duration|   EXP(prediction)|
+-------------+------------------+
|        86332|251.36924985036353|
|        86286|216.77997786638704|
|        86321|263.46536359092715|
|        86356| 305.4719633978441|
|        86285|241.26000668173543|
|        86325|281.78016688928653|
|        86339|  300.883486631336|
|        86236|204.28203221361485|
|        86241|212.00421859855277|
|        86334|309.08883002425114|
+-------------+------------------+
only showing top 10 rows



                                                                                

In [20]:
spark.sql("select sqrt(avg(power(trip_duration-exp(prediction),2))) from  first_ol ").show()

+------------------------------------------------------+
|SQRT(avg(POWER((trip_duration - EXP(prediction)), 2)))|
+------------------------------------------------------+
|                                     3170.175473915946|
+------------------------------------------------------+

