In [130]:
# Install Spark
!pip install pyspark
#!pip install pyspark[sql]



# Import Library

In [131]:
# import pyspark.sql classes and functions
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col, array_contains, isnan, when, count
from pyspark.sql.functions import lit, concat_ws, concat, collect_list, udf
from pyspark.sql.functions import countDistinct

spark = SparkSession.builder.appName("answers").getOrCreate()

In [132]:
!pwd

/kaggle/working


# Loading Data

### Made the inferSchema option as True, to infer the columns as their values, rather than string

In [133]:
path = "/kaggle/input/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-03.csv"
df = spark.read.option("header",'True').option('delimiter', ',').csv(path, inferSchema = True)
df.printSchema()



root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



                                                                                

In [134]:
'''schema = StructType() \
      .add("longitude",DoubleType(),True) \
      .add("latitude",DoubleType(),True) \
      .add("housing_median_age",DoubleType(),True) \
      .add("total_rooms",DoubleType(),True) \
      .add("total_bedrooms",DoubleType(),True) \
      .add("population",DoubleType(),True) \
      .add("households",DoubleType(),True) \
      .add("median_income",DoubleType(),True) \
      .add("median_house_value",DoubleType(),True) \

df = spark.read.format("csv") \
      .option("header", True) \
      .option('delimiter', ',') \
      .schema(schema) \
      .load("/content/sample_data/california_housing_train.csv")

df.printSchema()''' 

'schema = StructType()       .add("longitude",DoubleType(),True)       .add("latitude",DoubleType(),True)       .add("housing_median_age",DoubleType(),True)       .add("total_rooms",DoubleType(),True)       .add("total_bedrooms",DoubleType(),True)       .add("population",DoubleType(),True)       .add("households",DoubleType(),True)       .add("median_income",DoubleType(),True)       .add("median_house_value",DoubleType(),True) \ndf = spark.read.format("csv")       .option("header", True)       .option(\'delimiter\', \',\')       .schema(schema)       .load("/content/sample_data/california_housing_train.csv")\n\ndf.printSchema()'

### Problem 1, Question 1

In [135]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|          2.5|-73.97674560546875| 40.76515197753906|         1|    

### Rounding off the values of pickup longitude and pickup latitude to 1 decimal places for consising the data during group by operation

In [136]:
from pyspark.sql.functions import round
df = df.withColumn("pickup_longitude_round", round(df.pickup_longitude, 1))
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pickup_longitude_round|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              

In [137]:
df.count()

                                                                                

12210952

In [138]:
df = df.withColumn("pickup_latitude_round", round(df.pickup_latitude, 1))
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pickup_longitude_round|pickup_latitude_round|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|

In [139]:
df.drop("pickup_latitude","pickup_longitude").show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pickup_longitude_round|pickup_latitude_round|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|          2.5|         1|                 N|-7

### Aggregating the trip distance average after group by based on the pickup location

In [140]:
df_new = df.groupBy('pickup_longitude_round','pickup_latitude_round').agg({'trip_distance' : 'mean'})

In [141]:
df_new.count()

                                                                                

267

In [142]:
df_new.show()



+----------------------+---------------------+-------------------+
|pickup_longitude_round|pickup_latitude_round| avg(trip_distance)|
+----------------------+---------------------+-------------------+
|                 -82.7|                 40.7|               1.58|
|                 -74.5|                 41.4|               2.46|
|                 -71.6|                 39.1|              18.01|
|                 -74.0|                 40.9|  2.216109660574413|
|                 -73.8|                 40.8|  5.076268894192522|
|                 -73.1|                 40.9| 10.983333333333334|
|                 -73.6|                 40.7|   3.15390243902439|
|                 -74.7|                 41.7|               1.08|
|                 -73.5|                 40.8| 3.4701612903225807|
|                 -69.5|                 41.2| 1.4666666666666668|
|                 -74.3|                 40.9| 3.7944444444444443|
|                 -73.7|                 40.7|  5.478639618138

                                                                                

In [143]:
df_new.printSchema()

