In [38]:
import findspark
import os 
import sys 
# Make both the PySpark workers and the driver use the interpreter that is
# running this kernel.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

In [2]:
# Runs before the `import pyspark` cell below; findspark.init() is what makes
# the local Spark installation importable from this kernel.
findspark.init()

In [3]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from urllib.request import urlretrieve as retrive
from pyspark.sql import SQLContext
import requests
from functools import reduce
from pyspark.sql.functions import *

In [13]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles

In [5]:
# Build (or reuse) the Spark session. The MongoDB connector is fetched through
# spark.jars.packages; the default input/output URIs point at trip.trip, and
# individual reads/writes later override them with an explicit "uri" option.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/trip.trip?readPreference=primaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/trip.trip")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

In [6]:
# Fetch the CSV through Spark's file distribution and load it with the header
# row as column names. No schema is supplied, so every column is read as a
# string; the frame is capped at 300000 rows.
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/sank03/Yellow-Taxi-Dataset-2019/main/taxi_full_dataset.csv"
spark.sparkContext.addFile(url)
# Resolve the downloaded copy's local path once and reuse it for the read.
taxi_csv_local = SparkFiles.get("taxi_full_dataset.csv")
df_taxi = spark.read.csv("file:///" + taxi_csv_local, header=True).limit(300000)

In [7]:
# Sanity check: number of rows actually loaded (bounded by the 300000-row limit).
df_taxi.count()

300000

In [8]:
# Persist the raw sample to the YellowTaxi.taxi_bi collection.
# mode("overwrite") keeps the cell re-runnable: with the default save mode the
# connector refuses to write once the collection exists, so a second
# "Run All" would fail here. Overwrite replaces the collection's contents.
df_taxi.write.format("mongo").mode("overwrite").option("uri","mongodb://localhost:27017/YellowTaxi.taxi_bi").save()

# Read the collection back only after the write, so the connector infers the
# schema from the documents that were just stored rather than from whatever
# (possibly nothing) was there before.
ml = spark.read.format("mongo").option("uri","mongodb://localhost:27017/YellowTaxi.taxi_bi").load()

In [9]:
# Count the nulls in every column with ONE aggregation job instead of one
# filter+count job per column. count() only counts non-null values, so
# count(when(is_null, 1)) yields the number of null rows (0 when there are none).
null_count_exprs = [
    F.count(F.when(F.col(column_name).isNull(), 1)).alias(column_name)
    for column_name in df_taxi.columns
]
# An aggregate without groupBy always returns exactly one row; asDict() keeps
# the column order, giving the same {column: null_count} mapping as before.
checkNullValues = df_taxi.select(null_count_exprs).first().asDict()
print(checkNullValues)

{'No': 1297, 'passenger_count': 0, 'tpep_pickup_datetime': 0, 'tpep_dropoff_datetime': 0, 'VendorID': 0, 'trip_distance': 1297, 'PULocationID6': 1297, 'store_and_fwd_flag': 0, 'DOLocationID': 1297, 'RatecodeID': 0, 'payment_type': 1297, 'fare_amount': 1297, 'tip_amount': 1297, 'tolls_amount': 1297, 'total_amount': 1297, 'date': 1297, 'time': 1297, 'weekday': 1297, 'year': 1297, 'month': 1297, 'day': 1297, 'day_of_week': 1297, 'hour_of_day': 1297, 'trip_duration': 1297, 'tip_percent': 1297, 'calculated_total_amount': 3919, 'TAVG': 1297, 'PUborough': 1297, 'DOborough': 1297, 'PULocationID29': 299735, 'Borough': 299735, 'Zone': 299735, 'service_zone': 299735}


In [10]:
# Remove the store_and_fwd_flag and RatecodeID columns, then show the schema
# of what is left.
columns_to_discard = ['store_and_fwd_flag', 'RatecodeID']
df = df_taxi.drop(*columns_to_discard)
df.printSchema()

root
 |-- No: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- VendorID: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- PULocationID6: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- hour_of_day: string (nullable = true)
 |-- trip_duration: string (nullable = true)
 |-- tip_percent: string (nullable = true)
 |-- calculated_total_amo

