In [48]:
from pyspark.sql import SparkSession

In [49]:
spark = SparkSession.builder.appName('uber').getOrCreate()

In [50]:
data = spark.read.csv('uber.csv', header=True, inferSchema=True)

                                                                                

In [51]:
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [52]:
data.columns

['_c0',
 'key',
 'fare_amount',
 'pickup_datetime',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'passenger_count']

In [53]:
data=data.drop('_c0')

In [54]:
data.printSchema()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [55]:
data.show()

+-------------------+-----------+-------------------+------------------+------------------+------------------+------------------+---------------+
|                key|fare_amount|    pickup_datetime|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|passenger_count|
+-------------------+-----------+-------------------+------------------+------------------+------------------+------------------+---------------+
|2015-05-07 19:52:06|        7.5|2015-05-07 19:52:06|-73.99981689453125| 40.73835372924805|   -73.99951171875| 40.72321701049805|              1|
|2009-07-17 20:04:56|        7.7|2009-07-17 20:04:56|        -73.994355|         40.728225|         -73.99471|         40.750325|              1|
|2009-08-24 21:45:00|       12.9|2009-08-24 21:45:00|        -74.005043|          40.74077|        -73.962565|         40.772647|              1|
|2009-06-26 08:22:21|        5.3|2009-06-26 08:22:21|        -73.976124|         40.790844|        -73.965316|         40.80

In [56]:
import haversine as hs   
from haversine import Unit

In [57]:
data.count()

200000

In [58]:
data = data.where(data['fare_amount'] > 0).dropna(how = 'all')

In [59]:
data = data.where(data['pickup_longitude']>-90).dropna()

In [60]:
data=data.where(data['pickup_latitude']<90).dropna()

In [61]:
data=data.where(data['dropoff_longitude']>-90).dropna()

In [62]:
data=data.where(data['dropoff_latitude']<90).dropna()

In [63]:
data.where(data['fare_amount']<0).show()

+---+-----------+---------------+----------------+---------------+-----------------+----------------+---------------+
|key|fare_amount|pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+---+-----------+---------------+----------------+---------------+-----------------+----------------+---------------+
+---+-----------+---------------+----------------+---------------+-----------------+----------------+---------------+



In [64]:
data.count()

                                                                                

199964

In [65]:
data = data.dropna()

In [66]:
data.describe().show()



+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|       fare_amount|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|   passenger_count|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|            199964|            199964|            199964|            199964|            199964|            199964|
|   mean|11.362559860774995|-72.50188935852033| 39.91803065904743|-72.51264854851401|39.922574101567264|  1.68449320877758|
| stddev| 9.897192676184629| 10.44917892725174| 6.130102970740913|10.408913309302594|6.1160513816519675|1.3859793491891663|
|    min|              0.01|-89.93333299999999|-74.01551500000001|  -75.458978633981|         -74.01575|                 0|
|    max|             499.0|         40.808425|          48.01876|         40.831932|45.031597999999995|               208|
+-------

                                                                                

In [67]:
import pyspark.sql.functions as F

In [68]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

@F.udf(returnType=FloatType())
def geodist(plat,plong,dlat,dlong):
    loc1=(plat,plong)
    loc2=(dlat,dlong)
    return hs.haversine(loc1,loc2)

In [69]:
data = data.withColumn('Distance',geodist(data['pickup_latitude'], data['pickup_longitude'], data['dropoff_latitude'], data['dropoff_longitude']))

In [70]:
data.show()

+-------------------+-----------+-------------------+------------------+------------------+------------------+------------------+---------------+---------+
|                key|fare_amount|    pickup_datetime|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|passenger_count| Distance|
+-------------------+-----------+-------------------+------------------+------------------+------------------+------------------+---------------+---------+
|2015-05-07 19:52:06|        7.5|2015-05-07 19:52:06|-73.99981689453125| 40.73835372924805|   -73.99951171875| 40.72321701049805|              1| 1.683325|
|2009-07-17 20:04:56|        7.7|2009-07-17 20:04:56|        -73.994355|         40.728225|         -73.99471|         40.750325|              1|2.4575932|
|2009-08-24 21:45:00|       12.9|2009-08-24 21:45:00|        -74.005043|          40.74077|        -73.962565|         40.772647|              1| 5.036384|
|2009-06-26 08:22:21|        5.3|2009-06-26 08:22:21|        -73

