## Spark using Google Colab

In [None]:
# Install Apache Spark 3.5.1
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz


In [None]:
# Tell the JVM launcher and the Spark tooling where their installs live.
import os

os.environ.update(
    {
        # NOTE(review): assumes the runtime image already provides this JDK;
        # no cell in this notebook installs it.
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        # Expected extraction directory of the Spark tarball
        # (assumes the notebook's working directory is /content).
        "SPARK_HOME": "/content/spark-3.5.1-bin-hadoop3",
    }
)

In [None]:
# Install findspark package to locate Spark in Python. it help Python locate Spark.
!pip install -q findspark pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import types

In [None]:
# Start (or reuse) a Spark session running in local mode on all available cores.
spark = (
    SparkSession.builder
    .appName('test')
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Echo the session object as the cell result to confirm it was created.
spark

In [None]:
# #download the data
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-03 09:08:27--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T090827Z&X-Amz-Expires=300&X-Amz-Signature=95d06ab8970de4b3b61cabdd2972bcd55d96c2becb52e1c1059ba3f9e261df6e&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-03 09:08:27--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [None]:
# Load the gzipped CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
df = (
    spark.read
    .options(header="true", inferSchema=True)
    .csv('/content/fhv_tripdata_2019-10.csv.gz')
)

# Preview the first five rows.
df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
+--------------------+------------------

In [None]:
# Inspect the schema Spark inferred from the CSV (column names, types, nullability).
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [None]:
# Explicit schema, kept for reference only (unused: the CSV read relies on inferSchema).
# schema = types.StructType(
#     [
#         types.StructField('dispatching_base_num', types.StringType(), True),
#         types.StructField('pickup_datetime', types.TimestampType(), True),
#         types.StructField('dropoff_datetime', types.TimestampType(), True),
#         types.StructField('PULocationID', types.IntegerType(), True),
#         types.StructField('DOLocationID', types.IntegerType(), True),
#         types.StructField('SR_Flag', types.IntegerType(), True),
#         types.StructField('Affiliated_base_number', types.StringType(), True)
#     ]
# )

In [None]:
# Repartition the DataFrame into 6 partitions and save it as Parquet.
df = df.repartition(6)
# mode('overwrite') replaces output left behind by an earlier run; with the
# default mode the write raises because the target directory already exists,
# which breaks Restart & Run All on a runtime that still has the files.
# NOTE: if df has since been rebound to a read of this same path, Spark refuses
# to overwrite a path it is reading from; re-run from the CSV load in that case.
df.write.mode('overwrite').parquet('fhvhv/2019/10/')

In [None]:
# Read the repartitioned Parquet data back in; from here on df refers to the
# Parquet copy rather than the original CSV read.
df = spark.read.parquet('fhvhv/2019/10/')

In [None]:
# Expose the trips DataFrame to Spark SQL under the view name "trips".
df.createOrReplaceTempView("trips")

# How many trips were picked up on 2019-10-15?
sql_query = """
    SELECT COUNT(*) AS trip_count
    FROM trips
    WHERE date_format(pickup_datetime, 'yyyy-MM-dd') = '2019-10-15'
"""
# `count` is bound to the same query text as `sql_query`.
count = sql_query

# Run the query and print its single-row result.
trip_count = spark.sql(sql_query)
trip_count.show()

+----------+
|trip_count|
+----------+
|     62610|
+----------+



In [None]:
# What is the length of the longest trip in the dataset, in hours?
# Duration is drop-off minus pickup in epoch seconds, divided by 3600.
# The drop-off column is spelled exactly as it appears in the data
# (dropOff_datetime) so the query does not depend on Spark's case-insensitive
# column resolution being enabled.
sql_query = """
    SELECT MAX((unix_timestamp(dropOff_datetime) - unix_timestamp(pickup_datetime)) / 3600) AS longest_trip_hours
    FROM trips
"""

# Execute SQL query
trip_hours = spark.sql(sql_query)

# Show the result without truncating the value
trip_hours.show(truncate=False)

+------------------+
|longest_trip_hours|
+------------------+
|631152.5          |
+------------------+



In [None]:
# Download zone data
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-03 09:25:36--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T092536Z&X-Amz-Expires=300&X-Amz-Signature=b0016708f439adc154f5d714a22e5e95226dab90cf0e43219159c744e05f8c8b&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-03 09:25:36--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [None]:
# Load the zone lookup table; the first CSV row supplies the column names and
# no schema inference is requested.
zone = spark.read.option("header", True).csv('/content/taxi_zone_lookup.csv')

# Preview the first five zones.
zone.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [None]:
# Which zone is the least frequent pickup location?
# Register zone as a temporary view so it can be joined with "trips" in SQL.
zone.createOrReplaceTempView("zone")

# Count pickups per zone name and keep the zone with the smallest count.
# - The inner join already drops trips whose pickup ID has no match in the
#   lookup table, so no extra NULL filter on z.LocationID is needed. It also
#   means zones with zero pickups never appear in the result.
# - PUlocationID is spelled exactly as in the trips data so the query does not
#   depend on case-insensitive column resolution.
# - The secondary sort on z.Zone makes LIMIT 1 deterministic when several zones
#   share the minimum count.
sql_query = """
    SELECT z.Zone, COUNT(*) AS pickup_count
    FROM trips t
    JOIN zone z ON t.PUlocationID = z.LocationID
    GROUP BY z.Zone
    ORDER BY pickup_count ASC, z.Zone ASC
    LIMIT 1
"""

# Execute the SQL query
least_trip = spark.sql(sql_query)

# Show the result
least_trip.show()

+-----------+------------+
|       Zone|pickup_count|
+-----------+------------+
|Jamaica Bay|           1|
+-----------+------------+