In [20]:
# Parse the pickup / drop-off datetime strings into timestamp columns.
# The pattern is passed explicitly so that this cell agrees with the
# unix_timestamp conversion further down, which parses the same two columns as
# 'dd/MM/yyyy hh:mm:ss a'; without a pattern to_timestamp only understands the
# default ISO-like layout and would not parse such values.
# NOTE(review): df_n / df_new are not used by the later cells shown in this
# notebook (they work from df_pt / df_dt) — confirm whether this cell is still needed.
df_n = df.withColumn("pickup_timestamp", F.to_timestamp(F.col("tpep_pickup_datetime"), "dd/MM/yyyy hh:mm:ss a"))
df_new = df_n.withColumn("dropoff_timestamp", F.to_timestamp(F.col("tpep_dropoff_datetime"), "dd/MM/yyyy hh:mm:ss a"))

In [18]:
# Parse the drop-off datetime string into a timestamp column.
# The pattern is passed explicitly to match the unix_timestamp conversion
# further down ('dd/MM/yyyy hh:mm:ss a'); without it to_timestamp only
# understands the default ISO-like layout.
# NOTE(review): this rebinds df_n to a frame derived directly from df (so it has
# no pickup_timestamp column) and duplicates the conversion in the previous
# cell — it looks like a leftover experiment; confirm and consider deleting.
df_n = df.withColumn("dropoff_timestamp", F.to_timestamp(F.col("tpep_dropoff_datetime"), "dd/MM/yyyy hh:mm:ss a"))

DataFrame[No: string, passenger_count: string, tpep_pickup_datetime: string, tpep_dropoff_datetime: string, VendorID: string, trip_distance: string, PULocationID6: string, DOLocationID: string, payment_type: string, fare_amount: string, tip_amount: string, tolls_amount: string, total_amount: string, date: string, time: string, weekday: string, year: string, month: string, day: string, day_of_week: string, hour_of_day: string, trip_duration: string, tip_percent: string, calculated_total_amount: string, TAVG: string, PUborough: string, DOborough: string, PULocationID29: string, Borough: string, Zone: string, service_zone: string, dropoff_timestamp: timestamp]

In [23]:
# Convert both datetime strings to epoch seconds (bigint columns), parsing them
# with one shared day-first, 12-hour (AM/PM) pattern.
datetime_pattern = 'dd/MM/yyyy hh:mm:ss a'
df_pt = df.withColumn(
    'pickup_timestamp',
    F.unix_timestamp('tpep_pickup_datetime', datetime_pattern),
)
df_dt = df_pt.withColumn(
    'dropoff_timestamp',
    F.unix_timestamp('tpep_dropoff_datetime', datetime_pattern),
)

In [26]:
# Inspect the two converted columns; the cell output is the DataFrame repr,
# which lists each column's type.
df_dt.select("pickup_timestamp","dropoff_timestamp")

DataFrame[pickup_timestamp: bigint, dropoff_timestamp: bigint]

In [25]:
# Trip duration in minutes: difference of the two epoch-second columns,
# divided by 60 and rounded to a whole number.
elapsed_seconds = F.col('dropoff_timestamp') - F.col('pickup_timestamp')
time_dur = df_dt.withColumn('time_duration', F.round(elapsed_seconds / 60))

In [27]:
# Add calendar-date columns derived from the pickup / drop-off epoch columns.
pickup_as_date = F.to_timestamp('pickup_timestamp').cast('date')
dropoff_as_date = F.to_timestamp('dropoff_timestamp').cast('date')
df_date = (
    time_dur
    .withColumn('pickup_date', pickup_as_date)
    .withColumn('dropoff_date', dropoff_as_date)
)

In [30]:
# Add timestamp-typed columns derived from the pickup / drop-off epoch columns.
pickup_as_timestamp = F.to_timestamp('pickup_timestamp').cast('timestamp')
dropoff_as_timestamp = F.to_timestamp('dropoff_timestamp').cast('timestamp')
df_time = (
    df_date
    .withColumn('pickup_time', pickup_as_timestamp)
    .withColumn('dropoff_time', dropoff_as_timestamp)
)


In [31]:
# Split the pickup and drop-off dates into year / month / day-of-month columns.
# The pickup parts are written to "year", "month" and "day", replacing the
# same-named string columns that came with the CSV; the drop-off parts get
# "dropoff_"-prefixed names.
pick_DateSep = (
    df_time
    .withColumn("year", F.year("pickup_date"))
    .withColumn("month", F.month("pickup_date"))
    .withColumn("day", F.dayofmonth("pickup_date"))
)
drop_DateSep = (
    pick_DateSep
    .withColumn("dropoff_year", F.year("dropoff_date"))
    .withColumn("dropoff_month", F.month("dropoff_date"))
    .withColumn("dropoff_day", F.dayofmonth("dropoff_date"))
)

