In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [None]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [None]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2021-06.csv')

In [None]:
df = df.repartition(12)

In [None]:
df.write.parquet('fhv/2021/06/', mode='overwrite')

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df = spark.read.parquet('fhv/2021/06')

23/03/13 14:42:14 WARN Utils: Your hostname, liujunweideMacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.18 instead (on interface en0)
23/03/13 14:42:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/13 14:42:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
df.printSchema()

In [None]:
df.show(5)

In [3]:
df.filter(F.dayofmonth(df.pickup_datetime) == '15').count()

                                                                                

452470

In [None]:
df = df.withColumn('duration', F.datediff(df.dropoff_datetime, df.pickup_datetime))

In [4]:
df = df.withColumn('duration', (F.unix_timestamp(df.dropoff_datetime) - F.unix_timestamp(df.pickup_datetime))/3600)

In [6]:
df.sort(df.duration.desc()).show(1)



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|        duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|                B02872|66.8788888888889|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
only showing top 1 row



                                                                                

In [7]:
df_zone = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

In [8]:
df_zone.show(2)

+----------+-------+--------------+------------+
|LocationID|Borough|          Zone|service_zone|
+----------+-------+--------------+------------+
|         1|    EWR|Newark Airport|         EWR|
|         2| Queens|   Jamaica Bay|   Boro Zone|
+----------+-------+--------------+------------+
only showing top 2 rows



In [18]:
df_join = df.join(df_zone, df.PULocationID == df_zone.LocationID).drop(df_zone.LocationID)
df_join = df_join.withColumnRenamed('Borough', 'pickup_borough') \
    .withColumnRenamed('Zone', 'pickup_zone') \
    .withColumnRenamed('service_zone', 'pickup_service_zone')
df_join = df_join.join(df_zone, df.DOLocationID == df_zone.LocationID).drop(df_zone.LocationID)
df_join = df_join.withColumnRenamed('Borough', 'dropoff_borough') \
    .withColumnRenamed('Zone', 'dropoff_zone') \
    .withColumnRenamed('service_zone', 'dropoff_service_zone')

In [19]:
df_join.show(3)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+--------------+-------------------+-------------------+----------+---------------+--------------------+--------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|           duration|pickup_borough|        pickup_zone|pickup_service_zone|LocationID|dropoff_borough|        dropoff_zone|dropoff_service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+--------------+-------------------+-------------------+----------+---------------+--------------------+--------------------+
|              B02875|2021-06-02 13:50:06|2021-06-02 13:59:21|         223|         129|      N|                B02875|0.15416666666666667|        Queens|           Steinway|          Boro Zone|       129|   

In [20]:
df_join.registerTempTable('fhv_data')



In [24]:
spark.sql("""
    SELECT pickup_zone, COUNT(*) as amonut
    FROM fhv_data
    GROUP BY 1
    ORDER BY 2 DESC
    LIMIT 3
""").show()



+-------------------+------+
|        pickup_zone|amonut|
+-------------------+------+
|Crown Heights North|231279|
|       East Village|221244|
|        JFK Airport|188867|
+-------------------+------+



                                                                                