In [1]:
import time
from pyspark.sql.types import *
from pyspark.sql.functions import to_date, col, lit, unix_timestamp
from pyspark.ml.feature import StringIndexer, OneHotEncoder, PCA
from pyspark.sql.functions import to_timestamp, date_format, hour, year, month, dayofmonth
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.clustering import KMeans

In [2]:
sc

In [3]:
sc._jsc.sc().getExecutorMemoryStatus().size()

4

# Mongo Code

#### Schema of the Green Taxi Database - data was cleaned in mongo:

```
s3cmd get s3://spark-proj/data/yellow_tripdata_2017-05.csv s3://spark-proj/data/yellow_tripdata_2017-04.csv s3://spark-proj/data/yellow_tripdata_2017-03.csv s3://spark-proj/data/yellow_tripdata_2017-02.csv s3://spark-proj/data/yellow_tripdata_2017-01.csv s3://spark-proj/data/yellow_tripdata_2016-12.csv s3://spark-proj/data/yellow_tripdata_2016-11.csv s3://spark-proj/data/yellow_tripdata_2016-10.csv s3://spark-proj/data/yellow_tripdata_2016-09.csv s3://spark-proj/data/yellow_tripdata_2016-08.csv s3://spark-proj/data/green_tripdata_2017-05.csv s3://spark-proj/data/green_tripdata_2017-04.csv s3://spark-proj/data/green_tripdata_2017-03.csv s3://spark-proj/data/green_tripdata_2017-02.csv s3://spark-proj/data/green_tripdata_2017-01.csv s3://spark-proj/data/green_tripdata_2016-12.csv s3://spark-proj/data/green_tripdata_2016-11.csv s3://spark-proj/data/green_tripdata_2016-10.csv s3://spark-proj/data/green_tripdata_2016-09.csv s3://spark-proj/data/green_tripdata_2016-08.csv

mongoimport -d taxidb -c taxidata --type csv --headerline --file yellow_tripdata_2017-05.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file yellow_tripdata_2017-04.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file yellow_tripdata_2017-03.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file yellow_tripdata_2017-02.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file yellow_tripdata_2017-01.csv

mongoimport -d taxidb -c taxidata --type csv --headerline --file green_tripdata_2017-05.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file green_tripdata_2017-04.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file green_tripdata_2017-03.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file green_tripdata_2017-02.csv
mongoimport -d taxidb -c taxidata --type csv --headerline --file green_tripdata_2017-01.csv



```

### Import Green Taxi Data from Mongo Database ( 1M total)

Our data is currently stored in another EC2 database:

`mongodb://ec2-54-190-6-201.us-west-2.compute.amazonaws.com/taxidb_sm.taxidata_g`

- 10M : mongodb://ec2-35-162-204-59.us-west-2.compute.amazonaws.com/taxidb.taxidata10M_y
- 30M : mongodb://ec2-35-162-204-59.us-west-2.compute.amazonaws.com/taxidb.taxidata30M_g

In [4]:
start = time.time()
green_df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb://ec2-35-162-204-59.us-west-2.compute.amazonaws.com/taxidb.taxidata30M_g")\
    .load()
green_df.repartition(24)
print time.time() - start

2.41381812096


In [5]:
green_df.printSchema()

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- color: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- extra: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- trip_type: integer (nullable = true)



### Weather Data ( per day per hour) - weather underground.com

In [6]:
# start = time.time()
# weather_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://ec2-54-190-6-201.us-west-2.compute.amazonaws.com/taxidb.weather_data").load()
# print time.time() - start

In [7]:
def indexStringColumns(df, cols):
    newdf = df
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

In [8]:
# w_df = indexStringColumns(weather_df, ['condition'])

In [9]:
# w_df.printSchema()

### Yellow Taxi Data

In [10]:
start = time.time()
yellow_df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb://ec2-35-162-204-59.us-west-2.compute.amazonaws.com/taxidb.taxidata30M_y")\
    .load()
yellow_df.repartition(24)
print time.time() - start

1.37875199318


In [11]:
yellow_df.printSchema()

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- color: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- trip_distance: double (nullable = true)



# CHANGE SAMPLE SIZES

