In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Start (or reuse) a Spark session running in local mode with the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

### Question 1
Run the command `spark.version`. What version number is output?

In [3]:
# Display the version string of the Spark runtime behind this session.
spark.version

'3.3.2'

### Download Data

```bash
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
```

```bash
gzip -d fhvhv_tripdata_2021-06.csv.gz
```

```bash
wc -l fhvhv_tripdata_2021-06.csv
```

### Question 2
Repartition the June 2021 HVFHV Data into 12 partitions and save it to Parquet. What is the average size of the Parquet Files?

In [4]:
# Explicit schema for the FHVHV trip CSV, declared as (column name, Spark type)
# pairs; every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [5]:
# Read the CSV with the explicit schema (first line treated as a header),
# then spread the rows over 12 partitions.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv')
    .repartition(12)
)

# Write the partitions as Parquet, replacing any earlier output in that directory.
df.write.parquet('data/pq/fhvhv/2021/06/', mode='overwrite')

```bash
ls -lh data/pq/fhvhv/2021/06/
```

### Question 3
How many taxi trips were started on June 15th?

In [6]:
# Rebind `df` to the Parquet copy of the trips so the following cells read
# from Parquet instead of the original CSV.
df = spark.read.parquet('data/pq/fhvhv/2021/06/')

In [7]:
# Import only the Spark SQL functions this notebook uses instead of `*`,
# which would silently replace many Python builtins (sum, max, min, abs, ...).
# NOTE: `round` here is Spark's column-wise round and deliberately shadows the
# builtin `round` for the rest of the notebook.
from pyspark.sql.functions import col, round, to_date

In [8]:
# Number of trips whose pickup timestamp falls on 2021-06-15.
(
    df
    .withColumn('pickup_date', to_date(df.pickup_datetime))
    .where("pickup_date = '2021-06-15'")
    .count()
)

452470

#### With SQL

In [9]:
# Register the trips DataFrame as the temp view `fhvhv_2021_06` so it can be
# queried through spark.sql.
df.createOrReplaceTempView('fhvhv_2021_06')

In [10]:
# Count the rows of the fhvhv_2021_06 view whose pickup date is 2021-06-15.
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2021_06
WHERE
    to_date(pickup_datetime) = '2021-06-15';
""").show()

+--------+
|count(1)|
+--------+
|  452470|
+--------+



### Question 4
How long is the longest trip in the dataset?

In [11]:
# List the column names available on the trips DataFrame.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [12]:
# Trip length as the difference of the two timestamps cast to long (seconds),
# plus the same value in hours rounded to 2 decimals; show the five longest.
(
    df
    .withColumn(
        'duration',
        df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long'),
    )
    .withColumn('durationInHours', round(col('duration') / 3600, 2))
    .orderBy(col('duration').desc())
    .drop('dispatching_base_num', 'Affiliated_base_number', 'SR_Flag')
    .limit(5)
    .show()
)

+-------------------+-------------------+------------+------------+--------+---------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|duration|durationInHours|
+-------------------+-------------------+------------+------------+--------+---------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|  240764|          66.88|
|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|   91979|          25.55|
|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|   71931|          19.98|
|2021-06-26 22:37:11|2021-06-27 16:49:01|         263|          36|   65510|           18.2|
|2021-06-23 20:40:43|2021-06-24 13:08:44|           3|         247|   59281|          16.47|
+-------------------+-------------------+------------+------------+--------+---------------+



#### With SQL

In [13]:
# Five longest trips, expressed in SQL.
# `duration` is the difference of the two timestamps cast to LONG (whole
# seconds), so it is already integral and is not rounded. Rows are ranked by
# that exact value rather than by the rounded hour figure, so trips whose
# durations round to the same number of hours are still ordered by true length.
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime,
    PULocationID,
    DOLocationID,
    CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG) AS duration,
    ROUND((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600, 2) AS durationInHours
FROM
    fhvhv_2021_06
ORDER BY
    duration DESC
LIMIT 5;
""").show()

+-------------------+-------------------+------------+------------+--------+---------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|duration|durationInHours|
+-------------------+-------------------+------------+------------+--------+---------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|  240764|          66.88|
|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|   91979|          25.55|
|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|   71931|          19.98|
|2021-06-26 22:37:11|2021-06-27 16:49:01|         263|          36|   65510|           18.2|
|2021-06-23 20:40:43|2021-06-24 13:08:44|           3|         247|   59281|          16.47|
+-------------------+-------------------+------------+------------+--------+---------------+



### Question 5
What port does Spark's User Interface Dashboard run on by default?

#### Ans
4040

### Question 6
What is the name of the most frequent pickup location zone?

In [14]:
# Load the zone lookup table from the local 'zones/' Parquet directory.
df_zones = spark.read.parquet('zones/')

In [15]:
# Attach zone attributes to each trip by matching its pickup location ID to the
# lookup table's LocationID (no `how` given, so the default join type applies).
df_joined = df.join(df_zones, df.PULocationID == df_zones.LocationID)

In [16]:
# Preview the joined rows, hiding the join key and the base-number/flag columns.
df_joined.drop('LocationID','dispatching_base_num','Affiliated_base_number','SR_Flag').show()

+-------------------+-------------------+------------+------------+-------------+--------------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|      Borough|                Zone|service_zone|
+-------------------+-------------------+------------+------------+-------------+--------------------+------------+
|2021-06-02 18:08:07|2021-06-02 18:44:45|          82|         169|       Queens|            Elmhurst|   Boro Zone|
|2021-06-01 10:16:48|2021-06-01 11:04:03|         122|          71|       Queens|              Hollis|   Boro Zone|
|2021-06-04 10:54:32|2021-06-04 11:20:41|         121|         226|       Queens|   Hillcrest/Pomonok|   Boro Zone|
|2021-06-04 09:48:41|2021-06-04 10:19:33|         139|          86|       Queens|           Laurelton|   Boro Zone|
|2021-06-02 12:43:19|2021-06-02 13:00:35|          53|          83|       Queens|       College Point|   Boro Zone|
|2021-06-03 13:49:44|2021-06-03 14:17:44|         228|          71|     

In [17]:
# Count trips per pickup zone and show the five zones with the most trips.
(
    df_joined
    .groupBy('Zone')
    .count()
    .sort('count', ascending=False)
    .limit(5)
    .show()
)

+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
|       East Village|221244|
|        JFK Airport|188867|
|     Bushwick South|187929|
|      East New York|186780|
+-------------------+------+



#### With SQL

In [18]:
# Register the joined DataFrame as the temp view `joined_fhv` for SQL queries.
df_joined.createOrReplaceTempView('joined_fhv')

In [19]:
# Five most frequent pickup zones. GROUP BY 1 and ORDER BY 2 refer to the
# select-list positions: 1 = Zone, 2 = COUNT(1).
spark.sql("""
SELECT
    Zone,
    COUNT(1)
FROM 
    joined_fhv
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

+-------------------+--------+
|               Zone|count(1)|
+-------------------+--------+
|Crown Heights North|  231279|
|       East Village|  221244|
|        JFK Airport|  188867|
|     Bushwick South|  187929|
|      East New York|  186780|
+-------------------+--------+



### END