# Assignment 1: Data processing with Apache Spark
By Luis Daniel Casais Mezquida  

High Performance Computing 24/25  
Master's in Computer Engineering, Universidad Carlos III de Madrid

## 0. Setup

### 0.1. Setup PySpark

In [5]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


spark = SparkSession.builder.master('local[2]').getOrCreate()
#spark.conf.set('spark.cores.max', '1')
#spark.conf.set('spark.driver.memory','1000M')

sc = spark.sparkContext

spark

!pip install psutil
from psutil import virtual_memory
print(f"Your runtime has {virtual_memory().total / 1e9:.1f} GB of RAM, {spark.sparkContext.defaultParallelism} cores")

Your runtime has 8.2 GB of RAM, 2 cores


### 0.2. Set up the data

#### 0.2.1. Download data

In [None]:
!mkdir -p data
!wget "https://aulaglobal.uc3m.es/pluginfile.php/7354849/mod_assign/introattachment/0/tripdata_2017-01.csv?forcedownload=1" -O data/tripdata_2017-01.csv
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv -O data/taxi_zone_lookup.csv
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-07.parquet -O data/yellow_tripdata_2024-07.parquet

#### 0.2.2. Load data

In [18]:
zone_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").option("mode", "DROPMALFORMED").load("data/taxi_zone_lookup.csv")
zone_df.show()

# Map each LocationID to its [Borough, Zone, service_zone] values
zone_map: dict[int, list[str]] = zone_df.toPandas().set_index('LocationID').T.to_dict('list')

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [12]:
df = spark.read.format("csv").option("inferSchema", "true").option("timestampFormat","yyyy-MM-dd HH:mm:ss").option("header", "true").option("mode", "DROPMALFORMED").load("data/tripdata_2017-01.csv")
df.printSchema()
df.show(5)


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+-----------------

## 1. Data preprocessing
- Treat nulls
- Convert datetimes to Unix timestamps ([`.unix_timestamp()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.unix_timestamp.html#pyspark.sql.functions.unix_timestamp))
- Eliminate cases where the start and end times are the same (difference is 0)