- 0.3 = 3M
- 0.1 = 1M
- 1 = 10M

### Glue Yellow and Green Taxi data together for cleaning

In [12]:
select_cols = [x for x in yellow_df.columns if x!='_id' ]

y_df = yellow_df.select(select_cols)
#y_df = y_df.sample(False,0.05, seed=1)
y_df.cache()

g_df = green_df.select(select_cols)
#g_df = g_df.sample(False,0.05, seed=1)
g_df.cache()

total_df = y_df.unionAll(g_df)
total_df.repartition(24)
total_df.cache()
total_df.show(2)

+------------+------------+----------+--------+-----+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|DOLocationID|PULocationID|RatecodeID|VendorID|color|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|
+------------+------------+----------+--------+-----+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|         144|         142|         1|       1|  1.0|  0.5|       16.0|                  0.3|    0.5|              1|           2|                 N|       0.0|         0.0|        17.3|  2017-05-01 00:18:49| 2017-05-01 00:00:02|          4.1|
|         233|         1

### Feature Engineering - extracting data from dates

- Day of Week
- Day of Month
- Month
- Round to hour

In [13]:
def make_dates(df, colName, newCol):
    return df.withColumn(newCol,to_timestamp(colName, 'yyyy-MM-dd HH:mm:ss')).drop(colName)

def get_dateinfo(df, colName):
    return df.withColumn('dow', date_format(colName,'u').cast(IntegerType()))\
    .withColumn('hour', hour(colName))\
    .withColumn('day', dayofmonth(colName))\
    .withColumn('month', month(colName))\
    .withColumn('year', year(colName))
    
def indexStringColumns(df, cols):
    newdf = df
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

#### Clean the total dataset
- 100k : 73.8127279282
- 500k : 75.45023489
- 1M : 54.37
- 3M : 54.6 Secs
- 10M : 59.06 secs
- 30M : 166.949352026
- 70M :
- 100M :


