
#### Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
###### Key packages for DQ unit testing:
###### pypi package: pydeequ==1.4.0
###### maven coordinate: com.amazon.deequ:deequ:2.0.8-spark-3.5
###### https://mvnrepository.com/artifact/com.amazon.deequ/deequ
###### https://central.sonatype.com/artifact/com.amazon.deequ/deequ/2.0.8-spark-3.5
###### https://github.com/awslabs/deequ
###### https://pydeequ.readthedocs.io/_/downloads/en/latest/pdf/

In [0]:
# Set the SPARK_VERSION environment variable for this notebook session.
# NOTE(review): presumably pydeequ reads this to select the matching Deequ
# build, so this cell should run before `import pydeequ` — confirm.

%env SPARK_VERSION=3.5

env: SPARK_VERSION=3.5


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import pydeequ
from pyspark.sql import SparkSession, DataFrame

# Create the Spark session (or reuse the active one), asking Spark to fetch
# the Deequ jar advertised by pydeequ and to exclude the f2j artifact.
# Despite its name, `spark_conf` holds the SparkSession itself.
spark_conf = SparkSession.builder \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .getOrCreate()

In [0]:
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType, DateType

# Explicit schema for the NYC taxi sample. Every column is declared nullable;
# the pickup/dropoff timestamps are kept as plain strings.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('DOLocationID', LongType()),
        ('PULocationID', LongType()),
        ('RatecodeID', LongType()),
        ('VendorID', LongType()),
        ('congestion_surcharge', DoubleType()),
        ('extra', DoubleType()),
        ('fare_amount', DoubleType()),
        ('improvement_surcharge', DoubleType()),
        ('mta_tax', DoubleType()),
        ('passenger_count', LongType()),
        ('payment_type', LongType()),
        ('store_and_fwd_flag', StringType()),
        ('tip_amount', DoubleType()),
        ('tolls_amount', DoubleType()),
        ('total_amount', DoubleType()),
        ('tpep_dropoff_datetime', StringType()),
        ('tpep_pickup_datetime', StringType()),
        ('trip_distance', DoubleType()),
        ('pep_pickup_date_txt', DateType()),
    )
])

source_path = "dbfs:/databricks-datasets/nyctaxi/sample/json/"

# Read the JSON files using the schema declared above rather than inferring one.
df = spark.read.load(source_path, format="json", schema=schema)
display(df)

DOLocationID,PULocationID,RatecodeID,VendorID,congestion_surcharge,extra,fare_amount,improvement_surcharge,mta_tax,passenger_count,payment_type,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance,pep_pickup_date_txt
236,132,2,2,2.5,0.0,52.0,0.3,0.5,1,1,N,9.0,6.12,70.42,2019-12-19 00:33:53,2019-12-19 00:02:14,19.32,2019-12-19
75,74,1,2,0.0,0.5,4.5,0.3,0.5,1,2,N,0.0,0.0,5.8,2019-12-19 00:03:20,2019-12-19 00:00:03,0.74,2019-12-19
243,234,1,2,2.5,0.5,39.5,0.3,0.5,2,1,N,8.0,0.0,51.3,2019-12-19 00:38:42,2019-12-19 00:00:48,11.26,2019-12-19
144,48,1,2,2.5,0.5,13.0,0.3,0.5,2,1,N,2.52,0.0,19.32,2019-12-19 00:15:53,2019-12-19 00:01:04,3.54,2019-12-19
236,238,1,2,2.5,0.5,9.0,0.3,0.5,2,1,N,2.56,0.0,15.36,2019-12-19 00:11:11,2019-12-19 00:01:27,1.7,2019-12-19
90,231,1,2,2.5,0.5,8.0,0.3,0.5,1,1,N,2.36,0.0,14.16,2019-12-19 00:10:39,2019-12-19 00:03:07,1.79,2019-12-19
201,79,1,2,2.5,0.5,51.5,0.3,0.5,2,2,N,0.0,2.29,57.59,2019-12-19 00:52:44,2019-12-19 00:00:36,16.11,2019-12-19
239,140,1,2,2.5,0.5,9.0,0.3,0.5,2,2,N,0.0,0.0,12.8,2019-12-19 00:10:08,2019-12-19 00:01:07,2.07,2019-12-19
79,141,1,2,2.5,0.5,11.0,0.3,0.5,2,2,N,0.0,0.0,14.8,2019-12-19 00:09:59,2019-12-19 00:00:18,3.3,2019-12-19
43,230,1,2,2.5,0.5,11.5,0.3,0.5,1,1,N,4.59,0.0,19.89,2019-12-19 00:15:12,2019-12-19 00:02:13,2.7,2019-12-19


In [0]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- DOLocationID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- VendorID: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pep_pickup_date_txt: date (nullable = true)



#### Before defining checks on the data, let's calculate some statistics for the dataset using the analysis runner.

 * This will give us some idea of what data quality checks to implement with the pydeequ VerificationSuite tests.

 * We will first analyze and understand the data, before initiating verification tests to ensure that the properties to be derived from the analysis runner will also hold for new versions of the dataset.

 * By defining assertions on the data distribution as part of a data pipeline, we can make sure every processed dataset is of high quality, and that any application consuming the data can rely on it.

