### Exercise 1 — CSV ↔ Parquet Conversion & Basic EDA
- Dataset: NYC Yellow Taxi 2023‑01 CSV  https://d37ci6vzurychx.cloudfront.net/tripdata/yellow_tripdata_2023-01.csv.gz
- Load first 500 000 rows with Pandas for schema inspection and summary statistics
(describe()).
- Write the full file to Parquet with Snappy compression via Pandas (PyArrow backend).
- Start a local Spark session, read the Parquet back, cache it, and verify matching row
counts

In [1]:
import pyarrow as pa
import pandas as pd
import os 

url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet'

sample = pd.read_parquet(url, engine='pyarrow').head(5_000_000)

print(sample.dtypes)
print(sample.describe(include='all'))

os.makedirs('./data', exist_ok=True)

table = pa.Table.from_pandas(sample)
pa.parquet.write_table(table, './data/yellow_tripdata_2023-01.snappy.parquet', compression='snappy')

VendorID                          int64
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
airport_fee                     float64
dtype: object
            VendorID        tpep_pickup_datetime       tpep_dropoff_datetime  \
count   3.066766e+06                     3066766                     3066766   
unique           NaN                         NaN                  

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Taxi") \
    .master("local[*]") \
    .getOrCreate()
df_spark = spark.read.parquet('data/yellow_tripdata_2023-01.snappy.parquet')


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/10 12:01:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
import time

start_spark_count = time.time()
spark_count = df_spark.count()
spark_count_time = time.time() - start_spark_count
print(f"Tempo de count() SEM cache: {spark_count_time:.2f} segundos")

start_cache = time.time()
df_spark.cache()  
cached_count = df_spark.count() 
cache_time = time.time() - start_cache
print(f"Tempo de caching (incluindo 1º count()): {cache_time:.2f} segundos")

start_cached_count = time.time()
spark_count_cached = df_spark.count()
cached_count_time = time.time() - start_cached_count
print(f"Tempo de count() COM cache: {cached_count_time:.2f} segundos")

sample = pd.read_parquet('data/yellow_tripdata_2023-01.snappy.parquet').head(500)

start_pandas = time.time()
pandas_count = len(sample)
pandas_time = time.time() - start_pandas
print(f"\nTempo de len() no Pandas: {pandas_time:.4f} segundos")

print("\nResumo:")
print(f"Spark (sem cache): {spark_count_time:.2f} segundos")
print(f"Spark (com cache): {cached_count_time:.2f} segundos")
print(f"Pandas: {pandas_time:.4f} segundos")
print(f"Contagem Spark: {spark_count}")
print(f"Contagem Pandas: {pandas_count}")

print("\nStatus do cache (Spark):")
print(df_spark.storageLevel) 

Tempo de count() SEM cache: 1.57 segundos


                                                                                

Tempo de caching (incluindo 1º count()): 5.55 segundos
Tempo de count() COM cache: 0.19 segundos

Tempo de len() no Pandas: 0.0001 segundos

Resumo:
Spark (sem cache): 1.57 segundos
Spark (com cache): 0.19 segundos
Pandas: 0.0001 segundos
Contagem Spark: 3066766
Contagem Pandas: 500

Status do cache (Spark):
Disk Memory Deserialized 1x Replicated


Exercise 2 — Pandas Profiling vs. Spark SQL Analysis
- Generate a Pandas Profiling (ydata‑profiling) report on the 500 k taxi sample.
- Recreate three key insights in Spark SQL (e.g., mean trip distance, 95‑th percentile fare).
- Compare runtimes and memory; justify Pandas vs. Spark choices for typical HCIE workloads

In [4]:
from ydata_profiling import ProfileReport

profile = ProfileReport(sample, title="NYC Taxi Sample Profiling", explorative=True)
profile.to_file("./data/nyc_taxi_sample_report.html")

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 19/19 [00:00<00:00, 166.00it/s]


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

In [5]:
view_spark = df_spark.createOrReplaceTempView("taxi_data")

spark.sql("SELECT AVG(trip_distance) AS mean_trip_distance \
    FROM taxi_data").show()

+------------------+
|mean_trip_distance|
+------------------+
| 3.847342030660236|
+------------------+



In [6]:
spark.sql("SELECT percentile_approx(fare_amount, 0.95) AS fare_95th_percentile \
    FROM taxi_data").show()



+--------------------+
|fare_95th_percentile|
+--------------------+
|                65.3|
+--------------------+



                                                                                

