
# Processing Big Data - Deequ Analysis

© Explore Data Science Academy

## Honour Code
I, **NDIVHUHO JUDITH MAGWEDE**, confirm, by submitting this document, that the solutions in this notebook are a result of my own work and that I abide by the [EDSA honour code](https://drive.google.com/file/d/1QDCjGZJ8-FmJE3bZdIQNwnJyQKPhHZBn/view?usp=sharing).
Non-compliance with the honour code constitutes a material breach of contract.


## Context

Having completed manual data quality checks, it should be obvious that the process can become quite cumbersome. As the Data Engineer in the team, you have researched some tools that could potentially save the team from having to do this cumbersome work. In your research, you have come across a tool called [Deequ](https://github.com/awslabs/deequ), a library for measuring the data quality of large datasets.

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://github.com/Explore-AI/Pictures/raw/master/data_engineering/transform/predict/DataQuality.jpg"
     alt="Data Quality"
     style="float: center; padding-bottom=0.5em"
     width=100%/>
     <p><em>Figure 1. Six dimensions of data quality</em></p>
</div>

You present this tool to your manager; he is quite impressed and gives you the go-ahead to use this in your implementation. You are now required to perform some data quality tests using this automated data testing tool.


> ## 🚩️ Important Notice 🚩️
>
>To successfully run `pydeequ` without any errors, please make sure that you have an environment that is running pyspark version 3.0.
> You are advised to **create a new conda environment** and install this specific version of pyspark to avoid any technical issues:
>
> `pip install pyspark==3.0`

<br>
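The cells below download the Spark 3.1.1 binaries but pip-install `pyspark==3.0`, and pydeequ chooses its Deequ jar from the `SPARK_VERSION` environment variable. A minimal sketch for spotting a version mismatch early, assuming only the major.minor part of each version string matters; the helper names are hypothetical:

```python
import os


def major_minor(version: str) -> str:
    """Return the 'major.minor' prefix of a version string, e.g. '3.0.0' -> '3.0'."""
    return ".".join(version.strip().split(".")[:2])


def versions_consistent(*versions: str) -> bool:
    """True when every supplied version string shares the same major.minor prefix."""
    return len({major_minor(v) for v in versions}) <= 1


if __name__ == "__main__":
    try:
        import pyspark  # the pip-installed Python package
        installed = pyspark.__version__
    except ImportError:
        installed = None

    # pydeequ uses this variable to pick the matching Deequ jar
    spark_version_env = os.environ.get("SPARK_VERSION")
    if installed is None or spark_version_env is None:
        print("pyspark or SPARK_VERSION not available; nothing to compare.")
    elif versions_consistent(installed, spark_version_env):
        print(f"OK: pyspark {installed} matches SPARK_VERSION={spark_version_env}")
    else:
        print(f"Mismatch: pyspark {installed} vs SPARK_VERSION={spark_version_env}")
```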

## Import dependencies

If you do not have `pydeequ` already installed, install it using the following command:
- `pip install pydeequ`

In [41]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
!pip install pydeequ


Collecting pydeequ
  Downloading pydeequ-1.2.0-py3-none-any.whl (37 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.2.0


In [3]:
!pip install pyspark==3.0

Collecting pyspark==3.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m18.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044160 sha256=02bdcc37e8ff08e8c622f4748eb0a1942b272fb352ed71b0b386a12d722fdce5
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.

In [4]:
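import os
# Point the notebook at the JDK and at the Spark 3.1.1 distribution downloaded above.
# Note: the pip-installed pyspark is 3.0 and SPARK_VERSION is set to "3.0" in the next cell,
# so the Spark distribution and the Python package differ; aligning them avoids version surprises.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"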

In [5]:
import os

# pydeequ reads SPARK_VERSION when it is imported to choose the matching Deequ jar,
# so this must run before `import pydeequ`. "3.0" matches the pip-installed pyspark==3.0.
os.environ["SPARK_VERSION"] = "3.0"

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pydeequ
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, DateType, NumericType, StructType, StringType, StructField

In [7]:
from pydeequ.checks import Check
from pydeequ.verification import VerificationSuite

In [8]:
from pydeequ.checks import Check, CheckLevel
from pyspark.sql import SparkSession

In [9]:
from pyspark import SparkContext

In [10]:
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

## Read data into spark dataframe

In this notebook, we set out to run some data quality tests, with the possibility of running them end to end on the years 1963, 1974, 1985, 1996, 2007, and 2018.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Data_ingestion_student_version.ipynb` notebook to create the parquet files for the following years:
>       - 1963
>       - 1974
>       - 1985
>       - 1996
>       - 2007
>       - 2018
>
>2. Ingest the data for the years given above. You should only do it one year at a time.
>3. Ingest the metadata file.


When developing your code, it will be sufficient to focus on a single year. However, after your development is done, you will need to run this notebook for all of the given years above so that you can answer all the questions given in the Data Testing MCQ.
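Once the code works for one year, the remaining years can be processed in a loop. A minimal sketch, assuming every year's file follows the same `<year>.snappy.parquet` naming and Drive folder used for 1963 below (the layout for the other years is an assumption), with hypothetical helper names:

```python
YEARS = [1963, 1974, 1985, 1996, 2007, 2018]

# Assumption: all years live in the same Drive folder as the 1963 file used in this notebook.
BASE_DIR = "/content/drive/MyDrive/Processing data1"


def parquet_path(year: int, base_dir: str = BASE_DIR) -> str:
    """Build the expected Parquet path for one year (hypothetical naming scheme)."""
    return f"{base_dir}/{year}.snappy.parquet"


def load_year(spark, year: int):
    """Read one year's Parquet file into a Spark DataFrame."""
    return spark.read.parquet(parquet_path(year))


def run_for_all_years(spark, test_fn, years=YEARS):
    """Apply a test function (DataFrame -> result) to each year and collect results by year."""
    return {year: test_fn(load_year(spark, year)) for year in years}
```

For example, `run_for_all_years(spark, lambda df: df.count())` would return a row count per year, and any of the tests below can be wrapped in a function and passed in the same way.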

In [11]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
#TODO: Write your code here
# Create a Spark session
spark = SparkSession.builder.appName("DataQualityTests").getOrCreate()

# Read the data for a specific year (e.g., 1963)
year = 1963
data_df = spark.read.parquet(f"/content/drive/MyDrive/Processing data1/{year}.snappy.parquet")

# Display the schema of the DataFrame
data_df.printSchema()

# Show the first few rows of the DataFrame
data_df.show()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- adj_close: double (nullable = true)
 |-- Volume: integer (nullable = true)

+----------+-------------------+-------------------+-------------------+-------------------+--------------------+-------+
|      Date|               Open|               High|                Low|              Close|           adj_close| Volume|
+----------+-------------------+-------------------+-------------------+-------------------+--------------------+-------+
|1963-01-02|  5.446800231933594|  5.462820053100586|  5.362695217132568|  5.362695217132568|   1.288353681564331|  62500|
|1963-01-02|  5.118534564971924|  5.141960144042969|  5.048257350921631|  5.048257350921631|   1.190481185913086|  66800|
|1963-01-02| 0.6111111044883728| 0.6172839403152466| 0.6090534925460815| 0.6131687164306641| 0.11364765465259552| 364400|
|196

## **Run tests on the dataset**

## Test 1 - Null values ⛔️
For the first test, you are required to check the data for completeness.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for missing values in the data.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*


In [18]:
from pydeequ import *

In [22]:
from pydeequ.verification import VerificationSuite

In [31]:
#TODO: Write your code here
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, DateType, NumericType, StructType, StringType, StructField

# Create a Spark session on Google Colab
spark = SparkSession.builder \
    .appName("Deequ Data Quality Tests") \
    .getOrCreate()

# Read data into a Spark DataFrame from the mounted Google Drive
data_path = "/content/drive/MyDrive/Processing data1/1963.snappy.parquet"
df = spark.read.parquet(data_path)

# Perform data quality tests using Deequ
from pydeequ.checks import *
from pydeequ.verification import *

# Define the verification suite
verification_suite = VerificationSuite(spark).onData(df)

# Create a Check instance for completeness checks
completeness_check = Check(spark, CheckLevel.Error, "Completeness Check")

# Iterate over columns and add completeness checks
for column in df.columns:
    completeness_check = completeness_check.isComplete(column)

# Add the Check to the verification suite
verification_suite = verification_suite.addCheck(completeness_check)

# Run the verification suite
verification_result = verification_suite.run()

# Get the verification results
check_results_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result, pandas=True)
print("Check Results DataFrame:")
print(check_results_df)

# Stop the Spark session
spark.stop()



Check Results DataFrame:
                check check_level check_status  \
0  Completeness Check       Error      Success   
1  Completeness Check       Error      Success   
2  Completeness Check       Error      Success   
3  Completeness Check       Error      Success   
4  Completeness Check       Error      Success   
5  Completeness Check       Error      Success   
6  Completeness Check       Error      Success   

                                          constraint constraint_status  \
0    CompletenessConstraint(Completeness(Date,None))           Success   
1    CompletenessConstraint(Completeness(Open,None))           Success   
2    CompletenessConstraint(Completeness(High,None))           Success   
3     CompletenessConstraint(Completeness(Low,None))           Success   
4   CompletenessConstraint(Completeness(Close,None))           Success   
5  CompletenessConstraint(Completeness(adj_close,...           Success   
6  CompletenessConstraint(Completeness(Volume,None))    
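
Deequ's `isComplete` constraint passes only when a column's completeness, the fraction of non-null values, equals 1.0; `hasCompleteness` takes an assertion instead, which allows a tolerance. The sketch below mirrors that metric in plain Python for illustration and builds a threshold-based check; the 0.99 threshold and the function names are assumptions, not part of the assignment:

```python
def completeness(values) -> float:
    """Fraction of non-null (non-None) values, the quantity Deequ calls Completeness."""
    values = list(values)
    if not values:
        return 0.0  # arbitrary choice for an empty input in this sketch
    return sum(v is not None for v in values) / len(values)


def build_completeness_check(spark, columns, threshold=0.99):
    """Build a pydeequ Check asserting completeness >= threshold for each column."""
    from pydeequ.checks import Check, CheckLevel  # lazy import: needs pydeequ and Spark

    check = Check(spark, CheckLevel.Warning, "Completeness with tolerance")
    for column in columns:
        check = check.hasCompleteness(column, lambda x: x >= threshold)
    return check
```

The resulting check runs exactly like the one above: `VerificationSuite(spark).onData(df).addCheck(build_completeness_check(spark, df.columns)).run()`.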

## Test 2 - Zero Values 🅾️

For the second test, you are required to check for zero values within the dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for zero values within the data.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [None]:
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

In [42]:
#TODO: Write your code here
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import NumericType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Data Quality Checks") \
    .getOrCreate()

# Sample data path - replace with your actual data path
data_path = "/content/drive/MyDrive/Processing data1/1963.snappy.parquet"  # Adjust this path to your actual data file location

# Read data into a DataFrame
data_df = spark.read.parquet(data_path)

# Get the list of numeric column names
numeric_columns = [f.name for f in data_df.schema.fields if isinstance(f.dataType, NumericType)]

# For each numeric column, count the rows whose value is less than or equal to zero (zeros and negatives)
for column in numeric_columns:
    non_compliant_rows = data_df.filter(col(column) <= 0).count()

    if non_compliant_rows > 0:
        print(f"Column {column} contains {non_compliant_rows} non-compliant rows with values less than or equal to zero.")
    else:
        print(f"Column {column} passed the check with all values greater than zero.")

# Stop the Spark session when done
spark.stop()




Column Open contains 11 non-compliant rows with values less than or equal to zero.
Column High passed the check with all values greater than zero.
Column Low passed the check with all values greater than zero.
Column Close passed the check with all values greater than zero.
Column adj_close passed the check with all values greater than zero.
Column Volume passed the check with all values greater than zero.
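
The cell above filters with plain Spark rather than the `Verification Suite`, and its `<= 0` condition counts negative values as well as zeros. A Deequ-based sketch, assuming pydeequ's `Check.satisfies(columnCondition, constraintName, assertion)`, where the assertion receives the fraction of rows that meet the Spark SQL condition; the helper names are hypothetical:

```python
def zero_free_conditions(columns):
    """Map each column to a Spark SQL condition that is true when the value is not zero."""
    return {column: f"`{column}` != 0" for column in columns}


def build_zero_check(spark, columns):
    """Build a pydeequ Check requiring every row to be non-zero in each given column."""
    from pydeequ.checks import Check, CheckLevel  # lazy import: needs pydeequ and Spark

    check = Check(spark, CheckLevel.Error, "Zero value checks")
    for column, condition in zero_free_conditions(columns).items():
        # The assertion receives the compliant fraction; 1.0 means no zeros at all.
        check = check.satisfies(condition, f"{column} contains no zeros", lambda x: x == 1.0)
    return check
```

Passing `numeric_columns` from the cell above to `build_zero_check` and running it through `VerificationSuite` would report one constraint per column. Rows where the value is NULL do not satisfy the condition either, which is harmless here because Test 1 found no missing values.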


## Test 3 - Negative values ➖️
The third test requires you to check that none of the values in the data are negative.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check negative values within the dataset.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [43]:
#TODO: Write your code here
from pyspark.sql import SparkSession
from pyspark.sql.types import NumericType
from pydeequ.checks import *
from pydeequ.verification import *

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Data Quality Checks with Deequ for All Numeric Columns") \
    .getOrCreate()

# Read data into a DataFrame from the specified Parquet file
data_path = "/content/drive/MyDrive/Processing data1/1963.snappy.parquet"
df = spark.read.parquet(data_path)

# Initialize a Check with a general description for non-negative checks
check = Check(spark, CheckLevel.Error, "Non-negative value checks")

# Iterate over DataFrame schema to identify numeric columns and add non-negative checks for each
for field in df.schema.fields:
    if isinstance(field.dataType, NumericType):
        column_name = field.name
        check = check.isNonNegative(column_name, hint=f"{column_name} should contain only non-negative values")

# Run the verification on the DataFrame
result = VerificationSuite(spark).onData(df).addCheck(check).run()

# Display the results of the test
result_df = VerificationResult.checkResultsAsDataFrame(spark, result, pandas=False)
result_df.show()

# Stop the Spark session when done
spark.stop()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
|Non-negative valu...|      Error|     Success|ComplianceConstra...|          Success|                  |
|Non-negative valu...|      Error|     Success|ComplianceConstra...|          Success|                  |
|Non-negative valu...|      Error|     Success|ComplianceConstra...|          Success|                  |
|Non-negative valu...|      Error|     Success|ComplianceConstra...|          Success|                  |
|Non-negative valu...|      Error|     Success|ComplianceConstra...|          Success|                  |
|Non-negative valu...|      Error|     Success|ComplianceConstra...|          Success|                  |
+--------------------+-----------+------------
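
Taken together, Tests 2 and 3 separate zeros from negatives: the `<= 0` filter in Test 2 flagged 11 rows in `Open`, while every `isNonNegative` constraint here succeeded, which suggests those 11 values are zeros. A sketch that counts both cases per column in a single Spark aggregation, with a plain-Python equivalent for small lists; the function names are illustrative:

```python
def count_zeros_and_negatives(values):
    """Return (zeros, negatives) for a list of numbers, ignoring None."""
    zeros = sum(1 for v in values if v is not None and v == 0)
    negatives = sum(1 for v in values if v is not None and v < 0)
    return zeros, negatives


def zero_and_negative_counts(df, columns):
    """Count zeros and negatives per column in one pass over a Spark DataFrame."""
    from pyspark.sql import functions as F  # lazy import: needs pyspark

    aggs = []
    for column in columns:
        # NULL comparisons fall through to otherwise(0), so nulls are not counted
        aggs.append(F.sum(F.when(F.col(column) == 0, 1).otherwise(0)).alias(f"{column}_zeros"))
        aggs.append(F.sum(F.when(F.col(column) < 0, 1).otherwise(0)).alias(f"{column}_negatives"))
    return df.agg(*aggs).collect()[0].asDict()
```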

## Test 4 - Determine Maximum Values ⚠️

For the fourth test, we want to find the maximum values of the numerical fields in the dataset. These extreme values can be used to define an upper bound for each column, which can then serve as a threshold value.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Column Profiler Runner` to generate summary statistics for all the available columns.
>2. Extract the maximum values for all the numeric columns in the data.
>
> *You may use as many cells as necessary*

In [45]:
#TODO: Write your code here
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing Python's built-in max()
from pyspark.sql.types import NumericType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DataFrame Profiling") \
    .getOrCreate()

# Read data into a DataFrame from the specified Parquet file
data_path = "/content/drive/MyDrive/Processing data1/1963.snappy.parquet"
df = spark.read.parquet(data_path)

# Get the list of numeric column names
numeric_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]

# Compute the maximum of every numeric column in a single aggregation (one Spark job),
# then convert the resulting Row into a {column: max} dictionary
max_row = df.agg(*[F.max(F.col(column)).alias(column) for column in numeric_columns]).collect()[0]
max_values = max_row.asDict()

# Display the maximum values for numeric columns
for column, max_value in max_values.items():
    print(f"Maximum value for column '{column}': {max_value}")

# Stop the Spark session when done
spark.stop()

Maximum value for column 'Open': 5.446800231933594
Maximum value for column 'High': 250.625
Maximum value for column 'Low': 247.5
Maximum value for column 'Close': 249.375
Maximum value for column 'adj_close': 116.98737335205078
Maximum value for column 'Volume': 2387200
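
The instructions ask for the `Column Profiler Runner`, whereas the cell above aggregates with plain Spark. A sketch of the profiler route, assuming pydeequ's `ColumnProfilerRunner(spark).onData(df).run()` returns an object whose `profiles` dict maps column names to profiles, and that numeric profiles expose a `maximum` attribute (non-numeric ones such as `Date` do not); `extract_maxima` is a hypothetical helper:

```python
def extract_maxima(profiles):
    """Keep the maximum of every profile that has one (numeric columns only)."""
    maxima = {}
    for column, profile in profiles.items():
        maximum = getattr(profile, "maximum", None)
        if maximum is not None:
            maxima[column] = maximum
    return maxima


def profile_maxima(spark, df):
    """Profile every column with Deequ and return the numeric maxima."""
    from pydeequ.profiles import ColumnProfilerRunner  # lazy import: needs pydeequ and Spark

    result = ColumnProfilerRunner(spark).onData(df).run()
    return extract_maxima(result.profiles)
```

Following the threshold idea from the start of this test, each maximum could then feed a constraint such as `Check.hasMax(column, lambda x: x <= bound)` when checking later years.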


## Test 5 - Stock Tickers 💹️

For the fifth test, we want to determine whether the stock tickers contained in our dataset are consistent. To do this, you will need to make use of the metadata file to check that the stock names used in the dataframe are valid.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine if the stock tickers contained in the dataset appear in the metadata file.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [48]:
#TODO: Write your code here
from pyspark.sql import SparkSession
from pydeequ.checks import *
from pydeequ.verification import *

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Data Quality Verification") \
    .getOrCreate()

# Assuming the main DataFrame is already loaded into 'df'
data_path = "/content/drive/MyDrive/Processing data1/1963.snappy.parquet"
df = spark.read.parquet(data_path)

# Correctly specify the path to your metadata Parquet file
metadata_path = "/content/drive/MyDrive/Processing data1/your_metadata_file.parquet"  # Make sure this path is correct

# Try to load the metadata DataFrame from the corrected path
try:
    metadata_df = spark.read.parquet(metadata_path)
    print("Metadata DataFrame loaded successfully.")
except Exception as e:
    print(f"An error occurred: {e}")

# Placeholder verification: this check does not yet compare the stock tickers against the
# metadata. hasSize(lambda x: x >= 0) passes for any DataFrame, so it only demonstrates
# how a Check is run through the VerificationSuite.
try:
    # Define a placeholder check on the number of rows
    check = Check(spark, CheckLevel.Error, "Example Check").hasSize(lambda x: x >= 0, "Size of DataFrame should be non-negative")

    # Run the verification on the DataFrame
    result = VerificationSuite(spark).onData(df).addCheck(check).run()

    # Display the results of the test
    result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    result_df.show()
except Exception as e:
    print(f"An error occurred during verification: {e}")

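# Shut down the py4j callback server started by the lambda assertion, then stop Spark
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()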

An error occurred: Path does not exist: file:/content/drive/MyDrive/Processing data1/your_metadata_file.parquet
Python Callback server started!
+-------------+-----------+------------+--------------------+-----------------+------------------+
|        check|check_level|check_status|          constraint|constraint_status|constraint_message|
+-------------+-----------+------------+--------------------+-----------------+------------------+
|Example Check|      Error|     Success|SizeConstraint(Si...|          Success|                  |
+-------------+-----------+------------+--------------------+-----------------+------------------+
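
The cell above does not yet test the tickers: the metadata path is a placeholder, and `hasSize(lambda x: x >= 0)` passes for any DataFrame. The schema printed earlier also has no ticker column, so the data needs one before this test can run. A sketch using Deequ's `isContainedIn`, assuming hypothetical column names `stock_name` in the data and `Symbol` in the metadata; adjust both to your files:

```python
def unknown_tickers(data_tickers, metadata_tickers):
    """Plain-Python cross-check: tickers in the data that are missing from the metadata."""
    return sorted(set(data_tickers) - set(metadata_tickers))


def build_ticker_check(spark, metadata_df, data_column="stock_name", metadata_column="Symbol"):
    """Build a pydeequ Check that every ticker in data_column appears in the metadata."""
    from pydeequ.checks import Check, CheckLevel  # lazy import: needs pydeequ and Spark

    # Collect the valid tickers to the driver; reasonable for a metadata file of modest size.
    allowed = [
        row[0]
        for row in metadata_df.select(metadata_column).distinct().collect()
        if row[0] is not None
    ]
    return Check(spark, CheckLevel.Error, "Ticker validity").isContainedIn(data_column, allowed)
```

Collecting the allowed values keeps the Deequ constraint simple; for very large metadata, a left anti join between the two DataFrames would be the Spark-side alternative.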



## Test 6 - Duplication 👥️
Lastly, we want to determine the uniqueness of the items found in the dataframe. You need to make use of the Verification Suite to check the uniqueness of the entries.

Similar to the previous notebook (`Data_profiling_student_version.ipynb`), the first thing to check is whether the primary key values within the dataset are unique; in our case, the primary key is the combination of the stock name and the date. Secondly, we want to check whether the entries are all unique, which is done by checking for duplicates across the whole dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine the uniqueness of entries contained within the dataset.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*



In [51]:
#TODO: Write your code here
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pydeequ.checks import *
from pydeequ.verification import *

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Uniqueness Verification") \
    .getOrCreate()

# Load the DataFrame from the specified path
data_path = "/content/drive/MyDrive/Processing data1/1963.snappy.parquet"
df = spark.read.parquet(data_path)

# Add a generated ID to each row and check that it is unique.
# Note: monotonically_increasing_id() produces distinct values by construction, so this check
# always passes; it does not detect duplicate records or a non-unique (stock name, date) key.
df_with_id = df.withColumn("unique_id", monotonically_increasing_id())

# Define a check to ensure that the "unique_id" column is unique across the dataset
check_unique_id = Check(spark, CheckLevel.Error, "Check for unique ID") \
    .isUnique("unique_id")

# Run the verification on the DataFrame
result = VerificationSuite(spark) \
    .onData(df_with_id) \
    .addCheck(check_unique_id) \
    .run()

# Display the results of the test
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

# Stop the Spark session when done
spark.stop()


+-------------------+-----------+------------+--------------------+-----------------+------------------+
|              check|check_level|check_status|          constraint|constraint_status|constraint_message|
+-------------------+-----------+------------+--------------------+-----------------+------------------+
|Check for unique ID|      Error|     Success|UniquenessConstra...|          Success|                  |
+-------------------+-----------+------------+--------------------+-----------------+------------------+
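
`hasUniqueness` can test the real columns instead of a generated ID. Deequ's Uniqueness metric is the fraction of rows whose value combination occurs exactly once, so 1.0 means no duplicates. The sketch below mirrors that definition in plain Python (ignoring nulls) and builds the two checks described for this test: the primary key, assuming a hypothetical ticker column `stock_name` alongside `Date`, and full-row duplicates across all columns:

```python
from collections import Counter


def uniqueness(rows):
    """Fraction of rows whose value tuple occurs exactly once (Deequ's Uniqueness, nulls ignored)."""
    rows = [tuple(row) for row in rows]
    if not rows:
        return 0.0  # arbitrary choice for an empty input in this sketch
    counts = Counter(rows)
    return sum(1 for c in counts.values() if c == 1) / len(rows)


def build_uniqueness_check(spark, all_columns, key_columns=("stock_name", "Date")):
    """Build a pydeequ Check for primary-key uniqueness and whole-row duplicates."""
    from pydeequ.checks import Check, CheckLevel  # lazy import: needs pydeequ and Spark

    return (
        Check(spark, CheckLevel.Error, "Uniqueness checks")
        .hasUniqueness(list(key_columns), lambda x: x == 1.0)  # primary key
        .hasUniqueness(list(all_columns), lambda x: x == 1.0)  # full-row duplicates
    )
```

With the 1963 schema shown earlier, `build_uniqueness_check(spark, df.columns)` would only work once a ticker column exists; until then, the full-row check alone can be run by passing `key_columns=df.columns`.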

