In [1]:
# Export SPARK_VERSION for this kernel before `import pydeequ` runs in the next cell.
# NOTE(review): PyDeequ is expected to read this variable to choose the matching
# Deequ Maven artifact (`pydeequ.deequ_maven_coord`) - confirm against the PyDeequ README.
%env SPARK_VERSION=3.2.0

env: SPARK_VERSION=3.2.0


In [2]:
import pydeequ

from pyspark.sql import SparkSession, Row

import os

# Location of the Microsoft SQL Server JDBC driver jar. It is defined once so
# the driver and executor class paths cannot drift apart, and it can be
# overridden through the MSSQL_JDBC_JAR environment variable instead of
# editing the notebook. The default is the path previously hard-coded here.
MSSQL_JDBC_JAR = os.environ.get(
    "MSSQL_JDBC_JAR", "/home/jovyan/mssql-jdbc-12.6.1.jre11.jar"
)

# Build (or reuse) the Spark session:
#   * spark.jars.packages / spark.jars.excludes use the Maven coordinates
#     published by PyDeequ for the Deequ artifact and its excluded dependency.
#   * the extraClassPath settings put the SQL Server JDBC driver on both the
#     driver and the executor class path so the JDBC read below can load it.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("spark.driver.extraClassPath", MSSQL_JDBC_JAR)
    .config("spark.executor.extraClassPath", MSSQL_JDBC_JAR)
    .getOrCreate()
)

# Last expression of the cell: render the session summary.
spark

In [3]:
import os
from getpass import getpass

# JDBC connection string for the SQL Server instance.
# NOTE(review): trustServerCertificate=true skips server certificate validation;
# acceptable for a local Docker host, confirm before pointing at a shared server.
server_name = "jdbc:sqlserver://host.docker.internal;encrypt=true;trustServerCertificate=true"
database_name = "AdventureWorks2012"
url = server_name + ";" + "databaseName=" + database_name + ";"

table = "HumanResources.Employee"

# Credentials must never be stored in the notebook: take them from the
# environment (MSSQL_USER / MSSQL_PASSWORD) and fall back to an interactive
# prompt so the secret is not written to the saved .ipynb file.
# NOTE(review): the password previously committed in this cell should be rotated.
user = os.environ.get("MSSQL_USER") or input("SQL Server user: ")
password = os.environ.get("MSSQL_PASSWORD") or getpass("SQL Server password: ")

# Load the table through Spark's JDBC data source.
df = (
    spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .load()
)

# Preview the loaded rows.
df.show()

+----------------+----------------+--------------------+----------------+-----------------+--------------------+----------+-------------+------+----------+------------+-------------+--------------+-----------+--------------------+-------------------+
|BusinessEntityID|NationalIDNumber|             LoginID|OrganizationNode|OrganizationLevel|            JobTitle| BirthDate|MaritalStatus|Gender|  HireDate|SalariedFlag|VacationHours|SickLeaveHours|CurrentFlag|             rowguid|       ModifiedDate|
+----------------+----------------+--------------------+----------------+-----------------+--------------------+----------+-------------+------+----------+------------+-------------+--------------+-----------+--------------------+-------------------+
|               1|       295847284|adventure-works\ken0|            null|             null|Chief Executive O...|1969-01-29|            S|     M|2009-01-14|        true|           99|            69|       true|F01251E5-96A3-448...|2014-06-30 00:00:

In [4]:
### Data Analyzers section
from pydeequ.analyzers import *

# Compute the row count plus the completeness (share of non-null values) of
# selected columns of the Employee DataFrame.
# The former Completeness("email") analyzer was removed: the table has no
# "email" column, so that metric could never be computed and was missing from
# the success-metrics output.
analysis_result = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("BusinessEntityID"))
    .addAnalyzer(Completeness("NationalIDNumber"))
    .addAnalyzer(Completeness("LoginID"))
    .addAnalyzer(Completeness("OrganizationNode"))
    .addAnalyzer(Completeness("rowguid"))
    .run()
)

# Turn the successfully computed metrics into a Spark DataFrame and display them.
analysis_result_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)
analysis_result_df.show()

+-------+----------------+------------+-----------------+
| entity|        instance|        name|            value|
+-------+----------------+------------+-----------------+
| Column|         LoginID|Completeness|              1.0|
| Column|NationalIDNumber|Completeness|              1.0|
| Column|OrganizationNode|Completeness|0.996551724137931|
|Dataset|               *|        Size|            290.0|
| Column|         rowguid|Completeness|              1.0|
| Column|BusinessEntityID|Completeness|              1.0|
+-------+----------------+------------+-----------------+





