In [1]:
! pip install -U scikit-learn

Requirement already up-to-date: scikit-learn in /opt/conda/lib/python3.6/site-packages
[33mYou are using pip version 9.0.1, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
! pip install lightning-python

[33mYou are using pip version 9.0.1, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [3]:
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.functions import when
from pyspark import SparkContext as sc
from pyspark.sql.functions import col, split, ltrim, substring
import pyspark.sql as SQL
from pyspark.sql.functions import *
import datetime
import calendar
import pandas as pd
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import numpy as np
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.feature import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [4]:
conf = SparkConf().setAppName("Jan-01").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [5]:
spark = SparkSession.builder.appName('Jan-01').getOrCreate()

In [6]:
# Download and decompress data into your Jupyter environment; abreviated jan 2017 data
jan_2017 = spark.read.format("csv").load('yellow_tripdata_half.csv', header = True)
#jan_2017.count()

In [7]:
#need to get two dataframes to merge on, or else get cartesian product error
taxi_zone = spark.read.format("csv").load('taxi+_zone_lookup.csv', header = True)

In [8]:
#merging to get destination information
jan_2017 = jan_2017.join(taxi_zone, jan_2017.PULocationID == taxi_zone.LocationID, "left_outer"). \
                withColumnRenamed("Borough", "PUBorough").withColumnRenamed("Zone", "PUZone").withColumnRenamed("service_zone", "PUServiceZone").\
                withColumnRenamed("neighborhood", "PUneighbor").cache()
    

In [9]:
#make unique ID
jan_2017 = jan_2017.withColumn("uniqueIdColumn", monotonically_increasing_id())

In [10]:
jan_2017 = jan_2017.drop("LocationID")

In [11]:
#encoding if pickup is an aiport
jan_2017 = jan_2017.withColumn("AirportPU", \
                               F.when((jan_2017["PULocationID"] == '138' ) | \
                                      (jan_2017["PULocationID"] == '132') |\
                                      (jan_2017["PULocationID"] == '1'),1).otherwise(0))

In [12]:
#cleaning data
jan_2017 = jan_2017.where((jan_2017['PUBorough'] != 'Unknown'))

In [13]:
#splitting up time and date
split_pickup_col = split(jan_2017['tpep_pickup_datetime'], ' ')
jan_2017 = jan_2017.withColumn("PUDate", split_pickup_col.getItem(0).cast(DateType()))
jan_2017 = jan_2017.withColumn("PUTime", split_pickup_col.getItem(1))

In [14]:
#splitting time into hour and minute; will round minute to nearest 5 minutes
split_PUTime = split(jan_2017['PUTime'], ':')
jan_2017 = jan_2017.withColumn("PUHour", split_PUTime.getItem(0).cast(IntegerType()))
jan_2017 = jan_2017.withColumn("PUMinute", split_PUTime.getItem(1).cast(IntegerType()))

In [15]:
#rush hour
jan_2017 = jan_2017.withColumn("MorningRushHour", \
                               F.when((jan_2017["PUHour"] >= 6 ) & \
                                      (jan_2017["PUHour"] < 9),1).otherwise(0))

In [16]:
jan_2017 = jan_2017.withColumn("EveningRushHour", \
                               F.when((jan_2017["PUHour"] >= 17 ) & \
                                      (jan_2017["PUHour"] < 21),1).otherwise(0))

In [17]:
jan_2017 = jan_2017.withColumn("PUDay", dayofyear(jan_2017.PUDate))

In [18]:
#rounding down mintue to closest 5 minute mark (computationally easier)
#jan_2017 = jan_2017.withColumn("DOMinute", (jan_2017.DOMinute - jan_2017.DOMinute%5))
jan_2017 = jan_2017.withColumn("PUMinute", (jan_2017.PUMinute - jan_2017.PUMinute%5))

In [19]:
#DOW gives you 1 (Monday) - 7 (Sunday)
jan_2017 = jan_2017.withColumn("PU_DOW",  date_format(jan_2017.PUDate, 'u').cast(ShortType()))
#jan_2017 = jan_2017.withColumn("DO_DOW",  date_format(jan_2017.DODate, 'u').cast(ShortType()))

In [20]:
#encoding if destination is a weekend
jan_2017 = jan_2017.withColumn("Weekend", \
                               F.when((jan_2017["PU_DOW"] == 7) | \
                                      (jan_2017["PU_DOW"] == 6),1).otherwise(0))

In [21]:
jan_2017 = jan_2017.withColumn("WorkingHour", \
                               F.when((((jan_2017["PUHour"] >= 9 ) & (jan_2017["PUHour"] < 17))\
                                       & (jan_2017["Weekend"] == 0)) ,1).otherwise(0))

In [22]:
#casting data types to primitives

#1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.
jan_2017 = jan_2017.withColumn("VendorID", jan_2017["VendorID"].cast(ShortType()))

jan_2017 = jan_2017.withColumn("passenger_count", jan_2017["passenger_count"].cast(ShortType()))

#in miles
jan_2017 = jan_2017.withColumn("trip_distance", jan_2017["trip_distance"].cast(FloatType()))

#1= Credit card
#2= Cash
#3= No charge
#4= Dispute
#5= Unknown
#6= Voided trip
jan_2017 = jan_2017.withColumn("payment_type", jan_2017["payment_type"].cast(ShortType()))
jan_2017 = jan_2017.withColumn("fare_amount", jan_2017["fare_amount"].cast(FloatType()))

#0.50 and $1 rush hour and overnight charges.
jan_2017 = jan_2017.withColumn("extra", jan_2017["extra"].cast(FloatType()))
#.50, automatic MTA charge
jan_2017 = jan_2017.withColumn("mta_tax", jan_2017["mta_tax"].cast(FloatType()))


jan_2017 = jan_2017.withColumn("tip_amount", jan_2017["tip_amount"].cast(FloatType()))
jan_2017 = jan_2017.withColumn("tolls_amount", jan_2017["tolls_amount"].cast(FloatType()))
jan_2017 = jan_2017.withColumn("improvement_surcharge", jan_2017["improvement_surcharge"].cast(FloatType()))
jan_2017 = jan_2017.withColumn("total_amount", jan_2017["total_amount"].cast(FloatType()))


jan_2017 = jan_2017.withColumn("RateCodeID", jan_2017["RateCodeID"].cast(ShortType()))
#1= Standard rate
#2=JFK -> $52 flat fare
#3=Newark
#4=Nassau or Westchester
#5=Negotiated fare
#6=Group ride

In [23]:
#basic fare cleaning, ensure that all values are above zero
jan_2017 = jan_2017.filter(jan_2017.tip_amount >= 0)

In [24]:
#basic fare cleaning, ensure that all values are above zero
jan_2017 = jan_2017.filter(jan_2017.tolls_amount >= 0.0) 

In [25]:
#basic fare cleaning, ensure that all values are above zero
jan_2017 = jan_2017.filter(jan_2017.total_amount >= 3.30)

In [26]:
#basic fare cleaning, ensure that all values are above zero
jan_2017 = jan_2017.filter(jan_2017.extra >= 0.00)

In [27]:
#minimum fare amounts according to NYC Taxi data standards
jan_2017 = jan_2017.filter((jan_2017.fare_amount >= 2.50))

In [28]:
#minimum fare amounts according to NYC Taxi data standards
jan_2017 = jan_2017.filter(jan_2017.improvement_surcharge >= 0.3)

In [29]:
#minimum fare amounts according to NYC Taxi data standards
jan_2017 = jan_2017.filter(jan_2017.mta_tax >= 0.5)

In [30]:
jan_2017 = jan_2017.withColumn("PLocationID", jan_2017.PULocationID.cast(IntegerType())).drop("PULocationID")
jan_2017 = jan_2017.withColumn("DLocationID", jan_2017.DOLocationID.cast(IntegerType())).drop("DOLocationID")
#sampe.printSchema()


In [31]:
# jan_2017.printSchema()
jan_2017 = jan_2017.drop('tpep_pickup_datetime')
jan_2017 = jan_2017.drop('tpep_dropoff_datetime')

In [32]:
jan_2017 = jan_2017.drop('payment_type')
jan_2017 = jan_2017.drop('fare_amount')
jan_2017 = jan_2017.drop('extra')
jan_2017 = jan_2017.drop('mta_tax')
jan_2017 = jan_2017.drop('tip_amount')
jan_2017 = jan_2017.drop('tolls_amount')
jan_2017 = jan_2017.drop('improvement_surcharge')
jan_2017 = jan_2017.drop('total_amount')

In [33]:
jan_2017 = jan_2017.drop('trip_distance')
jan_2017 = jan_2017.drop('store_and_fwd_flag')

In [34]:
#load weather data for merging
weather_data = spark.read.load('weather.txt', format="text")

In [35]:
weather_data.createOrReplaceTempView('weather_data_sdf')

In [36]:
weather_data = spark.sql('SELECT CAST(split(value, ",")[0] as string) AS date, '\
                        'CAST(split(value, ",")[1] as string) as time, '\
                        'CAST(split(value, ",")[2] as float) as temp, '\
                        'CAST(split(value, ",")[3] as float) as windchill, '\
                        'CAST(split(value, ",")[4] as float) as dewpoint, '\
                        'CAST(split(value, ",")[5] as float) as humidity, '\
                        'CAST(split(value, ",")[6] as float) as pressure, '\
                        'CAST(split(value, ",")[7] as float) as visibility, '\
                        'CAST(split(value, ",")[8] as string) as windDir, '\
                        'CAST(split(value, ",")[9] as float) as windSpeed, '\
                        'CAST(split(value, ",")[10] as float) as gustSpeed, '\
                        'CAST(split(value, ",")[11] as float) as Precip, '\
                        'CAST(split(value, ",")[12] as string) as Events, '\
                        'CAST(split(value, ",")[13] as string) as Conditions '\
                         'FROM weather_data_sdf')

In [37]:
#cast date to date type
weather_data = weather_data.withColumn("date", weather_data.date.cast(DateType()))

In [38]:
#data to change
def period(x):
    return split(split(x, ':')[1], " ")[1]

In [39]:
#data to change
def toHour(x):
    first_split = split(x, ':')
    retval = first_split[0].cast(IntegerType()) % 12
    return retval 

In [40]:
#get am or pm
weather_data = weather_data.withColumn("period", period("time"))

In [41]:
#make hour military time
weather_data = weather_data.withColumn("hour", when(weather_data.period == 'PM', toHour("time") + 12).otherwise(toHour("time")))

In [42]:
# #fill any nulls
weather_data = weather_data.na.fill(0)

In [46]:
#data to join
jan_2017 = jan_2017.join(weather_data, (jan_2017.PUDate == weather_data.date) & \
                         (jan_2017.PUHour == weather_data.hour), "left_outer")

In [47]:
jan_2017 = jan_2017.dropDuplicates(['uniqueIdColumn'])

In [48]:
#extra, payment type, fare amount, mta_tax, tip_amount, tollsamount, total_amount, improvement surcharge

# Categorical Features
# RateCodeID
# store_and_fwd_flag
# PULocationID
# DOLocationID
# LocationID (1 to 256)
# PUBorough (comes from taxi+_lookup_zone)
# PUZone (Name for Location ID)
# PUServiceZone (Categorical)
# PUNeighbor (Demographics Neighborhood)
# PUDay (1-365)
# PU_DOW (Day of week)
# PUEvents
# PUConditions
# PUPeriod (AM or PM)

jan_2017 = jan_2017.drop('PUDate')
jan_2017 = jan_2017.drop('PUTime')
jan_2017 = jan_2017.drop('date')
jan_2017 = jan_2017.drop('time')

In [49]:
jan_2017 = jan_2017.drop('hour')


In [50]:
PUdemographics = spark.read.format("csv").load('demographics.csv', header = True).cache()


In [51]:
PUnames = PUdemographics.schema.names
i = 0
for name in PUnames:
    if (i != 0):
        PUdemographics = PUdemographics.withColumn("PU" + name, col(name).cast(FloatType())).drop(name)
    i += 1

In [52]:
#PUdemographics.printSchema()

In [53]:
# One hot encoding categorical variables


In [54]:
jan_2017 = jan_2017.join(PUdemographics, jan_2017.PUneighbor == PUdemographics.neighborhood, "left_outer")
jan_2017 = jan_2017.dropDuplicates(['uniqueIdColumn'])
jan_2017 = jan_2017.drop('neighborhood')

In [56]:
# Categorical Features
# RateCodeID
# store_and_fwd_flag
# PULocationID
# DOLocationID
# LocationID (1 to 256)
# PUBorough (comes from taxi+_lookup_zone)
# PUZone (Name for Location ID)
# PUServiceZone (Categorical)
# PUNeighbor (Demographics Neighborhood)
# PUDay (1-365)
# PU_DOW (Day of week)
# PUEvents
# PUConditions
# PUPeriod (AM or PM)
#jan_2017.printSchema()

root
 |-- VendorID: short (nullable = true)
 |-- passenger_count: short (nullable = true)
 |-- RateCodeID: short (nullable = true)
 |-- PUBorough: string (nullable = true)
 |-- PUZone: string (nullable = true)
 |-- PUServiceZone: string (nullable = true)
 |-- PUneighbor: string (nullable = true)
 |-- uniqueIdColumn: long (nullable = false)
 |-- AirportPU: integer (nullable = false)
 |-- PUHour: integer (nullable = true)
 |-- PUMinute: integer (nullable = true)
 |-- MorningRushHour: integer (nullable = false)
 |-- EveningRushHour: integer (nullable = false)
 |-- PUDay: integer (nullable = true)
 |-- PU_DOW: short (nullable = true)
 |-- Weekend: integer (nullable = false)
 |-- WorkingHour: integer (nullable = false)
 |-- PLocationID: integer (nullable = true)
 |-- DLocationID: integer (nullable = true)
 |-- temp: float (nullable = true)
 |-- windchill: float (nullable = true)
 |-- dewpoint: float (nullable = true)
 |-- humidity: float (nullable = true)
 |-- pressure: float (nullable = tr

In [58]:
jan_2017.count()

4925213

In [59]:
#dropping only 0.004527722963% of the data
jan_2017 = jan_2017.na.drop().cache()

In [60]:
# jan_2017.show(10)

In [61]:
indexer = StringIndexer(inputCol="PUZone", outputCol="PUZoneIndex")
jan_2017 = indexer.fit(jan_2017).transform(jan_2017)
encoder = OneHotEncoder(inputCol='PUZoneIndex', outputCol="PUZoneVect")
jan_2017 = encoder.transform(jan_2017).drop('PUZoneIndex')

In [62]:
encoder = OneHotEncoder(inputCol='RateCodeID', outputCol="RateCodeIDVect")
jan_2017 = encoder.transform(jan_2017).drop('RateCodeID')

In [64]:
encoder = OneHotEncoder(inputCol='PLocationID', outputCol="PLocationIDVect")
jan_2017 = encoder.transform(jan_2017).drop('PLocationID')

In [65]:
indexer = StringIndexer(inputCol="PUBorough", outputCol="PUBoroughIndex")
jan_2017 = indexer.fit(jan_2017).transform(jan_2017)
encoder = OneHotEncoder(inputCol='PUBoroughIndex', outputCol="PUBoroughVect")
jan_2017 = encoder.transform(jan_2017).drop('PUBoroughIndex')

In [66]:
indexer = StringIndexer(inputCol="PUServiceZone", outputCol="PUServiceZoneIndex")
jan_2017 = indexer.fit(jan_2017).transform(jan_2017)
encoder = OneHotEncoder(inputCol='PUServiceZoneIndex', outputCol="PUServiceZoneVect")
jan_2017 = encoder.transform(jan_2017).drop('PUServiceZoneIndex')

In [67]:
encoder = OneHotEncoder(inputCol='PU_DOW', outputCol="PU_DOWVect")
jan_2017 = encoder.transform(jan_2017).drop('PU_DOW')

In [68]:
indexer = StringIndexer(inputCol="Events", outputCol="EventsIndex")
jan_2017 = indexer.fit(jan_2017).transform(jan_2017)
encoder = OneHotEncoder(inputCol='EventsIndex', outputCol="EventsVector")
jan_2017 = encoder.transform(jan_2017).drop('EventsIndex')

In [69]:
indexer = StringIndexer(inputCol="Conditions", outputCol="ConditionsIndex")
jan_2017 = indexer.fit(jan_2017).transform(jan_2017)

encoder = OneHotEncoder(inputCol='ConditionsIndex', outputCol="ConditionsVect")
jan_2017 = encoder.transform(jan_2017).drop('ConditionsIndex')

In [70]:
indexer = StringIndexer(inputCol="period", outputCol="periodIndex")
jan_2017 = indexer.fit(jan_2017).transform(jan_2017)
encoder = OneHotEncoder(inputCol='periodIndex', outputCol="periodVect")
jan_2017 = encoder.transform(jan_2017).drop('periodIndex')

In [71]:
encoder = OneHotEncoder(inputCol='PUDay', outputCol="PUDayVect")
jan_2017 = encoder.transform(jan_2017).drop('PUDay')

In [72]:
# jan_2017.printSchema()

In [73]:
indexer = StringIndexer(inputCol="windDir", outputCol="windDirIndex")
jan_2017 = indexer.fit(jan_2017).transform(jan_2017)
encoder = OneHotEncoder(inputCol='windDirIndex', outputCol="windDirVect")
jan_2017 = encoder.transform(jan_2017).drop('windDirIndex')

In [74]:
# Drop any string columns except 'PUZone'
str_names = ['PUZone','tpep_pickup_datetime','tpep_dropoff_datetime','store_and_fwd_flag','PULocationID',\
             'LocationID','PUBorough','PUServiceZone','PUneighbor','PUTime','PUTemptime',\
             'windDir','Events','Conditions','period','neighborhood','PUDate', 'PUTempdate']
for col in str_names:
    jan_2017=jan_2017.drop(col)

In [75]:
# sampe = jan_2017.sample(False,0.0001,0).cache()
sampe = jan_2017

In [76]:
colum = sampe.schema.names

In [77]:
colum.remove('DLocationID')

In [78]:
assembler = VectorAssembler(
    inputCols=colum,
    outputCol="features")

In [79]:
output = assembler.transform(sampe)

In [80]:
train = output.select(["DLocationID","features"])

In [81]:
train = train.withColumn("label", train["DLocationID"]).drop("DLocationID")

In [82]:
# train.select('features').limit(1).collect()

In [83]:
train = train.select(["label","features"]).cache()

In [94]:
jan_2017.show(1)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:39489)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:39489)

In [None]:
# pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
# model = pca.fit(train)
# result = model.transform(train).select("pcaFeatures")
# result.show(truncate=False)

In [90]:
train.select("label", "features").write.save("training.parquet", format="parquet")

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:39489)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:39489)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:39489)