# 🚖 From Dirty to Trusted: NYC Taxi Data Validation with SparkDQ

In this notebook, we explore how to turn messy, real-world data into trustworthy input for machine learning — using [**SparkDQ**](https://github.com/sparkdq-community/sparkdq), a modern, extensible data quality framework built for PySpark.

We’ll validate a public dataset from the NYC Taxi & Limousine Commission, clean it with declarative quality rules, and discuss how unchecked records can distort ML models.

**What you'll learn:**
- How to apply rule-based validation to large Spark DataFrames
- How to isolate valid vs. invalid data records
- Why bad data can distort model performance
- How SparkDQ gives you full control over the quality in your data pipeline

Let's dive in. 🧪

## Download NYC Yellow Taxi Dataset

To follow this demo, we’ll use a real-world dataset from the NYC Taxi & Limousine Commission.  
This dataset contains detailed records of yellow taxi rides in January 2025 — including timestamps, distances, and fares.

The following cell will create a `data/` directory and download the Parquet file into it using `curl`.

In [1]:
!mkdir -p data
!curl -L -o data/yellow_tripdata_2025.parquet https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 56.4M  100 56.4M    0     0  1342k      0  0:00:43  0:00:43 --:--:--  761k


## The Messiness of Real-World Taxi Data

The NYC Yellow Taxi dataset is massive and publicly available — but like most real-world data, it’s far from clean.

Here are some of the common issues you’ll encounter:

- ❌ Missing values in `tpep_pickup_datetime` or `tpep_dropoff_datetime`  
- ❌ Negative or zero `trip_distance`  
- ❌ Negative or zero `fare_amount`  
- ⏱️ Timestamps that don’t make sense (e.g., dropoff before pickup)  
- ⚠️ Unexpected data types or nulls in `passenger_count`

These aren’t just minor problems — they can **break downstream logic**, **distort features**, or even lead your ML model to learn from garbage.
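
As a rough illustration of what these issues mean for a single record, the sketch below expresses each one as a plain-Python condition. It is not SparkDQ code: `find_issues` and the issue labels are hypothetical names, and the second sample record is made up for demonstration (the first mirrors a row from the January 2025 file).

```python
from datetime import datetime


def find_issues(trip):
    """Return labels for the quality problems found in one trip record (a dict)."""
    issues = []
    pickup = trip.get("tpep_pickup_datetime")
    dropoff = trip.get("tpep_dropoff_datetime")
    if pickup is None:
        issues.append("missing pickup")
    if dropoff is None:
        issues.append("missing dropoff")
    # Only compare timestamps when both are present.
    if pickup is not None and dropoff is not None and dropoff < pickup:
        issues.append("dropoff before pickup")
    distance = trip.get("trip_distance")
    if distance is not None and distance <= 0:
        issues.append("non-positive trip_distance")
    fare = trip.get("fare_amount")
    if fare is not None and fare <= 0:
        issues.append("non-positive fare_amount")
    if trip.get("passenger_count") is None:
        issues.append("missing passenger_count")
    return issues


# A clean record, mirroring a row from the dataset.
clean_trip = {
    "tpep_pickup_datetime": datetime(2025, 1, 1, 0, 18, 38),
    "tpep_dropoff_datetime": datetime(2025, 1, 1, 0, 26, 59),
    "trip_distance": 1.6,
    "fare_amount": 10.0,
    "passenger_count": 1,
}

# A made-up record that combines several of the problems listed above.
broken_trip = {
    "tpep_pickup_datetime": datetime(2025, 1, 1, 0, 30, 0),
    "tpep_dropoff_datetime": datetime(2025, 1, 1, 0, 10, 0),
    "trip_distance": 0.0,
    "fare_amount": -7.2,
    "passenger_count": None,
}

print(find_issues(clean_trip))
print(find_issues(broken_trip))
```

Hand-written conditions like these are easy to get subtly wrong and hard to reuse, which is the gap a declarative framework aims to close.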


## 📂 Load NYC Taxi Dataset

We now load the Parquet-formatted Yellow Taxi dataset for January 2025.  
This data includes timestamps, trip distance, fare amounts, and other ride metadata; we keep only the five columns needed for validation.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("./data/")
df = df.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_distance",
    "fare_amount",
    "passenger_count"
)
df.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/25 18:40:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+--------------------+---------------------+-------------+-----------+---------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|passenger_count|
+--------------------+---------------------+-------------+-----------+---------------+
| 2025-01-01 00:18:38|  2025-01-01 00:26:59|          1.6|       10.0|              1|
| 2025-01-01 00:32:40|  2025-01-01 00:35:13|          0.5|        5.1|              1|
| 2025-01-01 00:44:04|  2025-01-01 00:46:01|          0.6|        5.1|              1|
| 2025-01-01 00:14:27|  2025-01-01 00:20:01|         0.52|        7.2|              3|
| 2025-01-01 00:21:34|  2025-01-01 00:25:06|         0.66|        5.8|              3|
+--------------------+---------------------+-------------+-----------+---------------+
only showing top 5 rows



## Install SparkDQ from PyPI

To get started, we'll install the `sparkdq` package directly from [PyPI](https://pypi.org/project/sparkdq/).  
This gives us access to all core validation features — including row- and aggregate-level checks, declarative configs, and result routing.

You can install it using the following command:

In [None]:
!python -m ensurepip
!python -m pip install --quiet sparkdq pyyaml

Verify that `sparkdq` is installed by importing it and printing its version:

In [4]:
import sparkdq
sparkdq.__version__

'0.7.1'

## 📜 Declarative Data Quality Configuration (YAML)

Besides defining checks directly in Python, SparkDQ supports loading them from external configuration files. Here they live in `dq_checks.yaml`, which is expected in the notebook's working directory.  
This separates validation rules from pipeline code and makes them easy to manage or reuse across different pipelines.
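
The contents of `dq_checks.yaml` are not shown in this notebook. Judging from the check IDs and classes that the loaded `CheckSet` reports, the parsed file is presumably a list of dictionaries along the following lines. Treat every key and check name here (`check`, `check-id`, `columns`, `min_value`, `smaller_column`, `greater_column`) as an assumption to verify against the SparkDQ documentation; only the five check IDs and the column names come from this notebook.

```python
# Hypothetical shape of the parsed dq_checks.yaml (a list of check definitions).
# All key names and check-type strings are assumptions, not confirmed SparkDQ API.
assumed_config = [
    {"check": "null-check", "check-id": "pickup-null",
     "columns": ["tpep_pickup_datetime"]},
    {"check": "null-check", "check-id": "dropoff-null",
     "columns": ["tpep_dropoff_datetime"]},
    # Whether the boundary value 0 itself passes depends on options not shown here.
    {"check": "numeric-min-check", "check-id": "trip-positive",
     "columns": ["trip_distance"], "min_value": 0},
    {"check": "numeric-min-check", "check-id": "fare-positive",
     "columns": ["fare_amount"], "min_value": 0},
    {"check": "column-less-than-check", "check-id": "chronological",
     "smaller_column": "tpep_pickup_datetime",
     "greater_column": "tpep_dropoff_datetime"},
]

for definition in assumed_config:
    print(definition["check-id"], "->", definition["check"])
```

Whatever the exact keys turn out to be, `yaml.safe_load` has to return a list of this kind for `add_checks_from_dicts` to register one check per entry.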

In [5]:
import yaml
from sparkdq.management import CheckSet

with open("dq_checks.yaml") as f:
    config = yaml.safe_load(f)

check_set = CheckSet()
check_set.add_checks_from_dicts(config)
print(check_set)

CheckSet:
  - pickup-null (NullCheck)
  - dropoff-null (NullCheck)
  - trip-positive (NumericMinCheck)
  - fare-positive (NumericMinCheck)
  - chronological (ColumnLessThanCheck)


## ✅ Run Validation with SparkDQ

Now that the checks are defined and the data is loaded, we can initialize the SparkDQ engine and run the validation.

The engine will apply all configured checks to the input DataFrame and return a structured result, which can be used to filter passing and failing records, inspect errors, or calculate summary statistics.

In [6]:
from sparkdq.engine import BatchDQEngine
engine = BatchDQEngine(check_set)
validation_result = engine.run_batch(df)

## 📊 Inspect Validation Summary

Once the checks are applied, we can print the validation summary to get a high-level view of the data quality.

In this case, the reported `pass_rate` is **0.94**, meaning that **roughly 6% of all records (222,712 of 3,475,226) failed at least one critical check**.

That may not sound like much — but with millions of records, it represents a significant volume of problematic data that could distort downstream metrics or lead your ML models to learn from invalid inputs.

By making this visible, SparkDQ gives you the confidence to trust what moves forward — and to understand what gets filtered out.

In [7]:
print(validation_result.summary())



Validation Summary (2025-05-25 18:40:34)
Total records:   3,475,226
Passed records:  3,252,514
Failed records:  222,712
Pass rate:       94.00%
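
The counts in the summary can be cross-checked with a few lines of arithmetic. The sketch below only reuses the three numbers printed above; the variable names are illustrative. It suggests that the displayed `94.00%` is a rounded figure (presumably rounded to two decimals before being formatted as a percentage, which is an assumption about the summary's formatting).

```python
# Counts copied from the validation summary.
total_records = 3_475_226
passed_records = 3_252_514
failed_records = 222_712

# Every record is either passed or failed, so the two counts must add up.
counts_add_up = passed_records + failed_records == total_records

pass_rate = passed_records / total_records
fail_rate = failed_records / total_records

print(f"counts add up: {counts_add_up}")
print(f"pass rate: {pass_rate:.2%}")  # 93.59%
print(f"fail rate: {fail_rate:.2%}")  # 6.41%
```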


                                                                                

## ❌ Inspect Failed Records and Their Validation Errors

Let’s now take a closer look at the rows that didn’t pass validation.

Each failed record includes a special `_dq_errors` column, which lists all the checks that were violated for that row.  
This makes it easy to understand **why a record was rejected** and which rules were responsible.

By inspecting these errors, we can spot recurring data quality issues, debug upstream problems, or even build monitoring around specific check failures.

In [8]:
bad_data = validation_result.fail_df()
bad_data.select("trip_distance", "fare_amount", "_dq_errors").show(5, False)

+-------------+-----------+--------------------------------------------------------------------------------------------+
|trip_distance|fare_amount|_dq_errors                                                                                  |
+-------------+-----------+--------------------------------------------------------------------------------------------+
|0.71         |-7.2       |[{NumericMinCheck, fare-positive, critical}]                                                |
|0.69         |-6.5       |[{NumericMinCheck, fare-positive, critical}]                                                |
|0.0          |20.06      |[{NumericMinCheck, trip-positive, critical}, {ColumnLessThanCheck, chronological, critical}]|
|0.97         |-16.3      |[{NumericMinCheck, fare-positive, critical}]                                                |
|1.42         |-12.1      |[{NumericMinCheck, fare-positive, critical}]                                                |
+-------------+-----------+--------------------------------------------------------------------------------------------+

This feedback loop is valuable not only for debugging, but also for improving upstream data collection or building alerts.
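
To show how such monitoring might start, the sketch below tallies failures per check ID for the five rows printed above. It works on a plain Python list rather than on the Spark DataFrame, because the field names inside the `_dq_errors` structs are not visible in the output; `errors_per_row` and `failures_by_check` are illustrative names.

```python
from collections import Counter

# Check IDs attached to each of the five failed rows shown in the output.
errors_per_row = [
    ["fare-positive"],
    ["fare-positive"],
    ["trip-positive", "chronological"],
    ["fare-positive"],
    ["fare-positive"],
]

# Flatten the per-row lists and count how often each check was violated.
failures_by_check = Counter(
    check_id for row_errors in errors_per_row for check_id in row_errors
)

for check_id, count in failures_by_check.most_common():
    print(f"{check_id}: {count}")
```

On the full dataset, the same idea would be applied to `bad_data` with Spark aggregations, and a sudden jump in one check's count could serve as an alert trigger.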

## ✅ Extract Passing Records Only

After validation, SparkDQ allows you to easily extract only the records that passed all critical checks.  
This lets you continue your data pipeline with full confidence that downstream steps — such as transformations, reporting, or machine learning — are based on **clean and trusted data**.

With just one line, you can access the passing subset:

In [9]:
good_data = validation_result.pass_df()
good_data.show(5, False)

+--------------------+---------------------+-------------+-----------+---------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|passenger_count|
+--------------------+---------------------+-------------+-----------+---------------+
|2025-01-01 00:18:38 |2025-01-01 00:26:59  |1.6          |10.0       |1              |
|2025-01-01 00:32:40 |2025-01-01 00:35:13  |0.5          |5.1        |1              |
|2025-01-01 00:44:04 |2025-01-01 00:46:01  |0.6          |5.1        |1              |
|2025-01-01 00:14:27 |2025-01-01 00:20:01  |0.52         |7.2        |3              |
|2025-01-01 00:21:34 |2025-01-01 00:25:06  |0.66         |5.8        |3              |
+--------------------+---------------------+-------------+-----------+---------------+
only showing top 5 rows



## 🧾 Conclusion

In this demo, we explored how even a simple real-world dataset — like NYC Yellow Taxi rides — can benefit from structured, automated validation using **SparkDQ**.

We saw that:

- ✅ Checks can be defined declaratively via clean YAML configurations  
- 🔍 Passing records can be easily extracted for safe downstream processing  
- ❌ Failing records are clearly annotated and can be further analyzed or corrected

With SparkDQ, you gain full control over data quality enforcement:  
You can route only validated records into your machine learning pipeline — ensuring that **models are trained only on trustworthy data**.

Invalid records don't vanish silently. Instead, they become visible, traceable, and improvable — which allows you to build smarter, more resilient data workflows.

Whether you're validating input data, cleaning for analytics, or building ML features:  
**SparkDQ helps make quality a first-class citizen in every PySpark pipeline.**