In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/05 19:47:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Q1

In [9]:
pyspark.__version__

'3.3.2'

## Q2

In [None]:
#!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

In [2]:
from pyspark.sql.types import *

In [3]:
schema_fhvhv_2021_06 = StructType([
    StructField("dispatching_base_num", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("SR_Flag", IntegerType(), True),
    StructField("Affiliated_base_number", IntegerType(), True),])

In [4]:
 df = spark.read \
    .option("header", "true") \
    .schema(schema_fhvhv_2021_06) \
    .csv('fhvhv_tripdata_2021-06.csv.gz')

In [5]:
type(df)

pyspark.sql.dataframe.DataFrame

In [6]:
df.count()

                                                                                

14961892

In [None]:
#print(df.rdd.getNumPartitions())

In [7]:
df = df.repartition(12)

In [8]:
df.write.parquet('fhvhv/2021/06/')

                                                                                

In [11]:
!ls -lh fhvhv/2021/06/ | awk '{print $5, $9}' | grep -v SUCCESS

 
23M part-00000-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00001-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00002-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00003-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00004-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00005-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00006-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00007-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00008-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00009-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00010-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet
23M part-00011-5db8d822-80b8-47f0-abb1-a2f9a4fe2263-c000.snappy.parquet


## Q3

In [12]:
df_parquet = spark.read.parquet('fhvhv/2021/06/')

In [13]:
df_parquet

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: int, Affiliated_base_number: int]

In [14]:
df_parquet.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: integer (nullable = true)



In [18]:
# Query example
df_parquet.select('dispatching_base_num', 'pickup_datetime') \
    .filter(df_parquet.dispatching_base_num == 'B02764') \
    .show()

+--------------------+-------------------+
|dispatching_base_num|    pickup_datetime|
+--------------------+-------------------+
|              B02764|2021-06-27 15:18:58|
|              B02764|2021-06-04 05:32:41|
|              B02764|2021-06-24 11:44:31|
|              B02764|2021-06-18 07:50:23|
|              B02764|2021-06-25 04:27:02|
|              B02764|2021-06-03 05:46:56|
|              B02764|2021-06-10 19:54:38|
|              B02764|2021-06-12 22:13:11|
|              B02764|2021-06-10 21:50:35|
|              B02764|2021-06-24 17:30:39|
|              B02764|2021-06-09 05:54:15|
|              B02764|2021-06-03 22:21:22|
|              B02764|2021-06-24 21:17:24|
|              B02764|2021-06-22 01:31:32|
|              B02764|2021-06-06 07:36:30|
|              B02764|2021-06-20 16:23:45|
|              B02764|2021-06-07 06:33:34|
|              B02764|2021-06-02 19:41:53|
|              B02764|2021-06-30 23:38:37|
|              B02764|2021-06-07 17:58:26|
+----------

In [19]:
from pyspark.sql import functions as F

In [34]:
# Add a new column pickup_date (only date, not hour)
df_parquet \
    .withColumn('pickup_date', F.to_date(df_parquet.pickup_datetime)) \
    .select('pickup_date') \
    .filter(F.col('pickup_date') == '2021-06-15') \
    .count()

                                                                                

452470

## Q4

In [43]:
biggest_trip_in_seconds = df_parquet \
    .withColumn('trip_duration_in_seconds', \
        F.unix_timestamp(df_parquet.dropoff_datetime) - F.unix_timestamp(df_parquet.pickup_datetime)) \
    .select('trip_duration_in_seconds') \
    .orderBy(F.desc('trip_duration_in_seconds')).first()


                                                                                

In [44]:
biggest_trip_in_seconds

Row(trip_duration_in_seconds=240764)

In [45]:
biggest_trip_in_hours = int(biggest_trip_in_seconds['trip_duration_in_seconds'])/60/60

In [46]:
print(int(biggest_trip_in_seconds['trip_duration_in_seconds'])/60/60)

66.87888888888888


## Q5

```bash
export SPARK_PID=$(ps -fea | grep java | grep spark |  awk '{print $2}')
```

```bash
sudo lsof -p ${SPARK_PID} | grep LISTEN | grep "*"
```

```
java    2057   jp  289u     IPv6              32057       0t0     TCP *:4040 (LISTEN)
```

## Q6

In [59]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-03-05 22:15:14--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230305T221514Z&X-Amz-Expires=300&X-Amz-Signature=16528b4ba1a6e6328bd30efb0b4e11e67459e891b39c672afe4d23922115f1e3&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-05 22:15:14--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [61]:
schema_taxi_zone_lookup = StructType([
    StructField("LocationID", IntegerType(), True),
    StructField("Borough", StringType(), True),
    StructField("Zone", StringType(), True),
    StructField("service_zone", StringType(), True)])

In [62]:
 df_taxi_zl = spark.read \
    .option("header", "true") \
    .schema(schema_taxi_zone_lookup) \
    .csv('taxi_zone_lookup.csv')

In [64]:
df_taxi_zl.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [52]:
df_parquet.registerTempTable('temp_table_fhvhv_tripdata')



In [57]:
df_parquet.createOrReplaceTempView('temp_table_fhvhv_tripdata')

In [65]:
df_taxi_zl.registerTempTable('temp_table_taxi_zone_lookup')

In [71]:
df_parquet.count()

14961892

In [69]:
spark.sql("""
select count(*)
from temp_table_fhvhv_tripdata
""").collect()[0][0]

14961892

In [72]:
df_taxi_zl.count()

265

In [70]:
spark.sql("""
select count(*)
from temp_table_taxi_zone_lookup
""").collect()[0][0]

265

In [84]:
spark.sql("""
select t.PULocationID, z.Zone, count(*) as count
from temp_table_fhvhv_tripdata t, temp_table_taxi_zone_lookup z
where t.PULocationID = z.LocationID
group by t.PULocationID, z.Zone
order by count desc
limit 1
""").collect()[0]["Zone"]

                                                                                

'Crown Heights North'