# Introduction to Data Quality and Validation Frameworks 

Data quality is a critical aspect of data engineering and analytics. Ensuring data is accurate, consistent, and timely helps in making informed decisions based on reliable insights. Data validation frameworks come into play to facilitate the implementation of data quality checks throughout data pipelines. They assist in defining, monitoring, and validating data quality rules, thereby ensuring the reliability and trustworthiness of the data.

## dbt (data build tool) in Data Quality

[dbt (data build tool)](https://www.getdbt.com/) is a popular open-source tool that enables data analysts and engineers to transform and test data inside the data warehouse. dbt lets you define, document, and execute data transformation workflows, which makes it a strong fit for the transformation layer of a data pipeline. Here's how dbt helps ensure data quality:

### 1. Data Transformation

- **SQL-Based Transformations**: dbt leverages the power of SQL for data transformation, allowing for the creation of complex data models with ease.
- **Version Control**: dbt projects consist of plain-text SQL and YAML files, so data models can be kept in a version control system such as Git, which makes changes trackable and eases collaboration among team members.

### 2. Data Testing

- **Built-in Data Tests**: dbt ships with four built-in tests that can be attached to any column: `unique`, `not_null`, `accepted_values`, and `relationships` (referential integrity).
- **Custom Data Tests**: Apart from built-in tests, dbt allows for the creation of custom data tests, enabling the definition of business-specific data quality rules.

### 3. Documentation and Data Lineage

- **Automatic Documentation**: dbt automatically generates documentation for the data models, providing a clear view of the data structure and transformations.
- **Data Lineage**: dbt supports the visualization of data lineage, helping in understanding the flow of data and dependencies between different data models.

### 4. Integration with Data Pipelines

- **Automation and Scheduling**: dbt can be integrated into data pipelines for automated execution of data transformations and tests, ensuring data quality checks are performed in each run.
- **Compatibility with Various Data Warehouses**: dbt supports various data warehouses, making it a flexible choice for different data environments.

In the subsequent sections, we will demonstrate how to set up a mock data pipeline, ingest data into a PostgreSQL database, and use dbt to implement data quality checks and validations, showcasing the best practices for incorporating data quality checks in data pipelines.


```python
import pandas as pd

# Load the dataset
file_path = "/Users/malempatiharidines/Code/GitHub/training/python_training/datasets/yellow_tripdata_2024-01.parquet"
data = pd.read_parquet(file_path)

# Display the first few rows of the dataframe
print(data.head())

# Get basic information about the dataset
print(data.info())

# Get descriptive statistics for the numerical columns
print(data.describe())
```

```text
   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2024-01-01 00:57:55   2024-01-01 01:17:43              1.0
1         1  2024-01-01 00:03:00   2024-01-01 00:09:36              1.0
2         1  2024-01-01 00:17:06   2024-01-01 00:35:01              1.0
3         1  2024-01-01 00:36:38   2024-01-01 00:44:56              1.0
4         1  2024-01-01 00:46:51   2024-01-01 00:52:57              1.0

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           1.72         1.0                  N           186            79
1           1.80         1.0                  N           140           236
2           4.70         1.0                  N           236            79
3           1.40         1.0                  N            79           211
4           0.80         1.0                  N           211           148

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  \
```


### Reading and Profiling the Data

Before setting up data transformations and quality checks with dbt, we need to understand the dataset we are working with. In this section, we read the NYC Taxi Trip dataset for January 2024 and run a cursory profile to get acquainted with its structure and contents.

#### Step 1: Reading the Data

Using pandas, a popular data manipulation library in Python, we read the dataset from the parquet file. Here's the snippet of Python code we used to read the data:

```python
import pandas as pd

# Load the dataset
file_path = "file_path/yellow_tripdata_2024-01.parquet"
data = pd.read_parquet(file_path)
```

#### Step 2: Basic Data Profiling

After loading the data, we performed some basic profiling to understand the structure and contents of the dataset. We used the following commands to explore the data:

```python
# Display the first few rows of the dataframe
print(data.head())

# Print basic information about the dataset
# (info() prints directly and returns None, so no print() wrapper is needed)
data.info()

# Get descriptive statistics for the numerical columns
print(data.describe())
```
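
Beyond `head()`, `info()`, and `describe()`, a few targeted counts can hint at which quality rules are worth encoding later. The following is a minimal sketch on a tiny made-up DataFrame; the helper name `quick_quality_profile` and the sample values are assumptions for illustration, not part of the original notebook.

```python
import pandas as pd


def quick_quality_profile(df, positive_columns):
    """Summarize row count, nulls per column, and non-positive values."""
    return {
        "row_count": len(df),
        # Number of missing values in each column
        "null_counts": {col: int(n) for col, n in df.isna().sum().items()},
        # Number of values <= 0 in columns expected to be strictly positive
        "non_positive_counts": {
            col: int((df[col] <= 0).sum()) for col in positive_columns
        },
    }


# Hypothetical sample that mimics two columns of the taxi dataset
sample = pd.DataFrame(
    {
        "fare_amount": [12.5, -3.0, 0.0, 8.0],
        "passenger_count": [1.0, None, 2.0, 1.0],
    }
)

profile = quick_quality_profile(sample, positive_columns=["fare_amount"])
print(profile)
```

On the real dataset, the same call would take `data` in place of `sample`.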

#### Data Overview

From the data profiling, we observed that the dataset contains 2,964,624 records and 19 columns, including details about the trip (pickup and dropoff times, locations), fare breakdown (amount, tips, tolls), and other attributes like payment type and rate code. 

Understanding the data's structure and contents will guide us in setting up appropriate data transformations and quality checks in the subsequent steps, where we will be using dbt to implement data quality checks and validations.

In the next section, we will proceed to set up dbt and create transformation models to clean and structure the data, preparing it for data quality checks.

### Setting Up dbt (data build tool)

In this section, we set up dbt so that we can define, document, and run the transformations and data quality tests for this pipeline.

#### Step 1: Installing dbt

Before we can start using dbt, it needs to be installed in your Python environment. dbt is distributed as `dbt-core` plus an adapter package for the target warehouse. For PostgreSQL, install both:

```shell
pip install dbt-core dbt-postgres
```

This command installs dbt with the PostgreSQL adapter and their dependencies, preparing your environment for setting up a dbt project.

#### Step 2: Initializing a dbt Project

Once dbt is installed, the next step is to initialize a dbt project. Navigate to the directory where you want to create your dbt project and run the following command:

```shell
dbt init my_dbt_project
```

This command will create a new dbt project with the necessary directory structure and configuration files to get started with dbt.

#### Step 3: Configuring the dbt Profile

To connect dbt to your PostgreSQL database, you need to configure the dbt profile. The profile configuration file is located at `~/.dbt/profiles.yml`. In this file, you'll set up the connection details for your PostgreSQL database. Here's an example configuration:

```yaml
my_dbt_project:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      user: your_username
      pass: your_password
      port: 5432
      dbname: your_database_name
      schema: your_schema_name
```

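Replace `your_username`, `your_password`, `your_database_name`, and `your_schema_name` with the appropriate details for your PostgreSQL database. The top-level key (`my_dbt_project` here) must match the `profile` setting in the project's `dbt_project.yml`. Running `dbt debug` from the project directory checks that dbt can connect with these settings.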
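
The same connection details are needed again when Python connects to the database for ingestion. As a minimal sketch, the helper below assembles a SQLAlchemy-style PostgreSQL URL and percent-encodes the user name and password so that characters such as `@` or `/` do not break the URL. The function name `build_postgres_url` and the `PG_PASSWORD` environment variable are hypothetical names chosen for this example.

```python
import os
from urllib.parse import quote


def build_postgres_url(user, password, host, port, dbname):
    """Build a postgresql:// URL, escaping special characters in credentials."""
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return f"postgresql://{safe_user}:{safe_password}@{host}:{port}/{dbname}"


# PG_PASSWORD is an assumed variable name; the fallback mirrors the placeholder above
url = build_postgres_url(
    user="your_username",
    password=os.environ.get("PG_PASSWORD", "your_password"),
    host="localhost",
    port=5432,
    dbname="your_database_name",
)
print(url)
```

Reading the password from the environment keeps it out of notebooks and version control.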

Once the dbt project is set up and configured, we are ready to start creating dbt models for data transformation and setting up data quality checks.

In the next section, we will create dbt models to transform the raw data and set up data quality tests using dbt's testing functionalities.


### Understanding dbt Components

Before we delve into setting up dbt models and data ingestion, let's understand the core components of dbt:

1. **Models**: Models are the central artifacts in dbt. They are essentially SQL queries defined in `.sql` files which represent the transformations that need to be applied to your source data. dbt builds models by executing these SQL queries in the specified materialization format (tables, views, etc.).

2. **Macros**: Macros are reusable pieces of Jinja-templated SQL that can be called from different models. They help avoid repetition by keeping shared logic in one place.

3. **Seeds**: Seeds are CSV files in the project that dbt loads into the warehouse with `dbt seed`. They suit small, static datasets such as mapping tables that rarely change, and can be used in transformations or to augment the source data.

4. **Sources**: Sources in dbt are a way of documenting and testing the raw data in your warehouse. They help in defining the schema of your raw data and can be used to create tests to validate the quality of the source data.

5. **Tests**: Tests in dbt are SQL queries that validate data quality; a test fails when its query returns rows. You can test for uniqueness, non-null values, accepted values, and relationships between tables, in both source and transformed data.

6. **Snapshots**: Snapshots capture historical changes in mutable tables. Each time `dbt snapshot` runs, dbt records which rows have changed, so you can see how the data looked at earlier points in time.

Now, let's move on to the steps we undertook for setting up dbt models and data ingestion.

### Setting Up dbt Models and Data Ingestion

#### Step 1: Setting Up the dbt Model

We first created a dbt model to define the structure of the PostgreSQL table where the data will be loaded. The model file, `nyc_taxi_data.sql`, selects typed `NULL` columns and filters them out with `WHERE FALSE`, so dbt materializes an empty table with the desired schema. Two details are worth noting. PostgreSQL folds unquoted identifiers to lowercase, so `VendorID` is stored as `vendorid`. And because the model is materialized as a table, dbt rebuilds it on every `dbt run`, so the model should be run before the data is ingested.

```sql
{{ config(materialized='table') }}

SELECT
    NULL::INT AS VendorID,
    NULL::TIMESTAMP AS tpep_pickup_datetime,
    NULL::TIMESTAMP AS tpep_dropoff_datetime,
    NULL::FLOAT AS passenger_count,
    NULL::FLOAT AS trip_distance,
    NULL::FLOAT AS RatecodeID,
    NULL::VARCHAR AS store_and_fwd_flag,
    NULL::INT AS PULocationID,
    NULL::INT AS DOLocationID,
    NULL::INT AS payment_type,
    NULL::FLOAT AS fare_amount,
    NULL::FLOAT AS extra,
    NULL::FLOAT AS mta_tax,
    NULL::FLOAT AS tip_amount,
    NULL::FLOAT AS tolls_amount,
    NULL::FLOAT AS improvement_surcharge,
    NULL::FLOAT AS total_amount,
    NULL::FLOAT AS congestion_surcharge,
    NULL::FLOAT AS Airport_fee
WHERE FALSE
```
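
The empty-table pattern can be tried without a PostgreSQL server. The sketch below uses Python's built-in `sqlite3` module as a stand-in (an assumption made for portability, so `CAST` replaces the PostgreSQL `::` syntax and `WHERE 0` replaces `WHERE FALSE`). The table name `nyc_taxi_demo` and the two-column schema are hypothetical and only illustrate that the columns are created while no rows are.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Typed NULL columns plus an always-false filter: schema only, no rows
conn.execute(
    """
    CREATE TABLE nyc_taxi_demo AS
    SELECT
        CAST(NULL AS INTEGER) AS vendorid,
        CAST(NULL AS REAL) AS fare_amount
    WHERE 0
    """
)

cursor = conn.execute("SELECT * FROM nyc_taxi_demo")
column_names = [col[0] for col in cursor.description]
row_count = len(cursor.fetchall())

print(column_names, row_count)
```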

#### Step 2: Data Ingestion using Python

1. **Reading the Data**: We started by reading the NYC taxi data (stored in a parquet file) into a pandas DataFrame to explore and understand the structure of the data.

2. **Creating a SQLAlchemy Engine**: We created a SQLAlchemy engine to connect to the PostgreSQL database where the data will be ingested.

3. **Loading Data into PostgreSQL**: We first tried the `to_sql` method of pandas to load the DataFrame into the PostgreSQL table, but the method did not recognize the SQLAlchemy engine object correctly. We then adjusted the script to execute a parameterized `INSERT` statement in batches, which loaded the data successfully:

   ```python
   from sqlalchemy import create_engine, text
   from tqdm.notebook import tqdm

   # Create a SQLAlchemy engine
   engine = create_engine('postgresql://godzilla:Mrawww@localhost:5437/monsterverse')

   # Convert the DataFrame to a list of dictionaries
   data_dict = data.to_dict('records')

   # Define the INSERT query with named parameter placeholders
   query = text("""INSERT INTO nyc_taxi_data 
              (VendorID, tpep_pickup_datetime, ... , congestion_surcharge, Airport_fee) 
              VALUES 
              (:VendorID, :tpep_pickup_datetime, ... , :congestion_surcharge, :Airport_fee)""")

   # Split the records into batches for insertion
   batch_size = 1000
   batches = [data_dict[i:i + batch_size] for i in range(0, len(data_dict), batch_size)]

   # Engine.execute() was removed in SQLAlchemy 2.0, so each batch runs on a
   # connection inside its own transaction, committed when the block exits
   for batch in tqdm(batches):
       with engine.begin() as conn:
           conn.execute(query, batch)
   ```

   Here, we batched the insert operation so that multiple rows are sent at a time, which makes the process more efficient. We also used `tqdm` to display a progress bar during ingestion.
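
The batching step can also be written as a generator, so that batches are produced one at a time instead of being held in a second list. This is a minimal sketch using only the standard library; `iter_batches` is a hypothetical helper name, and the five demo records stand in for the `data_dict` list.

```python
from itertools import islice


def iter_batches(records, batch_size):
    """Yield consecutive lists containing at most batch_size records."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(records)
    while True:
        # Take the next slice; an empty list means the input is exhausted
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        yield batch


demo_records = [{"id": i} for i in range(5)]
demo_batches = list(iter_batches(demo_records, batch_size=2))
print([len(batch) for batch in demo_batches])
```

Because a generator has no length, `tqdm` would need its `total` argument supplied to show a complete progress bar.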

---


```python
from sqlalchemy import create_engine, text
from tqdm import tqdm

# Create a SQLAlchemy engine
engine = create_engine('postgresql://godzilla:Mrawww@localhost:5437/monsterverse')

# Define the INSERT query with named parameter placeholders
query = text("""INSERT INTO nyc_taxi_data 
           (VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, 
            trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, 
            payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, 
            improvement_surcharge, total_amount, congestion_surcharge, Airport_fee) 
           VALUES 
           (:VendorID, :tpep_pickup_datetime, :tpep_dropoff_datetime, :passenger_count, 
            :trip_distance, :RatecodeID, :store_and_fwd_flag, :PULocationID, :DOLocationID, 
            :payment_type, :fare_amount, :extra, :mta_tax, :tip_amount, :tolls_amount, 
            :improvement_surcharge, :total_amount, :congestion_surcharge, :Airport_fee)""")

# Convert the DataFrame to a list of dictionaries
data_dict = data.to_dict('records')

# Split the records into batches
batch_size = 10000
batches = [data_dict[i:i + batch_size] for i in range(0, len(data_dict), batch_size)]

# Engine.execute() was removed in SQLAlchemy 2.0, so each batch runs on a
# connection inside its own transaction, committed when the block exits
for batch in tqdm(batches):
    with engine.begin() as conn:
        conn.execute(query, batch)
```

## Setting Up Data Quality Tests with dbt

After ingesting the NYC Taxi data into PostgreSQL, the next step is to set up data quality tests, which verify that the data meets defined standards before it is used in analysis or reporting. We used dbt's testing features for this, defining the tests in SQL and YAML.

### Creating Custom Test Macros

Before setting up tests in dbt, we created a custom generic test. A generic test is a Jinja-templated SQL query that receives the model and column as arguments. dbt treats every row the query returns as a failure, so the test passes only when the query returns no rows. Our test checks that values in columns such as `fare_amount` and `total_amount` are greater than zero, a basic validation for this dataset. Current dbt versions declare generic tests with a `{% test %}` block; older releases used a macro with a `test_` name prefix that returned a count.

```sql
{% test expression_is_greater_than_zero(model, column_name) %}

select *

from {{ model }}

where {{ column_name }} <= 0

{% endtest %}
```
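
The pass/fail rule behind such a test (zero returned rows means pass) can be illustrated outside dbt. The sketch below is an assumption-laden stand-in: it uses the built-in `sqlite3` module instead of PostgreSQL, a hypothetical `trips` table with made-up values, and a helper named `failing_rows` that does not exist in dbt.

```python
import sqlite3


def failing_rows(conn, table, column):
    """Return rows whose value in `column` is not greater than zero."""
    # Identifiers are interpolated directly; acceptable only for trusted demo names
    query = f"SELECT * FROM {table} WHERE {column} <= 0"
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (trip_id INTEGER, fare_amount REAL)")
conn.executemany(
    "INSERT INTO trips VALUES (?, ?)",
    [(1, 12.5), (2, 0.0), (3, -4.0)],
)

failures = failing_rows(conn, "trips", "fare_amount")
test_passed = len(failures) == 0  # mirrors the rule: no failing rows means pass
print(failures, test_passed)
```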

### Updating the schema.yml File

Next, we moved on to updating the `schema.yml` file, which is a configuration file that allows us to define various properties and tests for our dbt models. In this file, we specified the columns present in our `nyc_taxi_data` table along with the tests we wanted to run on each column. 

We included various tests such as:
- `not_null`: To check that certain columns do not contain null values.
- `accepted_values`: To verify that values in a column match one of a set of accepted values.
- Custom tests: To ensure that values in columns like `fare_amount` and `total_amount` are greater than zero.

```yml
version: 2

models:
  - name: nyc_taxi_data
    description: "The raw NYC Taxi data ingested into the database"
    columns:
      - name: vendorid
        description: "The unique identifier for the vendor"
        tests:
          - not_null
      - name: tpep_pickup_datetime
        description: "The pickup datetime for the taxi trip"
        tests:
          - not_null
      - name: tpep_dropoff_datetime
        description: "The dropoff datetime for the taxi trip"
        tests:
          - not_null
      - name: passenger_count
        description: "The number of passengers in the taxi trip"
        tests:
          - not_null
          - accepted_values:
              values: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
      - name: trip_distance
        description: "The trip distance of the taxi trip"
        tests:
          - not_null
      - name: pulocationid
        description: "The pickup location ID"
        tests:
          - not_null
      - name: dolocationid
        description: "The dropoff location ID"
        tests:
          - not_null
      - name: fare_amount
        description: "The fare amount for the taxi trip"
        tests:
          - not_null
          - expression_is_greater_than_zero
      - name: total_amount
        description: "The total amount for the taxi trip"
        tests:
          - not_null
          - expression_is_greater_than_zero
```
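
To make the built-in checks above more concrete, the sketch below approximates what `not_null` and `accepted_values` look for, again using the built-in `sqlite3` module as a stand-in for PostgreSQL. The helper names, the `trips` table, and its values are hypothetical, and the SQL that dbt actually generates differs in detail.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (passenger_count REAL)")
conn.executemany(
    "INSERT INTO trips VALUES (?)",
    [(1.0,), (None,), (12.0,), (3.0,)],
)


def count_not_null_failures(conn, table, column):
    """Count rows where the column is NULL."""
    sql = f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    return conn.execute(sql).fetchone()[0]


def count_accepted_values_failures(conn, table, column, accepted):
    """Count rows whose value is outside the accepted set (NULLs are not counted)."""
    accepted = list(accepted)
    placeholders = ", ".join("?" for _ in accepted)
    sql = f"SELECT COUNT(*) FROM {table} WHERE {column} NOT IN ({placeholders})"
    return conn.execute(sql, accepted).fetchone()[0]


null_failures = count_not_null_failures(conn, "trips", "passenger_count")
value_failures = count_accepted_values_failures(
    conn, "trips", "passenger_count", range(10)
)
print(null_failures, value_failures)
```

The `NOT IN` comparison skips NULLs, which is one reason to pair `accepted_values` with `not_null` on the same column, as the configuration above does.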

### Running the Tests

After setting up the `schema.yml` file, we ran the dbt tests using the command `dbt test`. This command checks the data in the database against the tests defined in the `schema.yml` file and returns the results.
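
When the tests need to run as part of a scripted pipeline, the command can be launched from Python. This is a minimal sketch that assumes `dbt` is on the `PATH` and that a non-zero exit code signals failing tests or errors; the helper names `build_dbt_test_command` and `run_dbt_tests` are hypothetical.

```python
import subprocess


def build_dbt_test_command(select=None):
    """Build the argument list for `dbt test`, optionally limited to one model."""
    command = ["dbt", "test"]
    if select:
        command += ["--select", select]
    return command


def run_dbt_tests(select=None, project_dir="."):
    """Run the tests in project_dir and report whether dbt exited with code 0."""
    result = subprocess.run(build_dbt_test_command(select), cwd=project_dir)
    return result.returncode == 0


# Only builds the command here; calling run_dbt_tests() requires a dbt project
print(build_dbt_test_command("nyc_taxi_data"))
```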

### Test Results

The test results indicated that there were records in the dataset where the `total_amount` and `fare_amount` were less than or equal to zero. These tests help in identifying potential data quality issues, which can then be addressed to maintain the reliability and accuracy of the dataset.

### Conclusion

Setting up data quality tests using dbt is a vital step in building a reliable data pipeline. These tests help in identifying and rectifying data issues early in the data pipeline, ensuring that only high-quality data is used in further analysis and reporting.

As next steps, we will look into rectifying the identified data quality issues and setting up more complex data tests to further ensure the reliability of the dataset.