In [71]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [72]:
data.columns

['key',
 'fare_amount',
 'pickup_datetime',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'passenger_count',
 'Distance']

In [73]:
from pyspark.sql.functions import corr

In [74]:
data.describe().show()



+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|       fare_amount|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|   passenger_count|          Distance|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|            199964|            199964|            199964|            199964|            199964|            199964|            199964|
|   mean|11.362559860774995|-72.50188935852033| 39.91803065904743|-72.51264854851401|39.922574101567264|  1.68449320877758|20.458348396336564|
| stddev| 9.897192676184629| 10.44917892725174| 6.130102970740913|10.408913309302594|6.1160513816519675|1.3859793491891663| 378.5777553840456|
|    min|              0.01|-89.93333299999999|-74.01551500000001|  -75.458978633981|         -74.01575|                 0|               0.0|

                                                                                

In [75]:
data.select(corr('pickup_latitude','pickup_longitude')).show()



+---------------------------------------+
|corr(pickup_latitude, pickup_longitude)|
+---------------------------------------+
|                    -0.9790483948285729|
+---------------------------------------+



                                                                                

In [76]:
assembler = VectorAssembler(inputCols=['fare_amount','passenger_count','Distance'], outputCol='features')

In [77]:
output = assembler.transform(data)

In [78]:
output.printSchema()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- Distance: float (nullable = true)
 |-- features: vector (nullable = true)



In [79]:
final_data = output.select('features', 'fare_amount')

In [80]:
train_data, test_data = final_data.randomSplit([0.8, 0.2])

In [81]:
train_data.describe().show()

[Stage 64:>                                                         (0 + 2) / 2]

+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|            160057|
|   mean|11.366114696639096|
| stddev| 9.922048156545006|
|    min|              0.01|
|    max|             499.0|
+-------+------------------+



                                                                                

In [82]:
lr = LinearRegression(labelCol='fare_amount')

In [84]:
lr_model = lr.fit(train_data)

23/08/08 14:53:32 WARN Instrumentation: [af9901e5] regParam is zero, which might cause numerical instability and overfitting.
23/08/08 14:53:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/08/08 14:53:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/08/08 14:53:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [85]:
test_results = lr_model.evaluate(test_data)

                                                                                

In [86]:
test_results.residuals.show()

[Stage 70:>                                                         (0 + 1) / 1]

+--------------------+
|           residuals|
+--------------------+
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
|4.440892098500626...|
+--------------------+
only showing top 20 rows



                                                                                

In [87]:
test_results.rootMeanSquaredError

4.106735603906434e-15

In [88]:
test_results.r2

1.0

In [89]:
unlabeled_data = test_data.select('features')
unlabeled_data.show()

[Stage 71:>                                                         (0 + 1) / 1]

+--------------------+
|            features|
+--------------------+
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|       [2.5,1.0,0.0]|
|[2.5,1.0,8.424406...|
|[2.5,1.0,8.278871...|
|[2.5,1.0,0.001484...|
|[2.5,1.0,0.001919...|
|[2.5,1.0,0.002644...|
|[2.5,1.0,0.004486...|
+--------------------+
only showing top 20 rows



                                                                                

In [91]:
predictions = lr_model.transform(unlabeled_data)

In [92]:
predictions.show()

[Stage 73:>                                                         (0 + 1) / 1]

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|       [2.5,1.0,0.0]|2.4999999999999956|
|[2.5,1.0,8.424406...|2.4999999999999956|
|[2.5,1.0,8.278871...|2.4999999999999956|
|[2.5,1.0,0.001484...|2.4999999999999956|
|[2.5,1.0,0.001919...|2.4999999999999956|
|[2.5,1.0,0.002644...|2.4999999999999956|
|[2.5,1.0,0.004486...|2.4999999999999956|
+--------------------+------------

                                                                                

In [93]:
predictions.describe().show()

[Stage 74:>                                                         (0 + 2) / 2]

+-------+-------------------+
|summary|         prediction|
+-------+-------------------+
|  count|              39907|
|   mean|  11.34830230285393|
| stddev|  9.796980337471217|
|    min|0.10999999999999475|
|    max|  230.0000000000001|
+-------+-------------------+



                                                                                