In [68]:
!pip install findspark
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Object that we use for interacting with Spark, main entry point to spark 
# http://localhost:4040/jobs/ to see the UI



In [5]:
spark.version

'3.0.3'

In [7]:
df_fhv = spark.read \
    .option("header", "true") \
    .csv("fhvhv_tripdata_2021-02.csv")

In [8]:
df_fhv.printSchema()

#all strings, we don't want that. 

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [9]:
#edit the schema in Visual studio code, and convert to python

from pyspark.sql import types

schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [11]:
df_fhv = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("fhvhv_tripdata_2021-02.csv") 

In [12]:
df_fhv.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [13]:
df_fhv.repartition(24).write.parquet("hvfhv_2021-02")

In [18]:
df_fhv.show()


+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

AttributeError: 'NoneType' object has no attribute 'head'

In [23]:
df_fhv.registerTempTable('FHV')

In [32]:
df_fhv = spark.read.parquet("hvfhv_2021-02")

In [57]:
# q3_df = df_fhv.filter( (df_fhv.pickup_datetime >= '2021-02-01 00:00:00') & (df_fhv.pickup_datetime) >= '2021-02-28 00:00:00').show()


# //Filter multiple condition
# df.filter( (df.state  == "OH") & (df.gender  == "M") ) \
#     .show(truncate=False)  

In [65]:
df_3 = spark.sql("""
SELECT * FROM FHV 
WHERE CAST(pickup_datetime as date) == '2021-02-15'
""") 

In [66]:
df_3.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-15 00:14:30|2021-02-15 00:26:11|         108|          22|   null|
|           HV0005|              B02510|2021-02-15 00:23:46|2021-02-15 00:41:09|          37|          61|   null|
|           HV0004|              B02800|2021-02-15 00:34:20|2021-02-15 00:39:34|          82|          82|   null|
|           HV0004|              B02800|2021-02-15 00:49:51|2021-02-15 01:10:29|         138|         239|   null|
|           HV0003|              B02884|2021-02-15 00:03:13|2021-02-15 00:19:23|          81|          47|   null|
|           HV0003|              B02884|2021-02-15 00:33:30|2021-02-15 00:46:24|

In [67]:
df_3.count()

367170

In [92]:
day_longest_trip = spark.sql("""
SELECT CAST(pickup_datetime as date) AS day, MAX((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60) AS duration 
FROM FHV
GROUP BY CAST(pickup_datetime as date)

""")

In [94]:
day_longest_trip.show(40)

+----------+------------------+
|       day|          duration|
+----------+------------------+
|2021-02-15|431.23333333333335|
|2021-02-02| 515.2166666666667|
|2021-02-26|407.03333333333336|
|2021-02-21|            537.05|
|2021-02-05|508.51666666666665|
|2021-02-10| 569.4833333333333|
|2021-02-01|343.96666666666664|
|2021-02-06| 524.1166666666667|
|2021-02-19| 626.2833333333333|
|2021-02-20| 733.9833333333333|
|2021-02-08|501.76666666666665|
|2021-02-09| 534.7833333333333|
|2021-02-11|            1259.0|
|2021-02-17| 953.6833333333333|
|2021-02-25|             583.5|
|2021-02-27| 452.8333333333333|
|2021-02-24|394.48333333333335|
|2021-02-18| 576.8666666666667|
|2021-02-14|496.28333333333336|
|2021-02-12|502.46666666666664|
|2021-02-22|             471.3|
|2021-02-03|            677.55|
|2021-02-13| 357.3666666666667|
|2021-02-23|407.31666666666666|
|2021-02-16|424.01666666666665|
|2021-02-07|294.53333333333336|
|2021-02-28| 330.8333333333333|
|2021-02-04|426.53333333333336|
+-------

In [107]:
most_frequent_dispatching_num = spark.sql("""
SELECT * FROM (
SELECT dispatching_base_num, 
    COUNT(*) AS occurences,
    RANK() OVER (ORDER BY COUNT(*) DESC) as rank
FROM FHV 
GROUP BY dispatching_base_num)

WHERE rank = 1


""")

In [108]:
most_frequent_dispatching_num.show(100)

+--------------------+----------+----+
|dispatching_base_num|occurences|rank|
+--------------------+----------+----+
|              B02510|   3233664|   1|
+--------------------+----------+----+



In [109]:
zones_df = spark.read.parquet('zones')

In [111]:
zones_df.registerTempTable('zones')

In [110]:
zones_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [131]:
most_common_locations_pair = spark.sql("""

SELECT DISTINCT(CONCAT(COALESCE(zpu.Zone, "Unknown"), " / ", COALESCE(zdo.Zone, "Unknown"))) AS pickup_and_dropoff,
COUNT(*) as counts
FROM 
    FHV LEFT JOIN zones zpu  ON
        FHV.PULocationID = zpu.LocationID
    LEFT JOIN zones zdo ON
        FHV.DOLocationID = zdo.LocationID
GROUP BY pickup_and_dropoff
ORDER BY counts DESC



        

""")

# Here we do two left joins, the Pick up location id on the left with the zones look up, 
# and Dropoff location id on the left with the look up table we give the lookup table a unique name for zpu and zdo
# we use this in our SELECT(DISTINC(CONCAT)) Query so that we can get the string version of the name from the lookup table
# then group by the pickup_and_dropoff 




# SELECT MAX(avg_total) FROM (
# SELECT 
# 	DISTINCT CONCAT(zpu."Zone", ' / ', zdo."Zone") AS pickup_and_dropoff, 
# 	AVG(total_amount) as avg_total
# FROM 
# 	yellow_taxi_data y JOIN zones zpu 
# 		ON y."PULocationID" = zpu."LocationID"
# 	JOIN zones zdo
# 		ON y."DOLocationID" = zdo."LocationID"
# GROUP BY pickup_and_dropoff
# ) subquery;

In [132]:
most_common_locations_pair.show(40)

+--------------------+------+
|  pickup_and_dropoff|counts|
+--------------------+------+
|East New York / E...| 45041|
|Borough Park / Bo...| 37329|
| Canarsie / Canarsie| 28026|
|Crown Heights Nor...| 25976|
|Bay Ridge / Bay R...| 17934|
|   Astoria / Astoria| 14688|
|Jackson Heights /...| 14688|
|Central Harlem No...| 14481|
|Bushwick South / ...| 14424|
|Flatbush/Ditmas P...| 13976|
|South Ozone Park ...| 13716|
|Brownsville / Bro...| 12829|
|    JFK Airport / NA| 12542|
|Prospect-Lefferts...| 11814|
|Forest Hills / Fo...| 11548|
|Bushwick North / ...| 11491|
|Bushwick South / ...| 11487|
|Crown Heights Nor...| 11462|
|Crown Heights Nor...| 11342|
|Prospect-Lefferts...| 11308|
|Stuyvesant Height...| 11293|
|Brownsville / Eas...| 11244|
|   Bedford / Bedford| 11021|
|Canarsie / East N...| 10688|
|Stuyvesant Height...| 10675|
|East New York / B...| 10621|
| Elmhurst / Elmhurst| 10604|
|Soundview/Castle ...| 10519|
|Central Harlem No...| 10304|
|Central Harlem / ...| 10260|
|Bedford /