In [7]:
spark.sql("SELECT PULocationID, COUNT(*) AS trip_count \
            FROM taxi_data \
            GROUP BY PULocationID \
            ORDER BY trip_count DESC \
            LIMIT 5").show()

+------------+----------+
|PULocationID|trip_count|
+------------+----------+
|         132|    160030|
|         237|    148074|
|         236|    138391|
|         161|    135417|
|         186|    109227|
+------------+----------+



### Exercise 3 — Data Cleaning Pipeline with Pandas UDFs
- Create a Spark DataFrame of the full taxi dataset.
- Implement a scalar Pandas UDF that buckets trip_distance into quartiles(Q1–Q4).
- Assemble an ML Pipeline: Bucketizer (UDF output), StringIndexer(payment_type), VectorAssembler.
- Save the clean DataFrame as an HCFS table for future exercises.

In [30]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def bucket_trip_distance(trip_distances: pd.Series) -> pd.Series:
    q1 = trip_distances.quantile(0.25)
    q2 = trip_distances.quantile(0.50)
    q3 = trip_distances.quantile(0.75)
    return pd.cut(
        trip_distances,
        bins=[-float("inf"), q1, q2, q3, float("inf")],
        labels = ["Q1", "Q2", "Q3", "Q4"]
    )

df = df_spark.withColumn("trip_distance_quartile", bucket_trip_distance(df_spark["trip_distance"]))

df.groupBy("trip_distance_quartile").count().orderBy("trip_distance_quartile").show()



+----------------------+------+
|trip_distance_quartile| count|
+----------------------+------+
|                    Q1|777435|
|                    Q2|762277|
|                    Q3|763202|
|                    Q4|763852|
+----------------------+------+



                                                                                

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

payment_indexer = StringIndexer(
    inputCol="payment_type",
    outputCol="payment_type_index",
    handleInvalid="keep"
)

assembler = VectorAssembler(
    inputCols=["trip_distance_quartile", "payment_type_index"],
    outputCol="features"
)

pipeline = Pipeline(stages=[payment_indexer, assembler])

model = pipeline.fit(df)
df_transformed = model.transform(df)


                                                                                

In [10]:
try:
    df.write.mode("overwrite").saveAsTable("cleaned_taxi_data")
except Exception as e:
    print(f"Table save failed, falling back to directory save: {e}")
    df.write.mode("overwrite").parquet("output/cleaned_taxi_data/")

                                                                                

### Exercise 4 — Basic Regression with MLlib’s LinearRegression
- Predict total_amount using engineered features.
- Train/test split 80/20; evaluate RMSE.
- Collect 1 % of predictions to Pandas, plot residuals with Matplotlib, and detect heteroscedasticity.

In [22]:
print(type(df))
df.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- trip_distance_quartile: integer (nullable = true)



In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

feature_cols = [
    'trip_distance',
    'passenger_count',
    'payment_type',
    'PULocationID',
    'DOLocationID',
    'RatecodeID'
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features',handleInvalid="skip")
df_prepared = assembler.transform(df)
train_data, test_data = df_prepared.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol='features', labelCol='total_amount')
lr_model = lr.fit(train_data)
predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol='total_amount', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

sample_predictions = predictions.sample(fraction=0.01, seed=42)

sample_pd = sample_predictions.select('prediction', 'total_amount').toPandas()
sample_pd['residuals'] = sample_pd['total_amount'] - sample_pd['prediction']
plt.figure(figsize=(10, 6))
plt.scatter(sample_pd['prediction'], sample_pd['residuals'], alpha=0.5)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel('Predicted Total Amount')
plt.ylabel('Residuals')
plt.title('Residuals vs Predicted Total Amount')
plt.show()

25/05/10 12:23:20 WARN Instrumentation: [9194deaf] regParam is zero, which might cause numerical instability and overfitting.
25/05/10 12:24:15 WARN MemoryStore: Not enough space to cache rdd_13_0 in memory! (computed 118.2 MiB so far)
                                                                                

Root Mean Squared Error (RMSE) on test data = 21.598327934751072


  plt.show()


In [23]:
print(sample_pd['total_amount'].max())
print(sample_pd['total_amount'].min())
coefficients = lr_model.coefficients
intercept = lr_model.intercept

print(f"Intercepto: {intercept}")
print(f"Coeficientes: {coefficients}")

for feature, coef in zip(feature_cols, coefficients):
    print(f"{feature}: {coef}")