In [32]:
# Split the pickup and drop-off timestamps into hour / minute / second columns.
# The *_Sec columns use second(); they previously used dayofmonth(), which
# stored the day of the month under a "seconds" name.
pick_timeSep = (
    drop_DateSep
    .withColumn("pickup_Hour", F.hour("pickup_time"))
    .withColumn("pickup_Min", F.minute("pickup_time"))
    .withColumn("pickup_Sec", F.second("pickup_time"))
)
drop_timeSep = (
    pick_timeSep
    .withColumn("dropoff_Hour", F.hour("dropoff_time"))
    .withColumn("dropoff_Min", F.minute("dropoff_time"))
    .withColumn("dropoff_Sec", F.second("dropoff_time"))
)

In [33]:
# Drop the raw datetime strings and the intermediate date / time / epoch
# columns now that their parts have been extracted.
# Start from drop_timeSep (the last frame in the chain): starting from
# pick_timeSep discarded the dropoff_Hour / dropoff_Min / dropoff_Sec columns
# computed in the previous cell, so they never reached the exported data.
intermediate_columns = (
    'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'pickup_date', 'dropoff_date',
    'pickup_time', 'dropoff_time',
    'pickup_timestamp', 'dropoff_timestamp',
)
df_drop = drop_timeSep.drop(*intermediate_columns)
# Row count is unchanged by dropping columns; shown as a sanity check.
df_drop.count()

300000

CONNECT TO AND DUMP CLEANED DATA INTO MONGODB








In [37]:
# Persist the cleaned frame to the NewYorkTaxi.Taxi_ml collection.
# mode("overwrite") keeps the cell re-runnable: with the default save mode the
# connector refuses to write once the collection exists, so a second
# "Run All" would fail here. Overwrite replaces the collection's contents.
df_drop.write.format("mongo").mode("overwrite").option("uri","mongodb://localhost:27017/NewYorkTaxi.Taxi_ml").save()

# Read the collection back only after the write, so the connector infers the
# schema from the documents that were just stored.
ml1 = spark.read.format("mongo").option("uri","mongodb://localhost:27017/NewYorkTaxi.Taxi_ml").load()

Import data from MongoDB

In [None]:
#import pandas as pd
#import numpy as np
#import seaborn as sns
#import matplotlib.pyplot as plt
#import pymongo
#from pymongo import MongoClient
#from sklearn.model_selection import train_test_split
#from sklearn.ensemble import RandomForestClassifier
#from sklearn.metrics import accuracy_score

In [39]:
import findspark 
from pymongo import MongoClient
import pandas as pd 

In [42]:
# Pull the cleaned trips out of the local MongoDB into pandas (at most 500000
# documents), then remove Mongo's _id column.
# NOTE: `df` is rebound here — earlier cells used the same name for a Spark DataFrame.
client = MongoClient('localhost', 27017)
db = client['NewYorkTaxi']
data = db['Taxi_ml']
documents = list(data.find().limit(500000))
df = pd.DataFrame(documents)
df_final = df.drop(columns='_id')

In [43]:
# Display the frame (pandas truncates the output to the first and last rows/columns).
df_final

Unnamed: 0,No,passenger_count,VendorID,trip_distance,PULocationID6,DOLocationID,payment_type,fare_amount,tip_amount,tolls_amount,...,trip_duration,tip_percent,calculated_total_amount,TAVG,PUborough,DOborough,PULocationID29,Borough,Zone,service_zone
0,0,1,2,0.73,163,161,2,4.5,0,0,...,152,0,5.8,5.15,Manhattan,Manhattan,1,EWR,Newark Airport,EWR
1,1,1,1,1.1,163,161,1,5.5,2.3,0,...,67,19.82758621,6.26,5.15,Manhattan,Manhattan,2,Queens,Jamaica Bay,Boro Zone
2,2,5,2,0.64,163,161,1,4,2.19,0,...,137,23.07692308,6.26,5.15,Manhattan,Manhattan,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,3,6,2,0.25,163,161,2,5.5,0,0,...,145,0,8.75,5.15,Manhattan,Manhattan,4,Manhattan,Alphabet City,Yellow Zone
4,4,6,2,0.4,163,161,1,4.5,1.2,0,...,173,13.33333333,9.3,5.15,Manhattan,Manhattan,5,Staten Island,Arden Heights,Boro Zone
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
299995,,1,1,,,,,,,,...,,,,,,,,,,
299996,,2,2,,,,,,,,...,,,,,,,,,,
299997,,1,2,,,,,,,,...,,,,,,,,,,
299998,,1,2,,,,,,,,...,,,,,,,,,,
