# Project: DEEQU for DATA QUALITY

From an external source system you are supplied with data about available shoe products.
Using the capabilities of pydeequ, develop a data analysis task.

**Dataset Structure**

| Name | Description |
| -- | -- |
| id | Unique object identifier |
| vendor_code | Unique position (product) identifier |
| name | Model name |
| type | Type of model |
| label | Brand |
| price | Price ($) | 
| discount | Discount in percents [0-100] |
| available_count | Quantity available in stock |
| preorder_count | Quantity of preorders |

**Analyze the following conditions:**

1. The size of the dataset
2. Completeness of all columns (x9)
3. Uniqueness of the id
4. Are there any records with a discount less than 0
5. Records with a discount of more than 100 are present
6. Are records present with available quantity in stock less than 0
7. Are there records with a quantity of pre-orders less than 0

The report must contain a total of 15 conditions.


In [1]:
import os
os.environ['SPARK_VERSION'] = '3.1'

In [2]:
import pydeequ
from pydeequ.profiles import *
from pydeequ.checks import *
from pydeequ.verification import *
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

Deequ is still not supported in spark version: 3.1


In [3]:
spark = SparkSession\
    .builder\
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)\
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)\
    .getOrCreate()

In [4]:
df = spark.read.parquet("data.parquet")

In [5]:
df.show(10)

+--------------------+--------------------+------------------+----------------+----------+------+--------+---------------+--------------+
|                  id|         vendor_code|              name|            type|     label| price|discount|available_count|preorder_count|
+--------------------+--------------------+------------------+----------------+----------+------+--------+---------------+--------------+
|0000a2a47fdf45438...|695aae461072410ab...|   Timberland Perf| Casual Sneakers|Timberland|201.99|    10.0|             45|           114|
|000b82e2782f44749...|c2b987df97fb4160a...|     Vans High-Top|Basketball shoes|      Vans|558.99|    10.0|             62|           106|
|0010c9bde5454bddb...|be0bf295383a41098...|           CAT 327| Casual Sneakers|       CAT|535.99|    30.0|            139|           148|
|00144c94d03c44538...|4a6b746de6714c99b...|          Nike SS1| Casual Sneakers|      Nike|470.99|     0.0|            109|            36|
|00148575ec5f42a1b...|39416c08b65f

In [6]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_code: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- label: string (nullable = true)
 |-- price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- available_count: integer (nullable = true)
 |-- preorder_count: integer (nullable = true)



## Analyzer

In [7]:
from pydeequ.analyzers import *

analyzer = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("id")) \
                    .addAnalyzer(Completeness("vendor_code")) \
                    .addAnalyzer(Completeness("name")) \
                    .addAnalyzer(Completeness("type")) \
                    .addAnalyzer(Completeness("label")) \
                    .addAnalyzer(Completeness("price")) \
                    .addAnalyzer(Completeness("discount")) \
                    .addAnalyzer(Completeness("available_count")) \
                    .addAnalyzer(Completeness("preorder_count")) \
                    .addAnalyzer(Compliance("discount less than 0", 'discount<0')) \
                    .addAnalyzer(Compliance("discount great than 100", 'discount>100')) \
                    .addAnalyzer(Compliance("available quantity in stock less than 0", 'available_count<0')) \
                    .addAnalyzer(Compliance("quantity of pre-orders less than 0", 'preorder_count<0')) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analyzer)
analysisResult_df.show()

+-------+--------------------+------------+--------+
| entity|            instance|        name|   value|
+-------+--------------------+------------+--------+
| Column|discount great th...|  Compliance|     0.0|
| Column|         vendor_code|Completeness|     0.9|
| Column|available quantit...|  Compliance|   0.001|
| Column|               label|Completeness|     1.0|
| Column|discount less than 0|  Compliance|  3.5E-4|
| Column|     available_count|Completeness|     1.0|
| Column|            discount|Completeness|     1.0|
| Column|                type|Completeness|    0.95|
| Column|      preorder_count|Completeness|     1.0|
| Column|               price|Completeness|    0.97|
|Dataset|                   *|        Size|100000.0|
| Column|                  id|Completeness|     1.0|
| Column|                name|Completeness|     1.0|
| Column|quantity of pre-o...|  Compliance|     0.0|
+-------+--------------------+------------+--------+



## Constraint Verification

In [8]:
check_discount = Check(spark, CheckLevel.Warning, "Discount Check")\
        .hasMax("discount", lambda x: x <= 100.0)  \
        .isNonNegative("discount") \
        .isComplete("discount")
check_available_count = Check(spark, CheckLevel.Warning, "Available Count Check")\
        .isNonNegative("available_count") \
        .isComplete("available_count") 
check_preorder_count = Check(spark, CheckLevel.Warning, "Preorder Count Check")\
        .isNonNegative("preorder_count")
check_id = Check(spark, CheckLevel.Error, "Id Unickness Check")\
        .isUnique("id")
check_dataset = Check(spark, CheckLevel.Error, "Dataset Check")\
        .hasSize(lambda x: x >= 20000)

Python Callback server started!


In [9]:
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check_discount) \
    .addCheck(check_available_count) \
    .addCheck(check_preorder_count) \
    .addCheck(check_id) \
    .addCheck(check_dataset) \
    .run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.toPandas().head(20)

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Preorder Count Check,Warning,Success,ComplianceConstraint(Compliance(preorder_count...,Success,
1,Dataset Check,Error,Success,SizeConstraint(Size(None)),Success,
2,Discount Check,Warning,Warning,"MaximumConstraint(Maximum(discount,None))",Success,
3,Discount Check,Warning,Warning,ComplianceConstraint(Compliance(discount is no...,Failure,Value: 0.99965 does not meet the constraint re...
4,Discount Check,Warning,Warning,"CompletenessConstraint(Completeness(discount,N...",Success,
5,Available Count Check,Warning,Warning,ComplianceConstraint(Compliance(available_coun...,Failure,Value: 0.999 does not meet the constraint requ...
6,Available Count Check,Warning,Warning,CompletenessConstraint(Completeness(available_...,Success,
7,Id Unickness Check,Error,Success,"UniquenessConstraint(Uniqueness(List(id),None))",Success,


In [10]:
VerificationResult.successMetricsAsDataFrame(spark, checkResult).toPandas().head(20)

Unnamed: 0,entity,instance,name,value
0,Column,available_count is non-negative,Compliance,0.999
1,Column,available_count,Completeness,1.0
2,Column,discount,Maximum,40.0
3,Column,discount,Completeness,1.0
4,Column,discount is non-negative,Compliance,0.99965
5,Column,preorder_count is non-negative,Compliance,1.0
6,Dataset,*,Size,100000.0
7,Column,id,Uniqueness,1.0


## Conclusion

Based on the findings, we found out that:

1. The size of the dataset is 100000
2. Completeness of all columns (x9):  `vendor_code`, `type`, `price` have some missing data
3. The column `id` has unique values
4. There are some records with a discount less than 0
5. There are no records with a discount of more than 100 are present
6. There are some records with available quantity in stock less than 0
7. There are no records with a quantity of pre-orders less than 0


**Points of future improvement**:
1. Either fill in the missing data or get rid of the record in case it doesn't reflect the result
2. Change the value of the outliers to the avarage value