In [1]:
import pyspark
from pyspark.sql import SparkSession

# 1.

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
print("1. Spark version: ", spark.version)

1. Spark version:  3.3.2


# 2.

In [7]:
from pyspark.sql import types

schema = types.StructType(
    [
        types.StructField('dispatching_base_num', types.StringType(), True),
        types.StructField('pickup_datetime', types.TimestampType(), True),
        types.StructField('dropoff_datetime', types.TimestampType(), True),
        types.StructField('PULocationID', types.IntegerType(), True),
        types.StructField('DOLocationID', types.IntegerType(), True),
        types.StructField('SR_Flag', types.BooleanType(), True),
        types.StructField('Affiliated_base_number', types.StringType(), True)

    ]
)

In [18]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv')

In [10]:
df = df.repartition(12)

In [12]:
df.write.parquet('fhvhv/2021/06/')

In [13]:
!ls -lh fhvhv/2021/06

total 566784
-rw-r--r--  1 leonidsamorcev  staff     0B Mar  3 13:31 _SUCCESS
-rw-r--r--  1 leonidsamorcev  staff    23M Mar  3 13:31 part-00000-0ef1e5fb-409a-4a70-abc6-116b261865e2-c000.snappy.parquet
-rw-r--r--  1 leonidsamorcev  staff    23M Mar  3 13:31 part-00001-0ef1e5fb-409a-4a70-abc6-116b261865e2-c000.snappy.parquet
-rw-r--r--  1 leonidsamorcev  staff    23M Mar  3 13:31 part-00002-0ef1e5fb-409a-4a70-abc6-116b261865e2-c000.snappy.parquet
-rw-r--r--  1 leonidsamorcev  staff    23M Mar  3 13:31 part-00003-0ef1e5fb-409a-4a70-abc6-116b261865e2-c000.snappy.parquet
-rw-r--r--  1 leonidsamorcev  staff    23M Mar  3 13:31 part-00004-0ef1e5fb-409a-4a70-abc6-116b261865e2-c000.snappy.parquet
-rw-r--r--  1 leonidsamorcev  staff    23M Mar  3 13:31 part-00005-0ef1e5fb-409a-4a70-abc6-116b261865e2-c000.snappy.parquet
-rw-r--r--  1 leonidsamorcev  staff    23M Mar  3 13:31 part-00006-0ef1e5fb-409a-4a70-abc6-116b261865e2-c000.snappy.parquet
-rw-r--r--  1 leonidsamorcev  staff    23M Ma

# 3.

In [56]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv')

In [55]:
from pyspark.sql.functions import col, lit, to_date

df.filter(to_date(df.pickup_datetime) == lit("2021-06-15")).count()

# 4.

In [29]:
from pyspark.sql.functions import max as max_, hour

df = df.withColumn('trip_time_hours',  (df['dropoff_datetime'] - df['pickup_datetime']) )

In [34]:
max_trip = df.select(max_(df.trip_time_hours)).collect()

In [35]:
max_trip

[Row(max(trip_time_hours)=datetime.timedelta(days=2, seconds=67964))]

# 6.

In [19]:
schema_taxi_zones = types.StructType(
    [
        types.StructField('LocationID', types.IntegerType(), True),
        types.StructField('Borough', types.StringType(), True),
        types.StructField('Zone', types.StringType(), True),
        types.StructField('service_zone', types.StringType(), True)
    ]
)

In [20]:
df_zones = spark.read \
    .option("header", "true") \
    .schema(schema_taxi_zones) \
    .csv('taxi_zone_lookup.csv')

In [42]:
max_rides_count = df.select('PULocationID').groupby('PULocationID').count()

In [65]:
max_rides_count.agg({"count": "max"}).collect()

[Row(max(count)=231279)]

In [67]:
max_rides_count.filter(col("count") == 231279).show()

+------------+------+
|PULocationID| count|
+------------+------+
|          61|231279|
+------------+------+



In [68]:
df_zones.filter(df_zones.LocationID == 61).show()

+----------+--------+-------------------+------------+
|LocationID| Borough|               Zone|service_zone|
+----------+--------+-------------------+------------+
|        61|Brooklyn|Crown Heights North|   Boro Zone|
+----------+--------+-------------------+------------+

