In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pyspark.sql.functions as F

In [2]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/28 10:53:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Taxi zones

In [3]:
# Load the taxi zone lookup table, treating the first CSV line as the header.
df = spark.read.csv("data/taxi_zone_lookup.csv", header=True)

In [4]:
# Preview the zone lookup table (defaults spelled out: 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [11]:
# Persist the zone lookup as Parquet.
# The default save mode raises an error when the target path already exists,
# which makes this cell fail on every re-run; "overwrite" keeps it idempotent.
df.write.mode("overwrite").parquet("data/taxi_zone")

[Stage 3:>                                                          (0 + 1) / 1]                                                                                

# Question 1

In [13]:
# Display the Spark version of the running session (answer to Question 1).
spark.version

'3.3.2'

# Question 2

**FHVHV June 2021**

Read it with Spark using the same schema as we did in the lessons.<br>
We will use this dataset for all the remaining questions.<br>
Repartition it to 12 partitions and save it to Parquet.<br>
What is the average size (in MB) of the Parquet files (those ending with the `.parquet` extension) that were created? Select the answer that most closely matches.<br>


- 2MB
- [x] 24MB
- 100MB
- 250MB
<br><br>

In [5]:
# Explicit schema for the trip CSV so Spark does not have to guess column types.
# Every field is declared nullable.
hsv_schema = (
    types.StructType()
    .add("dispatching_base_num", types.StringType(), True)
    .add("pickup_datetime", types.TimestampType(), True)
    .add("dropOff_datetime", types.TimestampType(), True)
    .add("PUlocationID", types.IntegerType(), True)
    .add("DOlocationID", types.IntegerType(), True)
    .add("SR_Flag", types.FloatType(), True)
    .add("Affiliated_base_number", types.StringType(), True)
)

In [6]:
# Relative path to the gzip-compressed June 2021 trip CSV read in the next cell.
input_path = "data/fhvhv_tripdata_2021-06.csv.gz"

In [7]:
# Read the trip CSV with the explicit schema; the first line is the header row.
df_hsv = spark.read.csv(input_path, schema=hsv_schema, header=True)

In [8]:
# Sanity-check the parsed columns: two records, one field per line.
df_hsv.show(n=2, vertical=True)

-RECORD 0-------------------------------------
 dispatching_base_num   | B02764              
 pickup_datetime        | 2021-06-01 00:02:41 
 dropOff_datetime       | 2021-06-01 00:07:46 
 PUlocationID           | 174                 
 DOlocationID           | 18                  
 SR_Flag                | null                
 Affiliated_base_number | B02764              
-RECORD 1-------------------------------------
 dispatching_base_num   | B02764              
 pickup_datetime        | 2021-06-01 00:16:16 
 dropOff_datetime       | 2021-06-01 00:21:14 
 PUlocationID           | 32                  
 DOlocationID           | 254                 
 SR_Flag                | null                
 Affiliated_base_number | B02764              
only showing top 2 rows



In [18]:
# Shuffle the trips into 12 partitions and write one Parquet part file per partition.
# The default save mode raises an error when the target path already exists,
# which makes this cell fail on every re-run; "overwrite" keeps it idempotent.
df_hsv.repartition(12).write.mode("overwrite").parquet("data/hsv_2")

[Stage 7:>                                                        (0 + 12) / 12]

23/02/27 14:29:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/02/27 14:29:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/02/27 14:29:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/02/27 14:29:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/02/27 14:29:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
23/02/27 14:29:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/02/27 14:29:43 WARN MemoryManager: Total allocation exceeds 95.



Each of the 12 Parquet part files is about 24 MB.

### Question 3: 

**Count records**  

How many taxi trips were there on June 15?<br><br>
Consider only trips that started on June 15.<br>

- 308,164
- 12,856
- [x] 452,470
- 50,982
<br><br>

In [22]:
# Count the trips whose pickup timestamp, truncated to the day, is 2021-06-15.
(
    df_hsv
    .where("date_trunc('day', pickup_datetime) = '2021-06-15'")
    .count()
)

                                                                                

452470


### Question 4: 

**Longest trip for each day**  

Now calculate the duration of each trip.<br>
How long was the longest trip, in hours?<br>

- [x] 66.87 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours
<br><br>

In [27]:
# Trip duration in hours = (drop-off epoch seconds - pickup epoch seconds) / 3600.
# Sort by duration, longest first, and return the top row.
(
    df_hsv
    .withColumn(
        "ride_in_hours",
        (F.unix_timestamp("dropOff_datetime") - F.unix_timestamp("pickup_datetime")) / 3600,
    )
    .sort(F.desc("ride_in_hours"))
    .first()
)

    

                                                                                

Row(dispatching_base_num='B02872', pickup_datetime=datetime.datetime(2021, 6, 25, 13, 55, 41), dropOff_datetime=datetime.datetime(2021, 6, 28, 8, 48, 25), PUlocationID=98, DOlocationID=265, SR_Flag=None, Affiliated_base_number='B02872', ride_in_hours=66.8788888888889)

### Question 5: 

**User Interface**

Spark’s user interface, which shows the application's dashboard, runs on which local port?<br>

- 80
- 443
- [x] 4040
- 8080
<br><br>

### Question 6: 

**Most frequent pickup location zone**

Load the zone lookup data into a temp view in Spark.<br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)<br>

Using the zone lookup data and the FHVHV June 2021 data, what is the name of the most frequent pickup location zone?<br>

- East Chelsea
- Astoria
- Union Sq
- [x] Crown Heights North
<br><br>

In [14]:
# Number of trips per pickup location ID, stored in a column named "count".
df_hsv_pu_locations = (
    df_hsv
    .groupBy("PUlocationID")
    .agg(F.count("*").alias("count"))
)

In [18]:
# Attach zone names to the per-location trip counts and list the busiest pickup zones first.
(
    df_hsv_pu_locations
    .join(
        df,
        on=df_hsv_pu_locations["PUlocationID"] == df["LocationID"],
        how="inner",
    )
    .sort(F.desc("count"))
    .show()
)

[Stage 13:>                                                         (0 + 1) / 1]

+------------+------+----------+---------+--------------------+------------+
|PUlocationID| count|LocationID|  Borough|                Zone|service_zone|
+------------+------+----------+---------+--------------------+------------+
|          61|231279|        61| Brooklyn| Crown Heights North|   Boro Zone|
|          79|221244|        79|Manhattan|        East Village| Yellow Zone|
|         132|188867|       132|   Queens|         JFK Airport|    Airports|
|          37|187929|        37| Brooklyn|      Bushwick South|   Boro Zone|
|          76|186780|        76| Brooklyn|       East New York|   Boro Zone|
|         231|164344|       231|Manhattan|TriBeCa/Civic Center| Yellow Zone|
|         138|161596|       138|   Queens|   LaGuardia Airport|    Airports|
|         234|158937|       234|Manhattan|            Union Sq| Yellow Zone|
|         249|154698|       249|Manhattan|        West Village| Yellow Zone|
|           7|152493|         7|   Queens|             Astoria|   Boro Zone|

                                                                                