303.5
-74.0
Intercepto: 44.24210836689825
Coeficientes: [0.03992139905793312,0.7897287066271028,-6.96582345641627,-0.03575762205170507,-0.02600751345190344,0.17585689457135667]
trip_distance: 0.03992139905793312
passenger_count: 0.7897287066271028
payment_type: -6.96582345641627
PULocationID: -0.03575762205170507
DOLocationID: -0.02600751345190344
RatecodeID: 0.17585689457135667


A equação de regressão pode ser expressa como:

$$
\text{total\_amount} = \beta_0 + \beta_1 \times \text{trip\_distance} + \beta_2 \times \text{fare\_amount} + \cdots + \epsilon
$$

Onde:
- $\beta_0$ é o intercepto
- $\beta_1$, $\beta_2$ são os coeficientes
- $\epsilon$ representa o erro

### Exercise 5 — Hyper‑Parameter Tuning with CrossValidator
- Grid‑search regParam ∈ {0.01, 0.1, 0.5} and elasticNetParam ∈ {0, 0.5, 1}.
- Use 3‑fold CV and RegressionEvaluator (metric = r2).
- Export cvModel.avgMetrics to Pandas and build a Seaborn barplot of R² vs. hyper‑parameters.

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LinearRegression(featuresCol='features', labelCol='total_amount')


evaluator = RegressionEvaluator(
    labelCol='total_amount',
    predictionCol='prediction',
    metricName='r2'
)

# paramGrid = ParamGridBuilder() \
#     .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
#     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
#     .build()
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.5]) \
    .addGrid(lr.elasticNetParam, [1.0]) \
    .build()

from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)

cvModel = cv.fit(train_data)

25/05/10 12:43:27 WARN MemoryStore: Not enough space to cache rdd_265_1 in memory! (computed 38.1 MiB so far)
25/05/10 12:43:27 WARN BlockManager: Persisting block rdd_265_1 to disk instead.
25/05/10 12:43:27 WARN MemoryStore: Not enough space to cache rdd_265_3 in memory! (computed 73.8 MiB so far)
25/05/10 12:43:27 WARN BlockManager: Persisting block rdd_265_3 to disk instead.
25/05/10 12:43:27 WARN MemoryStore: Not enough space to cache rdd_265_0 in memory! (computed 73.9 MiB so far)
25/05/10 12:43:27 WARN BlockManager: Persisting block rdd_265_0 to disk instead.
                                                                                

In [40]:
import seaborn as sns
import matplotlib.pyplot as plt

avg_metrics = cvModel.avgMetrics
best_model = cvModel.bestModel

best_reg_param = best_model._java_obj.getRegParam()
best_elastic_net_param = best_model._java_obj.getElasticNetParam()

print(f"Melhor regParam: {best_reg_param}")
print(f"Melhor elasticNetParam: {best_elastic_net_param}")

metrics_data = []
for params, r2 in zip(paramGrid, avg_metrics):
    reg = params[lr.regParam]
    elastic = params[lr.elasticNetParam]
    metrics_data.append((reg, elastic, r2))

df_metrics = pd.DataFrame(metrics_data, columns=['regParam', 'elasticNetParam', 'r2'])

plt.figure(figsize=(10, 6))
sns.barplot(data=df_metrics, x='regParam', y='r2', hue='elasticNetParam')
plt.title('R² por Combinação de Hiperparâmetros')
plt.xlabel('regParam')
plt.ylabel('R²')
plt.legend(title='elasticNetParam')
plt.tight_layout()
plt.show()


Melhor regParam: 0.5
Melhor elasticNetParam: 1.0


  plt.show()


### Exercise 6 — Classification with RandomForestClassifier
- Define binary label: high‑tip vs. low‑tip (tip ≥ 15 % of fare).
- Train RandomForestClassifier with 100 trees; retrieve feature importances.
- Collect importances to Pandas and visualise top‑10 predictors; discuss relevance to feature pruning.

In [None]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "high_tip",
    when(col("tip_amount") / col("fare_amount") >= 0.15, 1).otherwise(0)
)
from pyspark.ml.feature import StringIndexer, VectorAssembler

indexer = StringIndexer(inputCol="payment_type", outputCol="payment_type_indexed")

feature_cols = ['trip_distance', 'passenger_count', 'fare_amount', 'payment_type_indexed']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

rf = RandomForestClassifier(labelCol="high_tip", featuresCol="features", numTrees=100, seed=42)

pipeline = Pipeline(stages=[indexer, assembler, rf])

model = pipeline.fit(df)
