# Processing Big Data - Deequ Analysis

© Explore Data Science Academy

## Honour Code
I {OLUSOJI, ONIGBINDE}, confirm, by submitting this document, that the solutions in this notebook are a result of my own work and that I abide by the [EDSA honour code](https://drive.google.com/file/d/1QDCjGZJ8-FmJE3bZdIQNwnJyQKPhHZBn/view?usp=sharing).
Non-compliance with the honour code constitutes a material breach of contract.


## Context

Having completed manual data quality checks, it should be obvious that the process can become quite cumbersome. As the Data Engineer in the team, you have researched some tools that could potentially save the team from having to do this work by hand. In your research, you have come across a tool called [Deequ](https://github.com/awslabs/deequ), which is a library for measuring the data quality of large datasets.

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://github.com/Explore-AI/Pictures/raw/master/data_engineering/transform/predict/DataQuality.jpg"
     alt="Data Quality"
     style="float: center; padding-bottom=0.5em"
     width=100%/>
     <p><em>Figure 1. Six dimensions of data quality</em></p>
</div>

You present this tool to your manager; he is quite impressed and gives you the go-ahead to use this in your implementation. You are now required to perform some data quality tests using this automated data testing tool.
 

> ## 🚩️ Important Notice 🚩️
>
>To successfully run `pydeequ` without any errors, please make sure that you have an environment that is running pyspark version 3.0.
> You are advised to **create a new conda environment** and install this specific version of pyspark to avoid any technical issues:
>
> `pip install pyspark==3.0`

<br>
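
Before installing `pydeequ`, it can help to confirm that the active environment really runs the pyspark version named in the notice. The sketch below is only an illustration: `is_supported_pyspark` is a hypothetical helper, not part of pyspark or pydeequ, and it compares only the major and minor parts of a version string.

```python
def is_supported_pyspark(version: str, required: tuple = (3, 0)) -> bool:
    """Return True when the major.minor part of `version` equals `required`.

    Hypothetical helper for this notebook; not a pyspark or pydeequ function.
    """
    major_minor = tuple(int(part) for part in version.split(".")[:2])
    return major_minor == required


# Example version strings; in the notebook you would pass pyspark.__version__
print(is_supported_pyspark("3.0.0"))
print(is_supported_pyspark("3.3.1"))
```

In a notebook cell this could be called as `is_supported_pyspark(pyspark.__version__)` after `import pyspark`.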

## Import dependencies

If you do not have `pydeequ` already installed, install it using the following command:
- `pip install pydeequ`

In [1]:
import os
import pyspark
print(pyspark.__file__)
path = os.path.abspath(pyspark.__file__)
print(path)

C:\Users\hp\anaconda3\envs\deequ\lib\site-packages\pyspark\__init__.py
C:\Users\hp\anaconda3\envs\deequ\lib\site-packages\pyspark\__init__.py


In [2]:
import findspark
findspark.init()
findspark.find()

'C:\\Spark\\spark-3.0.0-bin-hadoop2.7'

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pydeequ
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, DateType, NumericType, StructType, StringType, StructField

In [4]:
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

In [5]:
spark

## Read data into spark dataframe

In this notebook, we set out to run some data quality tests, with the possibility of running end to end on the years 1963, 1974, 1985, 1996, 2007, and 2018.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Data_ingestion_student_version.ipynb` notebook to create the parquet files for the following years:
>       - 1963
>       - 1974
>       - 1985
>       - 1996
>       - 2007
>       - 2018
>
>2. Ingest the data for the years given above. You should only do it one year at a time.
>3. Ingest the metadata file.


When developing your code, it will be sufficient to focus on a single year. However, after your development is done, you will need to run this notebook for all of the given years above so that you can answer all the questions given in the Data Testing MCQ.
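
One way to make the later "run for all years" step less error-prone is to derive the parquet path from the `year` variable instead of typing it by hand. The sketch below assumes a directory naming convention of `transformed_data_<year>/`. Only the 2018 directory name appears in this notebook, so the other names are an assumption, and `parquet_path` and `load_year` are hypothetical helpers.

```python
YEARS = [1963, 1974, 1985, 1996, 2007, 2018]


def parquet_path(year: int) -> str:
    """Build the parquet directory for a year (assumed naming convention)."""
    return f"transformed_data_{year}/"


def load_year(spark, year: int):
    """Read one year of data; `spark` is an existing SparkSession."""
    return spark.read.parquet(parquet_path(year))


# Show which directory would be read for each year
for y in YEARS:
    print(y, parquet_path(y))
```

With this in place, switching years only requires changing `year` and calling `load_year(spark, year)`.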

In [6]:
#year = 1963
#yr_1963_df = spark.read.parquet('transformed_data_2018/')

In [7]:
#year = 1974
#yr_1974_df = spark.read.parquet('transformed_data_2018/')

In [8]:
#year = 1985
#yr_1985_df = spark.read.parquet('transformed_data_2018/')

In [9]:
#year = 1996
#yr_1996_df = spark.read.parquet('transformed_data_2018/')

In [10]:
#year = 2007
#yr_2007_df = spark.read.parquet('transformed_data_2018/')

In [11]:
#TODO: Write your code here
# Use this variable (year) to determine which year you are focusing on
year = 2018
yr_2018_df = spark.read.parquet('transformed_data_2018/')

In [12]:
#INSERTED (To get start of dataframe. Take note of the date)
from pyspark.sql.functions import col
yr_2018_df.sort(col('date').asc()).show(5)

+-------------------+-----+-----+-----+-----+---------+---------+-----+
|               date| open| high|  low|close|adj_close|   Volume|stock|
+-------------------+-----+-----+-----+-----+---------+---------+-----+
|2018-01-01 00:00:00| null| null| null| null|     null|     null|  CAH|
|2018-01-02 00:00:00|39.81|40.09| 39.1|39.47|39.179916| 906000.0|  AAN|
|2018-01-02 00:00:00|67.42|67.89|67.34| 67.6| 66.24368|1047800.0|    A|
|2018-01-02 00:00:00|52.33| 53.1| 51.9|52.99|51.647556|4084700.0|  AAL|
|2018-01-02 00:00:00|54.06|55.22|53.91|55.17|    55.17|2928900.0|   AA|
+-------------------+-----+-----+-----+-----+---------+---------+-----+
only showing top 5 rows



In [13]:
#INSERTED (To get end of dataframe. Take note of the date)
from pyspark.sql.functions import col
yr_2018_df.sort(col('date').desc()).show(5)

+-------------------+-----+-----+-----+-----+---------+---------+-----+
|               date| open| high|  low|close|adj_close|   Volume|stock|
+-------------------+-----+-----+-----+-----+---------+---------+-----+
|2018-12-31 00:00:00|66.34|67.48|66.34|67.46| 66.71924|1572100.0|    A|
|2018-12-31 00:00:00|42.29|42.67|40.96|42.05|41.853996| 489200.0|  AAN|
|2018-12-31 00:00:00|26.75|27.07|25.95|26.58|    26.58|2402600.0|   AA|
|2018-12-31 00:00:00|32.09|32.62|31.68|32.11|31.599045|5334700.0|  AAL|
|2018-12-31 00:00:00|39.69|39.69| 28.0|29.69|    29.69|  49800.0| AAMC|
+-------------------+-----+-----+-----+-----+---------+---------+-----+
only showing top 5 rows



In [14]:
#INSERTED (Ingest metadata)
meta = spark.read.csv('symbols_valid_meta.csv', header=True)
meta.toPandas().head()

Unnamed: 0,Nasdaq Traded,Symbol,Security Name,Listing Exchange,Market Category,ETF,Round Lot Size,Test Issue,Financial Status,CQS Symbol,NASDAQ Symbol,NextShares
0,Y,A,"Agilent Technologies, Inc. Common Stock",N,,N,100.0,N,,A,A,N
1,Y,AA,Alcoa Corporation Common Stock,N,,N,100.0,N,,AA,AA,N
2,Y,AAAU,Perth Mint Physical Gold ETF,P,,Y,100.0,N,,AAAU,AAAU,N
3,Y,AACG,ATA Creativity Global - American Depositary Sh...,Q,G,N,100.0,N,N,,AACG,N
4,Y,AADR,AdvisorShares Dorsey Wright ADR ETF,P,,Y,100.0,N,,AADR,AADR,N


## **Run tests on the dataset**

## Test 1 - Null values ⛔️
For the first test, you are required to check the data for completeness.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for missing values in the data. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*


In [15]:
from pyspark.sql.functions import col, when, count

#INSERTED (Count values per column)
total_count = yr_2018_df.select([count(c).alias(c) for c in yr_2018_df.columns])
total_count.show()

+-------+-------+-------+-------+-------+---------+-------+-------+
|   date|   open|   high|    low|  close|adj_close| Volume|  stock|
+-------+-------+-------+-------+-------+---------+-------+-------+
|1311162|1311118|1311118|1311118|1311118|  1311118|1311118|1311162|
+-------+-------+-------+-------+-------+---------+-------+-------+



In [16]:
null_count = yr_2018_df.select([(count(when(col(c).isNull(), c)).alias(c)) for c in yr_2018_df.columns])
null_count.show()

+----+----+----+---+-----+---------+------+-----+
|date|open|high|low|close|adj_close|Volume|stock|
+----+----+----+---+-----+---------+------+-----+
|   0|  44|  44| 44|   44|       44|    44|    0|
+----+----+----+---+-----+---------+------+-----+



In [17]:
#TODO: Write your code here
missing_count = {}  # Dictionary to keep track of the results
for column in yr_2018_df.columns:   # loop through each column
    _count = yr_2018_df.where(yr_2018_df[column].isNull()).count()  # null count in column x
    _total_count = yr_2018_df.select(yr_2018_df[column]).count()    # total count of column x 
    print(f'There are {_count} ({round(_count/_total_count*100, 3)}%) null values in {column} column')
    missing_count[f'{column}'] = _count # recording results in missing_count dictionary

There are 0 (0.0%) null values in date column
There are 44 (0.003%) null values in open column
There are 44 (0.003%) null values in high column
There are 44 (0.003%) null values in low column
There are 44 (0.003%) null values in close column
There are 44 (0.003%) null values in adj_close column
There are 44 (0.003%) null values in Volume column
There are 0 (0.0%) null values in stock column


In [18]:
#INSERTED (Show missing values based on stock)
for column in ['open', 'high', 'low', 'close', 'adj_close', 'volume']:
    print(f'Missing values in {column} based on stock')
    yr_2018_df.select('stock', column).filter(yr_2018_df[column].isNull()).groupBy('stock').count().show()

Missing values in open based on stock
+-----+-----+
|stock|count|
+-----+-----+
| GOGL|    1|
|  PCB|    2|
|  HAL|    1|
| TIVO|    1|
|  NEM|    1|
| TFII|    4|
|  VSH|    1|
| NLOK|    4|
| BRBS|    1|
|  FNV|    1|
| PIPR|    4|
| PTEN|    1|
|  CAH|    1|
|  MTA|   10|
|  MOS|    1|
| ODFL|    1|
|ZIONO|    3|
|   BE|    1|
| WTRG|    4|
| LBRT|    1|
+-----+-----+

Missing values in high based on stock
+-----+-----+
|stock|count|
+-----+-----+
| GOGL|    1|
|  PCB|    2|
|  HAL|    1|
| TIVO|    1|
|  NEM|    1|
| TFII|    4|
|  VSH|    1|
| NLOK|    4|
| BRBS|    1|
|  FNV|    1|
| PIPR|    4|
| PTEN|    1|
|  CAH|    1|
|  MTA|   10|
|  MOS|    1|
| ODFL|    1|
|ZIONO|    3|
|   BE|    1|
| WTRG|    4|
| LBRT|    1|
+-----+-----+

Missing values in low based on stock
+-----+-----+
|stock|count|
+-----+-----+
| GOGL|    1|
|  PCB|    2|
|  HAL|    1|
| TIVO|    1|
|  NEM|    1|
| TFII|    4|
|  VSH|    1|
| NLOK|    4|
| BRBS|    1|
|  FNV|    1|
| PIPR|    4|
| PTEN|    1|
|  

In [19]:
#TODO: Write your code here
from pydeequ.verification import *
check = Check(spark, CheckLevel.Warning, "Null value Check")

checkResult = VerificationSuite(spark) \
    .onData(yr_2018_df) \
    .addCheck(  
     check.isComplete("date")\
    .isComplete("open")\
    .isComplete("high")\
    .isComplete("low")\
    .isComplete("close")\
    .isComplete("adj_close")\
    .isComplete("volume")\
    .isComplete("stock"))\
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+----------------+-----------+------------+----------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check           |check_level|check_status|constraint                                          |constraint_status|constraint_message                                                 |
+----------------+-----------+------------+----------------------------------------------------+-----------------+-------------------------------------------------------------------+
+----------------+-----------+------------+----------------------------------------------------+-----------------+-------------------------------------------------------------------+
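
When the result table is wide, it can be easier to look only at the constraints that did not pass. The sketch below works on plain dictionaries, such as those obtained with `[row.asDict() for row in checkResult_df.collect()]`, and filters on the `constraint_status` column shown in the header above. The helper name `failed_constraints` and the sample rows are made up for illustration and are not real output of this notebook.

```python
def failed_constraints(rows):
    """Keep only the result rows whose constraint did not succeed."""
    return [row for row in rows if row["constraint_status"] != "Success"]


# Illustrative rows only; the constraint names and messages are invented
sample_rows = [
    {"constraint": "completeness of date", "constraint_status": "Success",
     "constraint_message": ""},
    {"constraint": "completeness of open", "constraint_status": "Failure",
     "constraint_message": "value does not meet the requirement"},
]

for row in failed_constraints(sample_rows):
    print(row["constraint"], "->", row["constraint_message"])
```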



## Test 2 - Zero Values 🅾️

For the second test, you are required to check for zero values within the dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for zero values within the data. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [20]:
yr_2018_df_stat = yr_2018_df.select('open', 'high', 'low', 'close', 'adj_close', 'volume')
yr_2018_df_stat.show(5)

+-----+-----+-----+-----+---------+---------+
| open| high|  low|close|adj_close|   volume|
+-----+-----+-----+-----+---------+---------+
|68.45|69.03|67.39|67.99| 67.07753|2112000.0|
|28.12|28.66|28.09|28.24|    28.24|2107800.0|
|32.47|33.65|32.47|33.54| 33.00629|9117600.0|
|41.28| 45.0|41.28|41.58|    41.58|   2100.0|
|  2.4| 2.42| 2.33| 2.42|2.4007938|   7100.0|
+-----+-----+-----+-----+---------+---------+
only showing top 5 rows



In [21]:
#TODO: Write your code here
df_zero_2018 = yr_2018_df_stat.select([F.count(F.when(yr_2018_df_stat[c] == 0, c)).alias(c) for c in yr_2018_df_stat.columns])
zero_counts = df_zero_2018.first()      # single row holding the zero count per column
total_rows = yr_2018_df_stat.count()    # total number of rows, identical for every column

for column in yr_2018_df_stat.columns:
    print(f'There are {zero_counts[column]} ( {zero_counts[column]/total_rows*100}%) zero values in {column} column')
    

There are 0 ( 0.0%) zero values in open column
There are 1 ( 7.6268226199356e-05%) zero values in high column
There are 1 ( 7.6268226199356e-05%) zero values in low column
There are 0 ( 0.0%) zero values in close column
There are 0 ( 0.0%) zero values in adj_close column
There are 36010 ( 2.746418825438809%) zero values in volume column


In [22]:
#INSERTED (Show zero values based on stock)
for column in ['open', 'high', 'low', 'close', 'adj_close', 'volume']:
    print(f'Zero values in {column} per stock')
    yr_2018_df.select('stock', column).filter(yr_2018_df[column]==0).groupBy('stock').count().show()

Zero values in open per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

Zero values in high per stock
+-----+-----+
|stock|count|
+-----+-----+
| KERN|    1|
+-----+-----+

Zero values in low per stock
+-----+-----+
|stock|count|
+-----+-----+
| KERN|    1|
+-----+-----+

Zero values in close per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

Zero values in adj_close per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

Zero values in volume per stock
+-----+-----+
|stock|count|
+-----+-----+
| BROG|   61|
|  JJS|  145|
|  KLR|   96|
|  FMY|    3|
|  VAM|   60|
| BYFC|   14|
| EYEN|    2|
| OPRX|    6|
| OGCP|   64|
| RNDB|   29|
|  LGL|    7|
| PBBI|   14|
|  ARL|    1|
| ARTW|   13|
|  LND|   26|
|  PFG|    1|
|GFNCP|   57|
| VCNX|    1|
| AVNW|    2|
|  LMB|    1|
+-----+-----+
only showing top 20 rows



In [23]:
#TODO: Write your code here

check = Check(spark, CheckLevel.Warning, "Zero value Check")
result = VerificationSuite(spark)\
.onData(yr_2018_df)\
.addCheck(check
    .satisfies("open == 0", "Zero value check", lambda x: x==0)\
    .satisfies("high == 0", "Zero value check", lambda x: x==0)\
    .satisfies("low == 0", "Zero value check", lambda x: x==0)\
    .satisfies("close == 0", "Zero value check", lambda x: x==0)\
    .satisfies("adj_close == 0", "Zero value check", lambda x: x==0)\
    .satisfies("volume == 0", "Zero value check", lambda x: x==0)\
    )\
.run()

result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)

Python Callback server started!
+----------------+-----------+------------+----------------------------------------------------------------------+-----------------+--------------------------------------------------------------------+
|check           |check_level|check_status|constraint                                                            |constraint_status|constraint_message                                                  |
+----------------+-----------+------------+----------------------------------------------------------------------+-----------------+--------------------------------------------------------------------+
+----------------+-----------+------------+----------------------------------------------------------------------+-----------------+--------------------------------------------------------------------+
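
To reason about what `satisfies("volume == 0", ..., lambda x: x == 0)` is asking, it helps to recall that, as we understand the Deequ documentation, the assertion receives the fraction of rows for which the expression is true. The pure-Python stand-in below mimics that idea on a made-up list of values; `compliance` is a hypothetical helper, and treating `None` as a non-match is an assumption about how nulls behave.

```python
def compliance(values, predicate):
    """Fraction of rows for which `predicate` holds; None never matches."""
    matches = sum(1 for v in values if v is not None and predicate(v))
    return matches / len(values)


# Made-up sample, not taken from the dataset
volume = [0.0, 906000.0, None, 0.0, 49800.0]

fraction = compliance(volume, lambda v: v == 0)
assertion_holds = (lambda x: x == 0)(fraction)  # same shape as the assertion in the check

print(fraction, assertion_holds)
```

Using the counts printed earlier (36010 zero values in `volume` out of 1311162 rows), the fraction is roughly 0.0275, so an assertion of `x == 0` would not be expected to hold for that column.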



## Test 3 - Negative values ➖️
The third test requires you to check that all values in the data are positive.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check negative values within the dataset. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [24]:
#INSERTED (Show negative values based on stock)
cols = ('open', 'high', 'low', 'close', 'adj_close', 'volume')
for i in cols:
    print(f'checking for negative values in {i} per stock')
    yr_2018_df.filter(F.col(i)<0).groupBy('stock').count().show()

checking for negative values in open per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

checking for negative values in high per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

checking for negative values in low per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

checking for negative values in close per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

checking for negative values in adj_close per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+

checking for negative values in volume per stock
+-----+-----+
|stock|count|
+-----+-----+
+-----+-----+



In [25]:
checkResult = VerificationSuite(spark) \
                    .onData(yr_2018_df) \
                    .addCheck(
                        Check(spark, CheckLevel.Warning, "Negative Values")\
                            .isNonNegative('open')\
                            .isNonNegative('high')\
                            .isNonNegative('low')\
                            .isNonNegative('close')\
                            .isNonNegative('adj_close')\
                            .isNonNegative('Volume')\
                            )\
                            .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+---------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|check          |check_level|check_status|constraint                                                                                                            |constraint_status|constraint_message|
+---------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
+---------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
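
A non-negativity check is only meaningful for numeric columns, so one option is to pick those columns from the dataframe's `dtypes` rather than listing them by hand. The sketch below works on the `(name, type)` pairs that `yr_2018_df.dtypes` returns. The sample types are assumptions about this dataset, and `numeric_columns` is a hypothetical helper.

```python
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def numeric_columns(dtypes):
    """Return column names whose Spark type string is numeric."""
    return [
        name for name, dtype in dtypes
        if dtype in NUMERIC_TYPES or dtype.startswith("decimal")
    ]


# Assumed types for illustration; check yr_2018_df.dtypes for the real ones
sample_dtypes = [
    ("date", "timestamp"), ("open", "float"), ("high", "float"),
    ("low", "float"), ("close", "float"), ("adj_close", "float"),
    ("Volume", "double"), ("stock", "string"),
]

print(numeric_columns(sample_dtypes))
```

The resulting list could then be used to chain one `isNonNegative` call per column.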



## Test 4 - Determine Maximum Values ⚠️

For the fourth test, we want to find the maximum values in the dataset for the numerical fields. Extreme values can often be used to define an upper bound for a column, so we can use them as threshold values.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Column Profiler Runner` to generate summary statistics for all the available columns. 
>2. Extract the maximum values for all the numeric columns in the data.
>
> *You may use as many cells as necessary*

In [26]:
#INSERTED (Find max stock values  per column)
cols = ('open', 'high', 'low', 'close', 'adj_close', 'volume')
for i in cols:
    yr_2018_df.agg(F.max(i)).show()

+---------+
|max(open)|
+---------+
| 117187.5|
+---------+

+---------+
|max(high)|
+---------+
| 125000.0|
+---------+

+--------+
|max(low)|
+--------+
|109375.0|
+--------+

+----------+
|max(close)|
+----------+
|  109375.0|
+----------+

+--------------+
|max(adj_close)|
+--------------+
|      109375.0|
+--------------+

+------------+
| max(volume)|
+------------+
|3.58775712E8|
+------------+



In [27]:
#TODO: Write your code here
from pydeequ.profiles import *
result = ColumnProfilerRunner(spark) \
    .onData(yr_2018_df.select('open','high','low','close','adj_close','volume')) \
    .run()
# Use `column` as the loop variable so that pyspark's `col` function is not shadowed
for column, profile in result.profiles.items():
    print(f'Statistics of \'{column}\':')
    print('\t', f"Minimum value for {column} column is "+ str(profile.minimum))
    print('\t', f"Maximum value for {column} column is "+ str(profile.maximum))

Statistics of 'open':
	 Minimum value for open column is 0.003000000026077032
	 Maximum value for open column is 117187.5
Statistics of 'low':
	 Minimum value for low column is 0.0
	 Maximum value for low column is 109375.0
Statistics of 'close':
	 Minimum value for close column is 0.003000000026077032
	 Maximum value for close column is 109375.0
Statistics of 'volume':
	 Minimum value for volume column is 0.0
	 Maximum value for volume column is 358775712.0
Statistics of 'adj_close':
	 Minimum value for adj_close column is 0.003000000026077032
	 Maximum value for adj_close column is 109375.0
Statistics of 'high':
	 Minimum value for high column is 0.0
	 Maximum value for high column is 125000.0
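
Since the maxima are meant to serve as thresholds, it may be convenient to collect them in a dictionary. The sketch below assumes objects with a `maximum` attribute, like the profiles used above, and uses `SimpleNamespace` stand-ins filled with the values printed in this notebook. `max_thresholds` and `exceeds` are hypothetical helpers.

```python
from types import SimpleNamespace


def max_thresholds(profiles):
    """Map each column name to the maximum reported by its profile."""
    return {name: profile.maximum for name, profile in profiles.items()}


def exceeds(thresholds, column, value):
    """Return True when `value` is above the recorded maximum for `column`."""
    return value > thresholds[column]


# Stand-ins for result.profiles, using the maxima printed above
sample_profiles = {
    "open": SimpleNamespace(maximum=117187.5),
    "high": SimpleNamespace(maximum=125000.0),
    "low": SimpleNamespace(maximum=109375.0),
    "volume": SimpleNamespace(maximum=358775712.0),
}

thresholds = max_thresholds(sample_profiles)
print(thresholds["high"], exceeds(thresholds, "high", 130000.0))
```

In the notebook, `max_thresholds(result.profiles)` would give the same kind of dictionary for the real profiles.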


## Test 5 - Stock Tickers 💹️

For the fifth test, we want to determine if the stock tickers contained in our dataset are consistent. To do this, you will need to make use of the metadata file to check that the stock names used in the dataframe are valid.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine if the stock tickers contained in the dataset appear in the metadata file.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [28]:
#TODO: Write your code here
from pydeequ.verification import *
check = Check(spark, CheckLevel.Warning, "Stock ticker consistency Check")

checkResult = VerificationSuite(spark) \
    .onData(yr_2018_df) \
    .addCheck(  
     check.isContainedIn("stock", [row['Symbol'] for row in meta.select('Symbol').collect()])\
    )\
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+------------------------------+-----------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
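
Because the constraint text lists every allowed symbol, the result table is hard to read. A complementary way to see which tickers are the problem is a simple set difference. The sketch below is pure Python and uses the first few symbols from the metadata preview above. The ticker `ZZZZ` is invented, and `unknown_tickers` is a hypothetical helper.

```python
def unknown_tickers(dataset_tickers, valid_symbols):
    """Return the sorted tickers that do not appear in the metadata symbols."""
    valid = set(valid_symbols)
    return sorted({ticker for ticker in dataset_tickers if ticker not in valid})


# Symbols taken from the metadata preview; "ZZZZ" is a made-up ticker
valid_symbols = ["A", "AA", "AAAU", "AACG", "AADR"]
dataset_tickers = ["A", "AA", "A", "ZZZZ"]

print(unknown_tickers(dataset_tickers, valid_symbols))
```

On the real data, the two inputs could be built from the distinct values of `stock` and of `Symbol`.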

## Test 6 - Duplication 👥️
Lastly, we want to determine the uniqueness of the items found in the dataframe. You need to make use of the Verification Suite to check for duplicate entries.

Similar to the previous notebook - `Data_profiling_student_version.ipynb`, the first thing to check will be if the primary key values within the dataset are unique - in our case, that will be a combination of the stock name and the date. Secondly, we want to check if the entries are all unique, which is done by checking for duplicates across that whole dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine the uniqueness of entries contained within the dataset.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*



In [29]:
yr_2018_df.groupby(['date', 'stock']) \
  .count() \
  .where('count > 1') \
  .sort('count', ascending=False) \
  .show()

+----+-----+-----+
|date|stock|count|
+----+-----+-----+
+----+-----+-----+



In [30]:
from pydeequ.verification import *
check = Check(spark, CheckLevel.Warning, "Duplication test Check")
checkResult = VerificationSuite(spark).onData(yr_2018_df).addCheck(check\
    .hasUniqueness(["date","stock"], lambda x:x ==1)\
    .hasUniqueness(yr_2018_df.columns, lambda x:x ==1)).run()
                                                           
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+----------------------+-----------+------------+------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check                 |check_level|check_status|constraint                                            |constraint_status|constraint_message                                                 |
+----------------------+-----------+------------+------------------------------------------------------+-----------------+-------------------------------------------------------------------+
+----------------------+-----------+------------+------------------------------------------------------+-----------------+-------------------------------------------------------------------+
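
To interpret the `lambda x: x == 1` assertion, it helps to know that, as we understand the Deequ documentation, uniqueness is the fraction of rows whose key occurs exactly once. The pure-Python stand-in below illustrates that definition on made-up `(date, stock)` keys; `uniqueness` is a hypothetical helper, not a pydeequ function.

```python
from collections import Counter


def uniqueness(keys):
    """Fraction of rows whose key occurs exactly once in `keys`."""
    counts = Counter(keys)
    occurring_once = sum(1 for c in counts.values() if c == 1)
    return occurring_once / len(keys)


# Made-up keys: the ("2018-01-02", "AA") pair is duplicated on purpose
keys = [("2018-01-02", "A"), ("2018-01-02", "AA"), ("2018-01-02", "AA")]

print(uniqueness(keys))
```

A value of exactly 1 means no key is repeated, which is what the check above asserts for the `date` and `stock` combination.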