In [14]:
start = time.time()
timeDiff = (unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
df_dates = total_df.withColumn("Duration", timeDiff)
df_dates = df_dates.filter("Duration < 7200 and Duration > 60" )
df_dates = make_dates(total_df, "tpep_dropoff_datetime", "dropoff_datetime")
df_dates = make_dates(df_dates, "tpep_pickup_datetime", "pickup_datetime")
new_df = indexStringColumns(df_dates, ["store_and_fwd_flag"])
new_df = get_dateinfo(df_dates, 'dropoff_datetime')
print time.time() - start

148.511734962


### Join in the weather data

In [15]:
# joined_df = new_df.join(w_df,[new_df.day==w_df.day, new_df.month==w_df.month, new_df.hour==w_df.hour], 'left_outer')
# joined_df = joined_df.select(new_df.DOLocationID, new_df.PULocationID, new_df.RatecodeID, new_df.VendorID, new_df.color, new_df.extra, new_df.fare_amount, new_df.improvement_surcharge, new_df.mta_tax, new_df.passenger_count, new_df.payment_type, new_df.store_and_fwd_flag, new_df.tip_amount, new_df.tolls_amount, new_df.total_amount, new_df.trip_distance, new_df.dropoff_datetime, new_df.pickup_datetime, new_df.dow, new_df.hour, new_df.day, new_df.month, new_df.year, w_df.temp, w_df.condition)
joined_df = new_df
joined_df = joined_df.drop("dropoff_datetime").drop("pickup_datetime").drop("store_and_fwd_flag").drop("ehail_fee").drop("trip_type")

### One-hot Encode any of the categorical Columns

In [16]:

start = time.time()

def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        #For each given colum, create OneHotEncoder. 
        #dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot" 
        #and then drops the original columns.
        #and drop the "-onehot" suffix. 
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

onehot_cols = [ x for x in joined_df.columns if 'ID' in x]
onehot_cols = onehot_cols + ['dow','day','month'] # took out weather + condition

joined_df.printSchema()
dfhot = oneHotEncodeColumns(joined_df,onehot_cols)
df_for_model = dfhot.withColumnRenamed("color","label")
df_for_model.printSchema()

print time.time() - start

root
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- color: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- dow: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- label: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- improvement_surcharge: double (n

### Prepare the Vectors for Modeling

In [17]:
va = VectorAssembler(outputCol="features", inputCols=df_for_model.drop("label").columns) #except the last col.
taxi_points = va.transform(df_for_model).select("features", "label")


In [None]:
split_data = taxi_points.randomSplit([0.8, 0.2])
training = split_data[0].cache()
test = split_data[1].cache()

# BENCHMARKING BELOW

### Random Forest Modeling

- 100k : 36.5315189362 | 0.91292 | 0.909347
- 500k : 51.1424059868 | 0.909106 | 0.907606
- 1M : 42.5615952015 | 0.909293 | 0.908242
- 3M : 76.5072159767 | 0.907866 | 0.907026
- 10M : 247.885142088 | 0.910769 | 0.910489
- 30M : 815.310534 | 0.90917 | 0.909082
- 70M :
- 100M :


In [None]:
start = time.time()
rf = RandomForestClassifier(maxDepth=15)
rfmodel = rf.fit(training)
print time.time() - start

In [None]:
start = time.time()
rfpredicts_train = rfmodel.transform(training)
rfpredicts_test = rfmodel.transform(test)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
train_accuracy = evaluator.evaluate(rfpredicts_train)
test_accuracy = evaluator.evaluate(rfpredicts_test)

print("train / test Accuracy = %g | %g" % (train_accuracy, test_accuracy))
print time.time() - start

#### Feature importance

In [21]:
# import operator

# columns = df_for_model.drop("label").columns
# importances = {}
# for score,col in zip(rfmodel.featureImportances, columns):
#     importances[col] =  score
# sorted_importances = sorted(importances.items(), key=operator.itemgetter(1), reverse = True)
# sorted_importances

### Logistic Regression
- 100K : 14.2321848869 | 0.963656 | 0.960007
- 500K : 16.9678959846 | 0.96247 | 0.962208
- 1M : 12.0236101151 | 0.962448 | 0.961711
- 3M : 19.3087320328 | 0.962437 | 0.962179
- 10M : 37.7062170506 | 0.962361 | 0.962184
- 30M : 357.43987608| 0.961334 | 0.961369
- 70M :
- 100M :


In [44]:
start = time.time()
logreg = LogisticRegression()
log_model = logreg.fit(training)
print time.time() - start


16.9678959846


In [45]:
start = time.time()
logpredicts_test = log_model.transform(test)
logpredicts_train = log_model.transform(training)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
train_accuracy = evaluator.evaluate(logpredicts_train)
test_accuracy = evaluator.evaluate(logpredicts_test)

print("train / test Accuracy = %g | %g" % (train_accuracy, test_accuracy))
print time.time() - start

train / test Accuracy = 0.96247 | 0.962208
1.42913389206


### Kmeans
- 100K: 7.25322890282
- 500K: 13.6290647984
- 1M : 11.3732612133
- 3M : 29.7429788113
- 10M : 84.9775218964
- 30M : 194.982362986
- 70M :
- 100M :

In [22]:
start = time.time()
kmean_df = df_for_model.withColumn("label", lit(0))

va = VectorAssembler(outputCol="features", inputCols=kmean_df.drop("label").columns) #except the last col.
taxi_points_mean = va.transform(kmean_df).select("features", "label")

kmeans = KMeans().setK(10).setFeaturesCol("features").setPredictionCol("prediction")
model = kmeans.fit(taxi_points_mean).transform(taxi_points_mean)

print model.groupBy("prediction").count().show()
print time.time() - start

+----------+--------+
|prediction|   count|
+----------+--------+
|         1|       1|
|         6| 1129493|
|         3|       1|
|         5|       1|
|         9| 9809590|
|         4| 2123725|
|         8| 6654671|
|         7|      10|
|         2|       2|
|         0|14025318|
+----------+--------+

None
194.982362986


### RF Regression
- 100K : 42.839509964 | 0.226576 | 0.214108
- 500K : 99.1277749538 | 0.226455 | 0.220842
- 1M : 106.496699095 | 0.227120 | 0.222687
- 3M : 240.4 | 0.225 | 0.223 
- 10M : 628.436889887 | 0.226972 | 0.225364
- 30M :
- 70M :
- 100M :

In [None]:
start = time.time()

# then do all that stuff
df_travel = df_for_model.withColumnRenamed("Duration", "label")
va = VectorAssembler(outputCol="features", inputCols=df_travel.drop("label").columns) #except the last col.
taxi_points_travel = va.transform(df_travel).select("features", "label")

split_data = taxi_points_travel.randomSplit([0.8, 0.2])
training = split_data[0].cache()
test = split_data[1].cache()

rf_r = RandomForestRegressor(maxDepth=15)
rfmodel = rf_r.fit(training)

print time.time() - start

In [None]:
rfreg_predicts_test = rfmodel.transform(test)
rfreg_predicts_training = rfmodel.transform(training)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
test_rmse = evaluator.evaluate(rfreg_predicts_test)
train_rmse = evaluator.evaluate(rfreg_predicts_training)
print("train / test RMSE = %f | %f" % (test_rmse, train_rmse))
print time.time() - start

### PCA 
- 100k: 3.53512597084
- 500k: 10.2684950829
- 1M : 7.92126703262
- 3M : 21.8753027916
- 10M : 65.5080680847
- 30M :
- 70M :
- 100M :

In [23]:
start = time.time()

pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(taxi_points)
result = model.transform(taxi_points).select("pcaFeatures")

print time.time() - start

148.88379097


## Gradient Regression

- 100k:  218.333948135 | 0.190186 | 0.125398
- 500k: 
- 1M : 
- 3M : 
- 10M :
- 30M :
- 70M :
- 100M :

In [30]:
start = time.time()

# then do all that stuff
df_travel = df_for_model.withColumnRenamed("Duration", "label")
va = VectorAssembler(outputCol="features", inputCols=df_travel.drop("label").columns) #except the last col.
taxi_points_travel = va.transform(df_travel).select("features", "label")

split_data = taxi_points_travel.randomSplit([0.8, 0.2])
training = split_data[0].cache()
test = split_data[1].cache()

gb_r = GBTRegressor(maxDepth=15)
gbmodel = gb_r.fit(training)

print time.time() - start

218.333948135


In [31]:
gbreg_predicts_test = gbmodel.transform(test)
gbreg_predicts_training = gbmodel.transform(training)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
test_rmse = evaluator.evaluate(gbreg_predicts_test)
train_rmse = evaluator.evaluate(gbreg_predicts_training)
print("train / test RMSE = %f | %f" % (test_rmse, train_rmse))
print time.time() - start

train / test RMSE = 0.190186 | 0.125398
220.115098


## Gradient Classification

- 100k: 319.882117987 | 0.979915 | 0.951964
- 500k: 
- 1M : 
- 3M : 
- 10M :
- 30M :
- 70M :
- 100M :

In [32]:
start = time.time()
gbc = GBTClassifier(maxDepth=15)
gbcmodel = gbc.fit(training)
print time.time() - start

319.882117987


In [33]:
start = time.time()
gbcpredicts_train = gbcmodel.transform(training)
gbcpredicts_test = gbcmodel.transform(test)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
train_accuracy = evaluator.evaluate(gbcpredicts_train)
test_accuracy = evaluator.evaluate(gbcpredicts_test)

print("train / test Accuracy = %g | %g" % (train_accuracy, test_accuracy))
print time.time() - start

train / test Accuracy = 0.979915 | 0.951964
4.33535385132


# KEEP A STANDARD DATASIZE FOR THIS

## Regression Benchmarks (by tree)

- 1M :
- 3M :
- 10M :
- 30M :
- 70M :
- 100M :

In [None]:
# then do all that stuff
df_travel = df_for_model.withColumnRenamed("Duration", "label")
va = VectorAssembler(outputCol="features", inputCols=df_travel.drop("label").columns) #except the last col.
taxi_points_travel = va.transform(df_travel).select("features", "label")

split_data = taxi_points_travel.randomSplit([0.8, 0.2])
training = split_data[0].cache()
test = split_data[1].cache()

for md in [3,6,9,12,15,18]
    start = time.time()

    rf_r = RandomForestRegressor(maxDepth=3)
    rfmodel = rf_r.fit(training)


    print time.time() - start