In [0]:
from pydeequ.analyzers import *

# Compute a handful of dataset- and column-level metrics in one analysis run:
# row count, completeness/distinctness of DOLocationID, mean total_amount,
# the share of rows with passenger_count >= 4, and two correlations.
analysisResult = (
    AnalysisRunner(spark_conf)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("DOLocationID"))
    .addAnalyzer(Distinctness("DOLocationID"))
    .addAnalyzer(Mean("total_amount"))
    .addAnalyzer(Compliance("passenger_count", "passenger_count >= 4"))
    .addAnalyzer(Correlation("tolls_amount", "total_amount"))
    .addAnalyzer(Correlation("passenger_count", "total_amount"))
    .run()
)

# Convert the successfully computed metrics into a DataFrame and render it.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(
    spark_conf,
    analysisResult,
)
analysisResult_df.display()



entity,instance,name,value
Column,total_amount,Mean,19.641648841088863
Column,DOLocationID,Completeness,1.0
Column,passenger_count,Compliance,0.0808869429871045
Multicolumn,"tolls_amount,total_amount",Correlation,0.0667769406006882
Dataset,*,Size,6896317.0
Multicolumn,"passenger_count,total_amount",Correlation,0.0013093261647658
Column,DOLocationID,Distinctness,3.784628809841543e-05


#### Run a profiler to analyze every column in the dataframe.

In [0]:
from pydeequ.suggestions import *
from pydeequ.profiles import *

# Profile every column of the DataFrame in a single profiler run.
result = (
    ColumnProfilerRunner(spark_conf)
    .onData(df)
    .run()
)

# Only the profile objects are printed, so iterate the mapping's values
# directly. This drops the unused key variable, which would otherwise remain
# in the notebook namespace under the name `col`.
for profile in result.profiles.values():
    print(profile)

NumericProfiles for column: DOLocationID: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 264,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null,
    "kll": "None",
    "mean": 161.4892224356856,
    "maximum": 265.0,
    "minimum": 1.0,
    "sum": 1113680870.0,
    "stdDev": 70.33752167970998,
    "approxPercentiles": []
}
NumericProfiles for column: improvement_surcharge: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "Fractional",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "0.0",
            4682,
            0.0006789131068075902
        ],
        [
            "-0.3",
            20811,
            0.0030176977073414694
        ],
        [
            "0.3",
            6870824,
            0.9963033891858509
        ]
    ],
    "kll": "None",
    "mean": 0.29798570744509534,
    "maximum": 0.3,
    "minimum": -0.3,
    "s

#### Run an automated constraint suggestion to recommend specific helpful constraint rules for our data quality verification tests.

In [0]:
# `json` is imported explicitly so this cell does not depend on a star-import
# happening to expose the module.
import json

# Ask Deequ to propose constraints for the DataFrame using the selected rules.
# The session variable `spark_conf` is used here to stay consistent with the
# other Deequ runners in this notebook.
suggestionResult = (
    ConstraintSuggestionRunner(spark_conf)
    .onData(df)
    .addConstraintRule(CompleteIfCompleteRule())
    .addConstraintRule(NonNegativeNumbersRule())
    .addConstraintRule(RetainTypeRule())
    .addConstraintRule(UniqueIfApproximatelyUniqueRule())
    .addConstraintRule(CategoricalRangeRule())
    # FractionalCategoricalRangeRule is left disabled in this run:
    #.addConstraintRule(FractionalCategoricalRangeRule())
    .run()
)

# Constraint suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(DOLocationID,None,None))",
      "column_name": "DOLocationID",
      "current_value": "Completeness: 1.0",
      "description": "'DOLocationID' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"DOLocationID\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('DOLocationID' has no negative values,DOLocationID >= 0,None,List(DOLocationID),None))",
      "column_name": "DOLocationID",
      "current_value": "Minimum: 1.0",
      "description": "'DOLocationID' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constraint": ".isNonNegative(\"DOLocationID

##### Define a Pydeequ compliance check which ensures that the DOLocationID column contains only digits (0-9) and at least one digit is present.

In [0]:
from pydeequ.checks import *
from pydeequ.verification import *

# Declare a warning-level check and attach a non-negativity constraint on
# DOLocationID, then run it against the DataFrame.
check = Check(spark_conf, CheckLevel.Warning, "Check DOLocationID")
checkResult = (
    VerificationSuite(spark_conf)
    .onData(df)
    .addCheck(check.isNonNegative("DOLocationID"))
    .run()
)

# Flatten the per-constraint outcomes into a DataFrame and render it.
checkResult_df = VerificationResult.checkResultsAsDataFrame(
    spark_conf,
    checkResult,
)
display(checkResult_df)



check,check_level,check_status,constraint,constraint_status,constraint_message
Check DOLocationID,Error,Success,"ComplianceConstraint(Compliance(DOLocationID is non-negative,COALESCE(CAST(DOLocationID AS DECIMAL(20,10)), 0.0) >= 0,None,List(DOLocationID),None))",Success,
