In [1]:
import os
import sys

import numpy as np
import pandas as pd
import sklearn as skl

# Report the environment this notebook runs in (search paths, Spark and Python
# versions) so the recorded results can be reproduced.
# NOTE(review): `spark` is not created anywhere in this notebook; it is
# presumably injected by the kernel -- confirm before running elsewhere.

# Use .get() so a missing variable prints a placeholder instead of raising
# KeyError (PYTHONPATH in particular is often unset).
print("PATH: {}".format(os.environ.get('PATH', '<not set>')))
print("PYTHONPATH: {}".format(os.environ.get('PYTHONPATH', '<not set>')))
print("")
print("Spark: {}".format(spark.version))
print("Python: {}".format(sys.version))

# Last expression of the cell: display the SparkContext.
spark.sparkContext

PATH: /opt/conda/bin:/usr/lib64/qt-3.3/bin:/opt/conda/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
PYTHONPATH: /opt/conda/lib/python3.7/site-packages:/usr/hdp/current/spark2-client/python:/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip

Spark: 2.3.2.3.1.0.0-78
Python: 3.7.12 | packaged by conda-forge | (default, Oct 26 2021, 06:08:53) 
[GCC 9.4.0]


## Import PyDeequ and initialize a PySpark DataFrame

In [2]:
from pyspark.sql import SparkSession, Row
import pydeequ

# Three-row toy dataset; the last row leaves column `c` as None.
sample_rows = [
    Row(a="foo", b=1, c=5),
    Row(a="bar", b=2, c=6),
    Row(a="baz", b=3, c=None),
]
df = spark.sparkContext.parallelize(sample_rows).toDF()

# Show the Spark DataFrame as a pandas frame.
df.toPandas()

                                                                                

Unnamed: 0,a,b,c
0,foo,1,5.0
1,bar,2,6.0
2,baz,3,


## Example Analyzer

In [3]:
from pydeequ.analyzers import *

# Build an analysis over `df` with two analyzers: Size() and Completeness("b").
analyzer = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("b"))
)
analysisResult = analyzer.run()

# Turn the successful metrics into a DataFrame and display it via pandas.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(
    spark,
    analysisResult,
)
analysisResult_df.toPandas()

Unnamed: 0,entity,instance,name,value
0,Dataset,*,Size,3.0
1,Column,b,Completeness,1.0


## Example Profile

In [4]:
from pydeequ.profiles import *

# Run the column profiler over `df`.
profiler = ColumnProfilerRunner(spark).onData(df)
result = profiler.run()

# Print one "<column> => <profile>" entry per profiled column.
profiles_by_column = result.profiles
for col in profiles_by_column:
    profile = profiles_by_column[col]
    print(f"{col} => {profile}")

21/12/02 17:05:37 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
                                                                                

a => StandardProfiles for column: a: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 3
    },
    "histogram": [
        [
            "baz",
            1,
            0.3333333333333333
        ],
        [
            "foo",
            1,
            0.3333333333333333
        ],
        [
            "bar",
            1,
            0.3333333333333333
        ]
    ]
}
b => NumericProfiles for column: b: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "1",
            1,
            0.3333333333333333
        ],
        [
            "2",
            1,
            0.3333333333333333
        ],
        [
            "3",
            1,
 

## Example Constraint Suggestions

In [5]:
from pydeequ.suggestions import *

# Configure the suggestion runner over `df` with the DEFAULT() rule set.
csrunner = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())
)
suggestionResult = csrunner.run()

# Print the entries stored under the 'constraint_suggestions' key of the result.
constraint_suggestions = suggestionResult['constraint_suggestions']
print(constraint_suggestions)

[{'constraint_name': 'CompletenessConstraint(Completeness(b,None))', 'column_name': 'b', 'current_value': 'Completeness: 1.0', 'description': "'b' is not null", 'suggesting_rule': 'CompleteIfCompleteRule()', 'rule_description': 'If a column is complete in the sample, we suggest a NOT NULL constraint', 'code_for_constraint': '.isComplete("b")'}, {'constraint_name': "ComplianceConstraint(Compliance('b' has no negative values,b >= 0,None))", 'column_name': 'b', 'current_value': 'Minimum: 1.0', 'description': "'b' has no negative values", 'suggesting_rule': 'NonNegativeNumbersRule()', 'rule_description': 'If we see only non-negative numbers in a column, we suggest a corresponding constraint', 'code_for_constraint': '.isNonNegative("b")'}, {'constraint_name': 'UniquenessConstraint(Uniqueness(List(b),None))', 'column_name': 'b', 'current_value': 'ApproxDistinctness: 1.0', 'description': "'b' is unique", 'suggesting_rule': 'UniqueIfApproximatelyUniqueRule()', 'rule_description': 'If the ratio

## Example Constraint Verification

In [6]:
from pydeequ.checks import *
from pydeequ.verification import *

# A Warning-level check; "Review Check" is the label shown in the results.
check = Check(spark, CheckLevel.Warning, "Review Check")

# Chain the constraints, one per line.
checkCond = (
    check
    .hasSize(lambda x: x >= 3)
    .hasMin("b", lambda x: x == 0)
    .isUnique("a")
    .isNonNegative("b")
    .isComplete("c")
    .isContainedIn("a", ["foo", "bar", "baz"])
)

# Run the check against `df`.
verificator = VerificationSuite(spark).onData(df).addCheck(checkCond)
checkResult = verificator.run()

# Convert the per-constraint results to a DataFrame and display it via pandas.
checkResult_df = VerificationResult.checkResultsAsDataFrame(
    spark,
    checkResult,
)
checkResult_df.toPandas()

Python Callback server started!


Hive Session ID = 794bbe7c-6ab6-45aa-9165-3a84095ff496
                                                                                

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Review Check,Warning,Warning,SizeConstraint(Size(None)),Success,
1,Review Check,Warning,Warning,"MinimumConstraint(Minimum(b,None))",Failure,Value: 1.0 does not meet the constraint requir...
2,Review Check,Warning,Warning,"UniquenessConstraint(Uniqueness(List(a),None))",Success,
3,Review Check,Warning,Warning,ComplianceConstraint(Compliance(b is non-negat...,Success,
4,Review Check,Warning,Warning,"CompletenessConstraint(Completeness(c,None))",Failure,Value: 0.6666666666666666 does not meet the co...
5,Review Check,Warning,Warning,ComplianceConstraint(Compliance(a contained in...,Success,