root
 |-- pickup_longitude_round: double (nullable = true)
 |-- pickup_latitude_round: double (nullable = true)
 |-- avg(trip_distance): double (nullable = true)



### Sorting the new dataframe in order to get the required location in the first row of the dataframe

In [144]:
df_new = df_new.sort("avg(trip_distance)", ascending=False)
df_new.show()



+----------------------+---------------------+------------------+
|pickup_longitude_round|pickup_latitude_round|avg(trip_distance)|
+----------------------+---------------------+------------------+
|                 -75.2|                 39.9|             121.6|
|                 -74.6|                 40.0|              94.8|
|                 -72.9|                 41.3|              80.9|
|                 -75.1|                 40.0|             73.79|
|                 -75.6|                 39.7|              72.5|
|                 -75.1|                 40.1|              67.9|
|                 -74.2|                 40.0|             61.67|
|                 -74.3|                 40.2|             53.79|
|                 -72.7|                 40.9|             43.59|
|                 -73.0|                 40.8|             26.55|
|                 -73.3|                 40.9|              24.6|
|                 -74.3|                 40.5|24.591666666666665|
|         

                                                                                

In [145]:
df_new.collect()[0]

                                                                                

Row(pickup_longitude_round=-75.2, pickup_latitude_round=39.9, avg(trip_distance)=121.6)

In [146]:
print(f"Location with maximum average trip distance is lat : {df_new.collect()[0][0]} and long : {df_new.collect()[0][1]}.")

Location with maximum average trip distance is lat : -75.2 and long : 39.9.


# Question 2

In [147]:
import pandas as pd
import time

### Calculating the time taken by PySpark to solve Q1

In [148]:
start_time = time.time()
df_new = df.groupBy('pickup_longitude_round','pickup_latitude_round').agg({'trip_distance' : 'mean'})
df_new = df_new.sort("avg(trip_distance)", ascending=False) 
lat = df_new.collect()[0][0]
long = df_new.collect()[0][1]
max_avg_trip = df_new.collect()[0][2]
end_time = time.time()
time_spark = end_time - start_time
print(f"The time taken by spark is {time_spark}.")

                                                                                

The time taken by spark is 33.551915884017944.


In [149]:
df_pandas = pd.read_csv("/kaggle/input/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-03.csv")
df_pandas

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.50,-73.976746,40.765152,1,N,-74.004265,40.746128,1,9.0,0.5,0.5,2.05,0.00,0.3,12.35
1,1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.90,-73.983482,40.767925,1,N,-74.005943,40.733166,1,11.0,0.5,0.5,3.05,0.00,0.3,15.35
2,2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.782021,40.644810,1,N,-73.974541,40.675770,1,54.5,0.5,0.5,8.00,0.00,0.3,63.80
3,2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.863419,40.769814,1,N,-73.969650,40.757767,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,2,2016-03-01 00:00:00,2016-03-01 00:00:00,5,30.43,-73.971741,40.792183,3,N,-74.177170,40.695053,1,98.0,0.0,0.0,0.00,15.50,0.3,113.80
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
12210947,1,2016-03-31 16:34:36,2016-03-31 16:34:36,1,0.00,0.000000,0.000000,1,Y,0.000000,0.000000,2,19.0,1.0,0.5,0.00,0.00,0.3,20.80
12210948,1,2016-03-26 03:02:32,2016-06-14 18:47:55,1,0.40,-73.977356,40.774471,1,N,-73.982536,40.772408,1,4.0,1.0,0.5,1.70,0.00,0.3,7.50
12210949,1,2016-03-20 08:43:59,2016-06-27 15:05:01,1,20.20,0.000000,0.000000,2,N,-74.008614,40.710987,1,52.0,0.0,0.5,6.00,5.54,0.3,64.34
12210950,1,2016-03-20 08:49:47,2016-06-28 19:11:27,1,15.40,-73.790077,40.647377,1,N,-73.971756,40.578457,1,42.5,1.0,0.5,5.00,0.00,0.3,49.30


In [150]:
df_pandas = df_pandas.round({"pickup_longitude":1, "pickup_latitude":1}) 

