In [None]:
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql import types
from pyspark.sql.functions import col, to_date
import pyspark.sql.functions as F

In [None]:
# Start (or reuse, via getOrCreate) a local SparkSession; "local[*]" runs
# Spark in-process with one worker thread per logical core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## Question 1

In [None]:
# Question 1: version of Spark backing this session.
spark.sparkContext.version

## Question 2

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [None]:
!gzip -d fhv_tripdata_2019-10.csv.gz

In [None]:
# Explicit schema for the FHV CSV. Without a schema (and without inferSchema),
# Spark would read every column as a string.
# Each entry is (column name, Spark type); every field is nullable.
schema = types.StructType([
    types.StructField(name, spark_type, True)
    for name, spark_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

In [None]:
# Load the decompressed CSV, treating its first row as a header and applying
# the explicit `schema` instead of letting Spark guess column types.
df = spark.read.csv('./fhv_tripdata_2019-10.csv', header=True, schema=schema)

In [None]:
# Question 2: shuffle the data into 6 partitions so the write produces one
# Parquet part file per (non-empty) partition, replacing any previous output.
# NOTE(review): the source data is FHV, not FHVHV, despite the 'fhvhv/' directory
# name; later cells read the same path, so rename it everywhere at once if at all.
df = df.repartition(6)
df.write.mode('overwrite').parquet('fhvhv/2019/10/')

In [None]:
!ls fhvhv/2019/10/ -lh

## Question 3

In [None]:
# Question 3: reload the Parquet output and derive the calendar date of each pickup.
df = (
    spark.read.parquet('fhvhv/2019/10/')
    .withColumn('pickup_date', to_date(col('pickup_datetime')))
)

In [None]:
# Number of trips whose pickup date is 15 October 2019.
df.where(col('pickup_date') == '2019-10-15').count()

## Question 4

In [None]:
# Question 4: reload the Parquet output and add each trip's duration in hours,
# computed as the difference of the two timestamps cast to epoch seconds, / 3600.
df = spark.read.parquet('fhvhv/2019/10/').withColumn(
    'trip_duration',
    (
        F.col("dropoff_datetime").cast("long")
        - F.col("pickup_datetime").cast("long")
    ) / 3600,
)

In [None]:
# Expose the DataFrame to Spark SQL as the temporary view 'trips_data'.
# createOrReplaceTempView supersedes DataFrame.registerTempTable (deprecated since Spark 2.0).
df.createOrReplaceTempView('trips_data')

In [None]:
# Longest trip duration in hours (DESC order puts nulls last, so a null duration
# cannot win). No trailing ';' — older Spark SQL parsers reject it in spark.sql().
df_result = spark.sql("""
    SELECT trip_duration
    FROM trips_data
    ORDER BY trip_duration DESC
    LIMIT 1
""")
df_result.show()

## Question 5

Spark’s user interface, which shows the application's dashboard, runs on which local port?

**Answer:** 4040 (the default value of `spark.ui.port`).

## Question 6

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [None]:
# Taxi zone lookup table. Without a schema every column is read as a string,
# so LocationID is cast to int to match the integer PULocationID it is joined on,
# rather than relying on implicit string/int coercion in the join condition.
df_zones = (
    spark.read
    .option("header", "true")
    .csv('./taxi_zone_lookup.csv')
    .withColumn('LocationID', col('LocationID').cast('int'))
)

In [None]:
# Attach the pickup zone name to every trip. The left join keeps trips whose
# PULocationID has no matching LocationID; their Zone is null.
df_joined = (
    df.join(df_zones, on=df.PULocationID == df_zones.LocationID, how='left')
    .select('pickup_datetime', 'PULocationID', 'Zone')
)

In [None]:
# Expose the joined trips to Spark SQL as the temporary view 'trips_data_joined'.
# createOrReplaceTempView supersedes DataFrame.registerTempTable (deprecated since Spark 2.0).
df_joined.createOrReplaceTempView('trips_data_joined')

In [None]:
# Question 6: the 10 least frequent pickup zones.
# COUNT(*) counts every trip in a group. COUNT(Zone) would skip null Zones and
# report 0 for the group of trips whose PULocationID is null or matched no zone,
# wrongly ranking that group as the least frequent.
# Zones with no pickups at all do not appear, because the join starts from the trips.
# Ties on trip_count are broken by Zone so the output is deterministic.
df_result = spark.sql("""
    SELECT Zone, COUNT(*) AS trip_count
    FROM trips_data_joined
    GROUP BY Zone
    ORDER BY trip_count ASC, Zone
    LIMIT 10
""")

df_result.show()