In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('week5-hw') \
        .getOrCreate()

23/03/08 16:33:48 WARN Utils: Your hostname, Laptop resolves to a loopback address: 127.0.1.1; using 172.17.161.50 instead (on interface eth0)
23/03/08 16:33:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/08 16:33:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Q1

In [3]:
spark.version

'3.3.1'

In [4]:
df = spark.read \
        .option('header', True) \
        .csv('fhvhv_tripdata_2021-06.csv.gz')

In [5]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [6]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [7]:
fhvhv_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.StringType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

In [8]:
df = spark.read \
        .option('header', True) \
        .schema(fhvhv_schema) \
        .csv('fhvhv_tripdata_2021-06.csv.gz')

In [9]:
df

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string, Affiliated_base_number: string]

In [10]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



### Q2

In [11]:
df = df.repartition(12)

In [12]:
df.write.parquet('fhvhv/2021/06', mode='overwrite')

[Stage 4:>                                                         (0 + 8) / 12]

23/03/08 16:35:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

### Q3

In [13]:
df.createOrReplaceTempView('df_table')

In [14]:
spark.sql("""
    select count(*)
    from df_table
    where pickup_datetime between '2021-06-15 00:00:00' and '2021-06-15 23:59:59'
""").show()

[Stage 5:>                                                          (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|  452470|
+--------+



                                                                                

### Q4

In [15]:
spark.sql("""
    select max(dropoff_datetime - pickup_datetime)
    from df_table
""").show()



+-----------------------------------------+
|max((dropoff_datetime - pickup_datetime))|
+-----------------------------------------+
|                     INTERVAL '2 18:52...|
+-----------------------------------------+



                                                                                

### Q5

In [19]:
df_zones = spark.read \
            .option('header', True) \
            .option('inferschema', True) \
            .csv('taxi+_zone_lookup.csv')

In [24]:
df_pu_zone = df.join(df_zones, df.PULocationID==df_zones.LocationID, how='inner')

In [27]:
df_pu_zone.createOrReplaceTempView('df_pu_zone')

In [30]:
spark.sql("""
    select Zone, count(*) as count
    from df_pu_zone
    group by Zone
    order by 2 desc
    limit 10;
""").show()



+--------------------+------+
|                Zone| count|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
+--------------------+------+



                                                                                