In [151]:
df_pandas

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.50,-74.0,40.8,1,N,-74.004265,40.746128,1,9.0,0.5,0.5,2.05,0.00,0.3,12.35
1,1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.90,-74.0,40.8,1,N,-74.005943,40.733166,1,11.0,0.5,0.5,3.05,0.00,0.3,15.35
2,2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.8,40.6,1,N,-73.974541,40.675770,1,54.5,0.5,0.5,8.00,0.00,0.3,63.80
3,2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.9,40.8,1,N,-73.969650,40.757767,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,2,2016-03-01 00:00:00,2016-03-01 00:00:00,5,30.43,-74.0,40.8,3,N,-74.177170,40.695053,1,98.0,0.0,0.0,0.00,15.50,0.3,113.80
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
12210947,1,2016-03-31 16:34:36,2016-03-31 16:34:36,1,0.00,0.0,0.0,1,Y,0.000000,0.000000,2,19.0,1.0,0.5,0.00,0.00,0.3,20.80
12210948,1,2016-03-26 03:02:32,2016-06-14 18:47:55,1,0.40,-74.0,40.8,1,N,-73.982536,40.772408,1,4.0,1.0,0.5,1.70,0.00,0.3,7.50
12210949,1,2016-03-20 08:43:59,2016-06-27 15:05:01,1,20.20,0.0,0.0,2,N,-74.008614,40.710987,1,52.0,0.0,0.5,6.00,5.54,0.3,64.34
12210950,1,2016-03-20 08:49:47,2016-06-28 19:11:27,1,15.40,-73.8,40.6,1,N,-73.971756,40.578457,1,42.5,1.0,0.5,5.00,0.00,0.3,49.30


### Calculating the time taken by pandas to solve Q1

In [157]:
start_time = time.time()
df_pandas = df_pandas.groupby(["pickup_longitude","pickup_latitude"]).mean("trip_distance")
df_pandas = df_pandas.sort_values(by = ["trip_distance"], ascending = False)
#print(df_pandas.head(1))
end_time = time.time()
time_pandas = end_time - start_time
print(f"The time taken by pandas is {time_pandas}.")
print(df_pandas["trip_distance"].head(1))

The time taken by pandas is 0.007148027420043945.
pickup_longitude  pickup_latitude
-75.2             39.9               121.6
Name: trip_distance, dtype: float64


### Difference between the times taken by pandas and spark to find the location with maximum average of trip distances

In [158]:
print(f"The difference in the running times is : {time_pandas - time_spark}.")

The difference in the running times is : -33.5447678565979.


# Question 3

In [154]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pickup_longitude_round|pickup_latitude_round|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|

### Rounding off the dropoff location latitudes and longitudes by 1 decimal places

In [176]:
df_new = df.withColumn("dropoff_latitude_round", round(df.dropoff_latitude, 1))
df_new = df_new.withColumn("dropoff_longitude_round", round(df.dropoff_longitude, 1))
df_new = df_new.drop("dropoff_latitude","dropoff_longitude")
df_new.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+----------------------+-----------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pickup_longitude_round|pickup_latitude_round|dropoff_latitude_round|dropoff_longitude_round|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+----------------------

In [177]:
df_new.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_longitude_round: double (nullable = true)
 |-- pickup_latitude_round: double (nullable = true)
 |-- dropoff_latitude_round: double (nullable = true)
 |-- dropoff_longitude_round: double (nullable = true)

In [178]:
df_new = df_new.groupBy('dropoff_longitude_round','dropoff_latitude_round').agg({"passenger_count" : "sum"})
df_new = df_new.sort("sum(passenger_count)", ascending=False) 
print(df_new.show())
lat = df_new.collect()[0][1]
long = df_new.collect()[0][0]
max_passengers = df_new.collect()[0][2]
print(f"Latitude : {lat}, Longitude : {long}.")

                                                                                

