In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/04/16 07:14:35 WARN Utils: Your hostname, codespaces-d61839 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/04/16 07:14:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/04/16 07:14:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
!head -n 1001 fhvhv_tripdata_2021-01.csv > fhvhv_tripdata_2021-01_head.csv

In [4]:
df = spark.read \
    .option("header", "true") \
    .option("InferSchema", "true") \
    .csv('green_tripdata_2021-01.csv')

                                                                                

In [5]:
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('lpep_pickup_datetime', TimestampType(), True), StructField('lpep_dropoff_datetime', TimestampType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', IntegerType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('passenger_count', IntegerType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', StringType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', IntegerType(), True), StructField('trip_type', IntegerType(), True), StructField('congestion_surcharge',

In [6]:
from pyspark.sql import types

In [7]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True), 
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True)
])

In [8]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

In [9]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True)])

In [19]:
# note this is lazy, i.e. nth is running until df is called again
df = df.repartition(24)

In [20]:
df.write.parquet('fhvhv/2021/01/')

                                                                                

In [10]:
df = spark.read.parquet('fhvhv/2021/01/')

In [11]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [12]:
from pyspark.sql import functions as F

In [14]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-03|  2021-01-03|         255|          34|
| 2021-01-05|  2021-01-05|         189|         107|
| 2021-01-02|  2021-01-02|          88|         137|
| 2021-01-02|  2021-01-03|         238|         224|
| 2021-01-06|  2021-01-06|         169|         208|
| 2021-01-07|  2021-01-07|          75|          88|
| 2021-01-07|  2021-01-07|         210|         210|
| 2021-01-02|  2021-01-02|         243|          69|
| 2021-01-04|  2021-01-04|         250|         213|
| 2021-01-03|  2021-01-03|          87|          79|
| 2021-01-03|  2021-01-03|          68|         181|
| 2021-01-04|  2021-01-04|          95|         236|
| 2021-01-02|  2021-01-02|         262|         236|
| 2021-01-04|  2021-01-04|         225|         233|
| 2021-01-06|  2021-01-06|         237|          83|
| 2021-01-05|  2021-01-05|         231|       

                                                                                

In [16]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003') \
  .show()

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-05 22:14:07|2021-01-05 22:32:28|         189|         107|
|2021-01-02 17:59:55|2021-01-02 18:10:39|          88|         137|
|2021-01-02 23:57:54|2021-01-03 00:15:48|         238|         224|
|2021-01-06 15:53:13|2021-01-06 16:07:07|         169|         208|
|2021-01-07 07:35:24|2021-01-07 07:55:49|          75|          88|
|2021-01-07 08:45:12|2021-01-07 08:51:17|         210|         210|
|2021-01-02 15:44:26|2021-01-02 16:10:50|         243|          69|
|2021-01-04 16:50:28|2021-01-04 16:57:43|         250|         213|
|2021-01-03 10:30:34|2021-01-03 10:44:53|          87|          79|
|2021-01-03 22:05:20|2021-01-03 22:27:55|          68|         181|
|2021-01-04 08:01:02|2021-01-04 08:33:27|          95|         236|
|2021-01-02 13:01:10|2021-01-02 13:08:11|       