## Import Libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark import SparkContext

## Initiate Spark Session

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/04 11:00:47 WARN Utils: Your hostname, LAPTOP-MLKFF9UL resolves to a loopback address: 127.0.1.1; using 172.24.136.186 instead (on interface eth0)
24/03/04 11:00:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/04 11:00:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/04 11:00:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Download file HVFHW June 2021

In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-03-06 19:00:08--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T190009Z&X-Amz-Expires=300&X-Amz-Signature=a8848608b216f434b5494d92d908f716fd725af94bb0f1f0a55de4985eb1dc87&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-06 19:00:09--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

## Unzip File

In [8]:
!gunzip fhvhv_tripdata_2021-06.csv.gz

## Schema

In [4]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
])

In [9]:
!ls -lh fhvhv_tripdata_2021-06.csv

-rw-rw-r-- 1 georgios georgios 878M Dec 20 00:13 fhvhv_tripdata_2021-06.csv


## Create Dataframe with the schema above

In [10]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv')

## Partition to 12 partitions and save it to parquet

In [11]:
df = df.repartition(12)
df.write.parquet('data/pq/fhvhv/2021/06/')

                                                                                

## Initialize a SparkContext and read binary files from a specified directory using binaryfiles method. The count method is then called om the resulting RDSD to get the number of files in the directory

In [12]:
sc = SparkContext.getOrCreate()
parquet_dir = "data/pq/fhvhv/2021/06/"
rdd = sc.binaryFiles(parquet_dir)
num_files = rdd.count()

                                                                                

In [13]:
num_files

12

## Question 2: 

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [14]:
avg_size = rdd.map(lambda x: len(x[1])).reduce(lambda x, y: x + y) / (num_files * 1024 * 1024)
print("The average size is equal to {} MB".format(int(avg_size)))



The average size is equal to 22 MB


                                                                                

## Question 3:

Count records

How many taxi trips were there on June 15?

Consider only trips that started on June 15.

### Import Functions from pyspark.sql

In [15]:
from pyspark.sql import functions as F

In [16]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-06-15'") \
    .count()

                                                                                

452470

## Question 4:

Longest trip for each day. Now calculate the duration for each trip.
How long was the longest trip in Hours?

### Import Functions from pyspark.sql

In [17]:
from pyspark.sql.functions import col, max, round, to_date

In [18]:
df \
    .withColumn('duration', ((col('dropoff_datetime').cast('long') - col('pickup_datetime').cast('long')) / 60)/60) \
    .withColumn('pickup_date', to_date(col('pickup_datetime'))) \
    .groupBy('pickup_date') \
        .max('duration') \
    .withColumn('max_duration_rounded', round(col('max(duration)'), 2)) \
    .orderBy('max_duration_rounded', ascending=False) \
    .limit(5) \
    .show()



+-----------+------------------+--------------------+
|pickup_date|     max(duration)|max_duration_rounded|
+-----------+------------------+--------------------+
| 2021-06-25| 66.87888888888888|               66.88|
| 2021-06-22|25.549722222222222|               25.55|
| 2021-06-27|19.980833333333333|               19.98|
| 2021-06-26| 18.19722222222222|                18.2|
| 2021-06-23|16.466944444444444|               16.47|
+-----------+------------------+--------------------+



                                                                                

## Question 6

Most frequent pickup location zone. Load the zone lookup data into a temp view in Spark
Zone Data. Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

In [20]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-03-06 19:28:50--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T192850Z&X-Amz-Expires=300&X-Amz-Signature=eb5bba3549f145591e6fd3ce4fc4dd2376ed23b4dc663c0c7c1bcdb7c79a1a05&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-06 19:28:50--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [25]:
df_zones= spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')
df_zones.write.parquet('zones',mode='overwrite')

In [26]:
df_zones = spark.read.parquet('zones')

In [27]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [30]:
df.registerTempTable('fhvhv_2021_06')

In [28]:
df_zones.registerTempTable('zones')



In [32]:
spark.sql("""
SELECT
    CONCAT(pul.Zone) AS pu_loc,
    COUNT(1)
FROM 
    fhvhv_2021_06 fhv INNER JOIN zones pul ON fhv.PULocationID = pul.LocationID
                      
GROUP BY 
    1
ORDER BY
    2 DESC
LIMIT 1;
""").show()



+-------------------+--------+
|             pu_loc|count(1)|
+-------------------+--------+
|Crown Heights North|  231279|
+-------------------+--------+



                                                                                