+-----------------------+----------------------+--------------------+
|dropoff_longitude_round|dropoff_latitude_round|sum(passenger_count)|
+-----------------------+----------------------+--------------------+
|                  -74.0|                  40.8|             9881955|
|                  -74.0|                  40.7|             7677615|
|                  -73.9|                  40.8|             1431932|
|                  -73.9|                  40.7|              439594|
|                    0.0|                   0.0|              253902|
|                  -73.8|                  40.6|              168782|
|                  -73.9|                  40.9|              107513|
|                  -73.8|                  40.7|               88494|
|                  -74.0|                  40.6|               85545|
|                  -74.2|                  40.7|               34883|
|                  -73.8|                  40.8|               28928|
|                  -

                                                                                

Latitude : 40.8, Longitude : -74.0.


# Question 4

In [170]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pickup_longitude_round|pickup_latitude_round|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+
|

In [172]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_longitude_round: double (nullable = true)
 |-- pickup_latitude_round: double (nullable = true)



In [179]:
df_new = df.groupBy('pickup_longitude_round','pickup_latitude_round').agg({"passenger_count" : "sum"})
df_new = df_new.sort("sum(passenger_count)", ascending=False) 
print(df_new.show())
lat = df_new.collect()[0][1]
long = df_new.collect()[0][0]
max_passengers = df_new.collect()[0][2]
print(f"Latitude : {lat}, Longitude : {long}.")

                                                                                

+----------------------+---------------------+--------------------+
|pickup_longitude_round|pickup_latitude_round|sum(passenger_count)|
+----------------------+---------------------+--------------------+
|                 -74.0|                 40.8|            10245394|
|                 -74.0|                 40.7|             7959108|
|                 -73.9|                 40.8|             1178204|
|                 -73.8|                 40.6|              431956|
|                   0.0|                  0.0|              267854|
|                 -73.9|                 40.7|              124276|
|                 -73.8|                 40.7|               25401|
|                 -73.9|                 40.9|               10536|
|                 -74.0|                 40.6|                7783|
|                 -74.1|                 40.6|                2360|
|                 -73.9|                 40.6|                1984|
|                 -73.8|                 40.8|  

                                                                                

Latitude : 40.8, Longitude : -74.0.


# Question 5

In [174]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_longitude_round: double (nullable = true)
 |-- pickup_latitude_round: double (nullable = true)



### Extracting the date part from the timestamp columns and creating a new column for it

In [181]:
from pyspark.sql.functions import year, month, day
from pyspark.sql.functions import col, to_date

df_new = df.withColumn('date_only', to_date(col('tpep_pickup_datetime')))
df_new.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+---------------------+----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pickup_longitude_round|pickup_latitude_round| date_only|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------------+--

### Grouping by according to the pickup locations and the date to get the passenger counts for each day. Then sorting the dataframe to get the maximum passenger count in the first row.

In [182]:
df_new = df_new.groupBy('pickup_longitude_round','pickup_latitude_round', 'date_only').agg({"passenger_count" : "sum"})
df_new = df_new.sort("sum(passenger_count)", ascending=False) 
print(df_new.show())
lat = df_new.collect()[0][1]
long = df_new.collect()[0][0]
max_passengers = df_new.collect()[0][2]
print(f"Latitude : {lat}, Longitude : {long}.")

                                                                                

+----------------------+---------------------+----------+--------------------+
|pickup_longitude_round|pickup_latitude_round| date_only|sum(passenger_count)|
+----------------------+---------------------+----------+--------------------+
|                 -74.0|                 40.8|2016-03-03|              369756|
|                 -74.0|                 40.8|2016-03-04|              365495|
|                 -74.0|                 40.8|2016-03-18|              363654|
|                 -74.0|                 40.8|2016-03-05|              360343|
|                 -74.0|                 40.8|2016-03-02|              358613|
|                 -74.0|                 40.8|2016-03-11|              354875|
|                 -74.0|                 40.8|2016-03-12|              350971|
|                 -74.0|                 40.7|2016-03-19|              350797|
|                 -74.0|                 40.8|2016-03-10|              348276|
|                 -74.0|                 40.8|2016-0

                                                                                

Latitude : 40.8, Longitude : -74.0.
