In [1]:
import pandas as pd
import pyspark
from pyspark.sql import types
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/03 18:25:25 WARN Utils: Your hostname, codespaces-373050 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/03/03 18:25:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/03 18:25:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
!wc -l fhv_tripdata_2019-10.csv

In [3]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

                                                                                

In [None]:
df.printSchema()

In [None]:
df \
    .withColumn('pickup_date',F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date',F.to_date(df.dropOff_datetime)) \
    .show()

In [4]:
df_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropOff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.StringType(), True),
    types.StructField("DOLocationID", types.StringType(), True),
    types.StructField("SR_Flag", types.StringType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

In [5]:
df = spark.read \
        .option("header", "true") \
        .schema(df_schema) \
        .csv('fhv_tripdata_2019-10.csv')

In [6]:
df \
    .repartition(6) \
    .write.parquet('data/fhvoct19', mode='overwrite')

24/03/03 18:25:43 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [7]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [8]:
df.createOrReplaceTempView('fhv_oct19')

In [9]:
df_15oct_count = spark.sql("""
SELECT COUNT(*) as rowcount
FROM fhv_oct19
WHERE TO_DATE(pickup_datetime)="2019-10-15"
""")

In [10]:
df_15oct_count.show()



+--------+
|rowcount|
+--------+
|   62610|
+--------+



                                                                                

In [11]:
df_long_trip = spark.sql("""
SELECT (dropOff_datetime - pickup_datetime)*24 AS length_hrs
FROM fhv_oct19
ORDER BY length_hrs DESC
LIMIT 5
""")

In [12]:
df_long_trip.show()

[Stage 7:>                                                          (0 + 2) / 2]

+--------------------+
|          length_hrs|
+--------------------+
|INTERVAL '631152 ...|
|INTERVAL '631152 ...|
|INTERVAL '87672 1...|
|INTERVAL '70128 0...|
|INTERVAL '8794 00...|
+--------------------+



                                                                                

In [13]:
df_zones = spark.read.parquet('zones/')

In [14]:
df_zones.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [15]:
df_result = df.join(df_zones, df.PULocationID == df_zones.LocationID)

In [16]:
df_result.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [17]:
df_result.createOrReplaceTempView('fhv_oct19_result')

In [20]:
df_result_leastPU = spark.sql("""
SELECT Zone, COUNT(*) as rowcount
FROM fhv_oct19_result
GROUP BY Zone
ORDER BY rowcount ASC
LIMIT 5    
""")

In [21]:
df_result_leastPU.show()



+--------------------+--------+
|                Zone|rowcount|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+



                                                                                

In [22]:
spark.stop()