# Week 5 of the Data Engineering Zoomcamp – Working with Spark

In [1]:
# Installing spark
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=ac907f90fe1a717fc0aa1c9dbc9dfcf3b93e956df33db5149eada08685fa08be
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [10]:
from pyspark.sql import types

In [5]:
# to test spark
# !wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-07 01:22:41--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240307T012241Z&X-Amz-Expires=300&X-Amz-Signature=53e6d5a6f29c5c85a96b9cbc46acc630646e22115fb7872556a8d0f0e614788d&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-07 01:22:41--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [43]:
import pandas as pd

# Load only the first 1,000 rows with pandas: enough to see the column names
# and the dtypes pandas infers, without reading the whole month.
df_pandas = pd.read_csv(
    '/content/fhv_tripdata_2019-10.csv.gz',
    nrows=1000,
)
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [44]:
# Let Spark translate the pandas sample's dtypes into a StructType, as a starting
# point for the explicit schema below.
# The session is obtained here instead of relying on one created in a later
# cell, so the notebook survives Restart & Run All; getOrCreate() returns the
# already-running session if there is one.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.createDataFrame(df_pandas).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', DoubleType(), True), StructField('DOlocationID', DoubleType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [45]:
# Explicit schema for the FHV CSV. Column names and types follow what Spark
# inferred from the pandas sample above, except that the two datetime columns
# are parsed as TimestampType instead of StringType. Every field is nullable.
# The location IDs stay DoubleType because the sample loaded them as float64
# (NOTE(review): presumably the CSV stores them as e.g. "264.0"; confirm
# before switching to an integer type, which would null out such values).
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.DoubleType()),
        ('DOlocationID', types.DoubleType()),
        ('SR_Flag', types.DoubleType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [46]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session using all available cores (local[*]).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Read the full October 2019 FHV file with the explicit schema defined above;
# the first line of the file is treated as the header row.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('/content/fhv_tripdata_2019-10.csv.gz')
)

df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|       264.0|       264.0|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|       264.0|       264.0|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|       264.0|       264.0|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|       264.0|       264.0|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|       264.0|       264.0|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [47]:
# Spread the rows over 6 partitions; each partition becomes one Parquet part
# file when the DataFrame is written out.
df = df.repartition(numPartitions=6)

In [4]:
# Version of the running Spark, as reported by the session's SparkContext.
spark.sparkContext.version

'3.5.1'

In [48]:
# Persist the repartitioned trips as Parquet (one part file per partition).
# mode='overwrite' keeps the cell re-runnable: the default save mode raises an
# error when the output directory already exists from a previous run.
df.write.parquet('fhv_tripdata/2019/10/', mode='overwrite')

In [54]:
# Expose the trips DataFrame to Spark SQL under the name `fhv_trip_data`.
# createOrReplaceTempView supersedes DataFrame.registerTempTable (deprecated
# since Spark 2.0) and can be re-run without error.
df.createOrReplaceTempView('fhv_trip_data')



In [53]:
!ls -lh fhv_tripdata/2019/10/

total 39M
-rw-r--r-- 1 root root 6.4M Mar  7 01:47 part-00000-2d9499d7-a12f-4845-9e32-fe9e67206af2-c000.snappy.parquet
-rw-r--r-- 1 root root 6.4M Mar  7 01:47 part-00001-2d9499d7-a12f-4845-9e32-fe9e67206af2-c000.snappy.parquet
-rw-r--r-- 1 root root 6.4M Mar  7 01:47 part-00002-2d9499d7-a12f-4845-9e32-fe9e67206af2-c000.snappy.parquet
-rw-r--r-- 1 root root 6.4M Mar  7 01:47 part-00003-2d9499d7-a12f-4845-9e32-fe9e67206af2-c000.snappy.parquet
-rw-r--r-- 1 root root 6.4M Mar  7 01:47 part-00004-2d9499d7-a12f-4845-9e32-fe9e67206af2-c000.snappy.parquet
-rw-r--r-- 1 root root 6.4M Mar  7 01:47 part-00005-2d9499d7-a12f-4845-9e32-fe9e67206af2-c000.snappy.parquet
-rw-r--r-- 1 root root    0 Mar  7 01:47 _SUCCESS


In [62]:
# Total number of trip rows read from the October 2019 FHV file (header excluded).
df.count()

1897493

In [63]:
# Number of trips whose pickup falls on 15 October 2019.
# The count column is aliased (matching the TripCount naming used in the zone
# query) so the result header is readable instead of `count(1)`.
spark.sql(
    '''
    SELECT
      COUNT(*) AS TripCount
    FROM
      fhv_trip_data
    WHERE
      DATE(pickup_datetime) = '2019-10-15'
    '''
).show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



In [70]:
# Longest trip in the data, in hours: (dropoff - pickup) in seconds / 3600.
# Column names are spelled exactly as in the schema (dropOff_datetime) rather
# than relying on Spark SQL's default case-insensitive resolution.
# NOTE(review): the recorded result (~631,152 hours, roughly 72 years) is clearly
# a data-quality outlier, not a real trip; inspect the offending rows before
# reporting this number.
spark.sql(
    '''
    SELECT
        MAX((UNIX_TIMESTAMP(dropOff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600) AS longest_trip_hours
    FROM
        fhv_trip_data
    '''
).show()


+------------------+
|longest_trip_hours|
+------------------+
|          631152.5|
+------------------+



In [72]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-07 02:24:38--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240307T022438Z&X-Amz-Expires=300&X-Amz-Signature=57bdb0e186aac4443441f09f710c580e90e63bc691ebb7fa6089415c9ccb5703&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-07 02:24:38--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [78]:
# Zone lookup table. The first line is the header, and Spark infers column
# types (LocationID comes back as an integer, see the head() output below).
df_zone = spark.read.csv(
    '/content/taxi_zone_lookup.csv',
    header=True,
    inferSchema=True,
)


In [79]:
# Expose the zone lookup to Spark SQL under the name `zone_lookup`.
# createOrReplaceTempView supersedes DataFrame.registerTempTable (deprecated
# since Spark 2.0) and can be re-run without error.
df_zone.createOrReplaceTempView('zone_lookup')



In [80]:
# Peek at the first five zone rows as a list of Row objects.
df_zone.take(5)

[Row(LocationID=1, Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID=2, Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID=3, Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID=4, Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID=5, Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [93]:
# Least frequent pickup zone: count trips per pickup LocationID and keep the
# zone with the smallest count.
#  * The INNER JOIN drops trips whose PUlocationID is NULL or absent from the
#    lookup, and zones with zero pickups never appear. "Least frequent" therefore
#    means least frequent among zones with at least one pickup.
#  * Several zones can share the minimum count. LIMIT 1 alone would pick one of
#    them arbitrarily, so LocationID is added as a tie-breaker to make the answer
#    deterministic.
#  * Column names are spelled as in the schema (PUlocationID) rather than relying
#    on case-insensitive resolution.
spark.sql(
    '''
    SELECT
          z.LocationID,
          z.Borough,
          z.Zone,
          COUNT(*) AS TripCount
      FROM
          fhv_trip_data f
      INNER JOIN
          zone_lookup z
      ON
          f.PUlocationID = z.LocationID
      GROUP BY
          z.LocationID,
          z.Borough,
          z.Zone
      ORDER BY
          TripCount ASC,
          z.LocationID ASC
      LIMIT 1
      '''
).show()


+----------+-------+-----------+---------+
|LocationID|Borough|       Zone|TripCount|
+----------+-------+-----------+---------+
|         2| Queens|Jamaica Bay|        1|
+----------+-------+-----------+---------+

