In [4]:
import pyspark
from pyspark.sql import SparkSession, types

spark = SparkSession.builder.master("local[*]").appName('test').getOrCreate()

In [5]:
spark.version

'3.2.1'

In [6]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [7]:
df = spark.read
.option("header", "true")
.schema(schema)
.csv('fhvhv_tripdata_2021-02.csv')

In [8]:
df = df.repartition(24)

In [9]:
df.write.parquet('fhvhv/')

22/03/03 19:59:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/03/03 19:59:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
22/03/03 19:59:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
22/03/03 19:59:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
22/03/03 19:59:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
22/03/03 19:59:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
22/03/03 19:59:22 WARN MemoryManager: Total allocation exceeds 95.

In [13]:
from pyspark.sql import functions as F

In [14]:
df.filter(F.to_date(df.pickup_datetime) == "2021-02-15").count()

                                                                                

367170

In [15]:
df.registerTempTable('trips_data')



In [17]:
spark.sql("""
SELECT pickup_datetime FROM trips_data
WHERE (dropoff_datetime-pickup_datetime) = (SELECT MAX(dropoff_datetime-pickup_datetime) FROM trips_data)
""").show()

[Stage 27:===>                                                    (1 + 15) / 16]

+-------------------+
|    pickup_datetime|
+-------------------+
|2021-02-11 13:40:44|
+-------------------+



                                                                                

In [20]:
df.groupBy(df.dispatching_base_num).count().sort(F.desc("count")).show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



                                                                                

In [22]:
df_zones = spark.read.parquet('zones/')

In [23]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [30]:
df_zones_pu = df_zones.alias("df_zones_pu")
df_zones_do = df_zones.alias("df_zones_do")

In [35]:
(df
 .join(df_zones_pu, F.col('PULocationID') == F.col('df_zones_pu.LocationID'))
 .join(df_zones_do, F.col('DOLocationID') == F.col('df_zones_do.LocationID'))
 .groupBy(F.col("df_zones_pu.Zone"), F.col("df_zones_do.Zone")).count().sort(F.desc("count")).show())



+--------------------+--------------------+-----+
|                Zone|                Zone|count|
+--------------------+--------------------+-----+
|       East New York|       East New York|45041|
|        Borough Park|        Borough Park|37329|
|            Canarsie|            Canarsie|28026|
| Crown Heights North| Crown Heights North|25976|
|           Bay Ridge|           Bay Ridge|17934|
|     Jackson Heights|     Jackson Heights|14688|
|             Astoria|             Astoria|14688|
|Central Harlem North|Central Harlem North|14481|
|      Bushwick South|      Bushwick South|14424|
|Flatbush/Ditmas Park|Flatbush/Ditmas Park|13976|
|    South Ozone Park|    South Ozone Park|13716|
|         Brownsville|         Brownsville|12829|
|         JFK Airport|                  NA|12542|
|Prospect-Lefferts...| Crown Heights North|11814|
|        Forest Hills|        Forest Hills|11548|
|      Bushwick North|      Bushwick South|11491|
|      Bushwick South|      Bushwick North|11487|


                                                                                