In [1]:
import pyspark
from pyspark.sql import SparkSession

# Stop a session left over from an earlier run of this cell so the builder
# below starts from a clean state. On a fresh kernel `spark` is not defined
# yet; that NameError is the only failure that is expected and safe to ignore.
# Any other error from stop() is now surfaced instead of being silently hidden.
try:
    spark.stop()
except NameError:
    pass

# Build (or reuse) a local-mode session named 'TripsDataAnalysis'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('TripsDataAnalysis')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 03:29:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/07 03:29:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.

What's the output?

#### Answer -> 3.5.5

In [2]:
# Display the version string of the Spark runtime behind this session (Question 1).
spark.version

'3.5.5'

In [3]:
# Explicit schema for the FHV trip file, written as a DDL string so no extra
# import is required. Column names are spelled exactly as in the CSV header.
# Without a schema every column is read as a string, which forces implicit
# casts in the later date filter, duration arithmetic and zone join.
fhv_schema = (
    'dispatching_base_num STRING, '
    'pickup_datetime TIMESTAMP, '
    'dropOff_datetime TIMESTAMP, '
    'PUlocationID INT, '
    'DOlocationID INT, '
    'SR_Flag STRING, '
    'Affiliated_base_number STRING'
)

# NOTE(review): the absolute path below is specific to one machine; adjust it
# when running the notebook elsewhere.
df_fhv = spark.read \
                .option('header', 'true') \
                .schema(fhv_schema) \
                .csv('/home/yassersakr88/de-zoomcamp/5_batch/notebooks/fhv_tripdata_2019-10.csv.gz')
df_fhv.show()

                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [4]:
# Redistribute the trips into 6 partitions, as Question 2 asks; the parquet
# write further down uses this repartitioned DataFrame.
df_fhv = df_fhv.repartition(6)

In [5]:
# Recursively list the parquet part files with human-readable sizes (Question 2).
# NOTE(review): no cell visible in this notebook writes ./fhv_partitions — the
# visible parquet write targets 'fhv/2019/10/' — so confirm this listing is current.
!ls -lhR ./fhv_partitions

./fhv_partitions:
total 39M
-rw-r--r-- 1 yassersakr88 yassersakr88 6.5M Mar  6 23:09 part-00000-3443a95c-b15e-4763-937b-3693002d1fe7-c000.snappy.parquet
-rw-r--r-- 1 yassersakr88 yassersakr88 6.5M Mar  6 23:09 part-00001-3443a95c-b15e-4763-937b-3693002d1fe7-c000.snappy.parquet
-rw-r--r-- 1 yassersakr88 yassersakr88 6.5M Mar  6 23:09 part-00002-3443a95c-b15e-4763-937b-3693002d1fe7-c000.snappy.parquet
-rw-r--r-- 1 yassersakr88 yassersakr88 6.5M Mar  6 23:09 part-00003-3443a95c-b15e-4763-937b-3693002d1fe7-c000.snappy.parquet
-rw-r--r-- 1 yassersakr88 yassersakr88 6.5M Mar  6 23:09 part-00004-3443a95c-b15e-4763-937b-3693002d1fe7-c000.snappy.parquet
-rw-r--r-- 1 yassersakr88 yassersakr88 6.5M Mar  6 23:09 part-00005-3443a95c-b15e-4763-937b-3693002d1fe7-c000.snappy.parquet
-rw-r--r-- 1 yassersakr88 yassersakr88    0 Mar  6 23:09 _SUCCESS


### Question 2: 

**FHV October 2019**

Read the October 2019 FHV data into a Spark DataFrame with a schema, as we did in the lessons.

Repartition the Dataframe to 6 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

- 1MB
- 6MB
- 25MB
- 87MB

#### Answer -> 6 MB

In [6]:
# Persist the trips as parquet under fhv/2019/10/, replacing any earlier output.
df_fhv.write.mode('overwrite').parquet('fhv/2019/10/')

                                                                                

In [7]:
# Expose the trips DataFrame to Spark SQL under the name used by the queries below.
df_fhv.createOrReplaceTempView('trips_data_oct_2019')

In [8]:
# Count the trips that started on 15 October 2019 (Question 3).
# The filter compares the calendar date of the pickup instead of pattern-matching
# the text form of the timestamp, so it stays correct whether pickup_datetime is
# stored as a string or as a timestamp.
spark.sql("""
SELECT
    count(1)
FROM
    trips_data_oct_2019
WHERE
    to_date(pickup_datetime) = DATE '2019-10-15'
""").show()

[Stage 5:>                                                          (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

### Question 3: 

**Count records** 

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

- 108,164
- 12,856
- 452,470
- 62,610

#### Answer -> 62,610

In [9]:
# Show the column names available for the duration query that follows.
df_fhv.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [None]:
# Longest trip in hours (Question 4): the drop-off/pickup difference in seconds
# is divided by 3600, rows are sorted longest-first and only the top one is kept.
longest_trip_sql = """
SELECT
    (UNIX_TIMESTAMP(dropOff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600 AS trip_length_hrs 
FROM
    trips_data_oct_2019
ORDER BY
    trip_length_hrs DESC
LIMIT 1
"""
longest_trip = spark.sql(longest_trip_sql)
longest_trip.show()

[Stage 11:>                                                         (0 + 1) / 1]

### Question 4: 

**Longest trip in the dataset**

What is the length of the longest trip in the dataset in hours?

- 631,152.50 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

#### Answer -> 631,152.50 Hours

### Question 5: 

**User Interface**

Spark’s User Interface which shows the application's dashboard runs on which local port?

- 80
- 443
- 4040
- 8080

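#### Answer -> 4040 (Spark's default; in this session port 4040 was already in use, so the startup log shows Spark attempting 4041 instead)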

In [None]:
# Load the taxi zone lookup table; the first CSV line supplies the column names.
zones_csv_path = '/home/yassersakr88/de-zoomcamp/5_batch/notebooks/taxi_zone_lookup.csv'
df_zones = spark.read.csv(zones_csv_path, header=True)
df_zones.show()

In [None]:
# Register the zone lookup as a temp view, as Question 6 asks.
# NOTE(review): no query visible in this notebook reads 'trips_zones'; the
# zone count below runs on the joined 'trips_location' view instead.
df_zones.createOrReplaceTempView('trips_zones')

In [None]:
# Re-inspect the trip rows before reducing them to the pickup location column.
df_fhv.show()

In [None]:
# List the trip columns again to pick the ones to discard in the next cell.
df_fhv.columns

In [None]:
# Only the pickup location id is needed for the zone join; of the seven trip
# columns this keeps PUlocationID and leaves the other six behind.
df_fhv_tmp = df_fhv.select('PUlocationID')

In [None]:
# Attach the zone attributes to every pickup by matching the trip's pickup
# location id against the lookup table's LocationID (inner join).
pickup_matches_zone = df_fhv_tmp['PUlocationID'] == df_zones['LocationID']
df_location = df_fhv_tmp.join(df_zones, on=pickup_matches_zone, how='inner')
df_location.show()

In [None]:
# Make the joined pickup/zone rows queryable from Spark SQL for the zone count below.
df_location.createOrReplaceTempView('trips_location')

In [None]:
# Pickups per zone, least frequent first (Question 6).
# Columns are referenced by name rather than by position so the query does not
# break if the SELECT list is reordered, and Zone is used as a tie-breaker so
# zones with equal counts always come back in the same order.
spark.sql("""
SELECT
    Zone,
    COUNT(1) AS trip_count
FROM
    trips_location
GROUP BY
    Zone
ORDER BY
    trip_count ASC,
    Zone ASC
"""
).show()

### Question 6: 

**Least frequent pickup location zone**

Load the zone lookup data into a temp view in Spark.<br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?<br>

- East Chelsea
- Jamaica Bay
- Union Sq
- Crown Heights North

#### Answer -> Jamaica Bay