In [5]:
### Data profiling section
from pydeequ.profiles import *

# Profile every column of the DataFrame using the runner's default settings.
result = ColumnProfilerRunner(spark).onData(df).run()

# Print one profile per column; the column name key is not needed because
# only the profile object itself is printed.
for profile in result.profiles.values():
    print(profile)

StandardProfiles for column: JobTitle: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 66,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 290
    },
    "histogram": [
        [
            "Production Technician - WC60",
            26,
            0.0896551724137931
        ],
        [
            "Sales Representative",
            14,
            0.04827586206896552
        ],
        [
            "Research and Development Manager",
            2,
            0.006896551724137931
        ],
        [
            "Research and Development Engineer",
            2,
            0.006896551724137931
        ],
        [
            "Quality Assurance Manager",
            1,
            0.0034482758620689655
        ],
        [
            "Production Supervisor - WC60",
            3,
            0.010344827586206896
        ],

In [6]:
### Constraint Suggestions section
from pydeequ.suggestions import *

# Ask Deequ to propose constraints for the DataFrame using the DEFAULT rule set.
suggestion_result = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())
    .run()
)
# Show constraint suggestions (last expression of the cell is displayed).
suggestion_result

{'constraint_suggestions': [{'constraint_name': 'CompletenessConstraint(Completeness(JobTitle,None))',
   'column_name': 'JobTitle',
   'current_value': 'Completeness: 1.0',
   'description': "'JobTitle' is not null",
   'suggesting_rule': 'CompleteIfCompleteRule()',
   'rule_description': 'If a column is complete in the sample, we suggest a NOT NULL constraint',
   'code_for_constraint': '.isComplete("JobTitle")'},
  {'constraint_name': "ComplianceConstraint(Compliance('SickLeaveHours' has value range '64', '52', '63', '41', '30', '38', '20', '42', '53', '58', '69', '31', '47', '60', '39', '49', '57', '65', '43', '54', '21', '48', '59', '24', '46', '61', '68', '37', '50', '55', '56', '22', '33', '44', '40', '62', '23', '51', '66', '67', '45', '27', '25', '36', '32', '35', '34', '28', '29', '26', '80',`SickLeaveHours` IN ('64', '52', '63', '41', '30', '38', '20', '42', '53', '58', '69', '31', '47', '60', '39', '49', '57', '65', '43', '54', '21', '48', '59', '24', '46', '61', '68', '37'

In [9]:
### Constraint Verification section
from pydeequ.checks import *
from pydeequ.verification import *
import pandas as pd

# Define a check for the DataFrame; violated constraints are reported at
# Warning level under the description "HumanResources.Employee".
check = Check(spark, CheckLevel.Warning, "HumanResources.Employee")

# Run the verification suite on the DataFrame.
# rowguid holds GUID strings (e.g. "F01251E5-96A3-448..."), so the former
# isNonNegative("rowguid") constraint was meaningless for that column; it is
# replaced by a uniqueness constraint, which is the invariant a row GUID can
# actually be tested for.
# NOTE(review): the df.show() preview contains a row with a null
# OrganizationLevel, so isComplete("OrganizationLevel") is expected to be
# reported as a violation - confirm that this is intended.
checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check.hasSize(lambda x: x > 1)
        .isComplete("BusinessEntityID")
        .isComplete("NationalIDNumber")
        .isComplete("LoginID")
        .isComplete("BirthDate")
        .isComplete("HireDate")
        .isComplete("rowguid")
        .isUnique("BusinessEntityID")
        .isNonNegative("BusinessEntityID")
        .isNonNegative("NationalIDNumber")
        .isUnique("NationalIDNumber")
        .isUnique("LoginID")
        .isUnique("rowguid")
        .isComplete("JobTitle")
        .isComplete("OrganizationLevel")
    )
    .run()
)

# Print verification run status
print(f"Verification Run Status: {checkResult.status}")

# Collect the per-constraint results into a Spark DataFrame and display them.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

# Convert the Spark DataFrame to a pandas DataFrame and export it to
# test_report.html (written to the current working directory).
constraint_df = checkResult_df.toPandas()
constraint_df.to_html('test_report.html', index=False)

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



