In [1]:
%env SPARK_VERSION=3.0.0

env: SPARK_VERSION=3.0.0


In [2]:
import pydeequ
from pyspark.sql import SparkSession, Row


spark = (
    SparkSession.builder
    .appName("pydeequ")
    .config("spark.jars", "/home/jovyan/mssql-jdbc-12.4.0.jre11.jar, /home/jovyan/deequ-2.0.3-spark-3.3.jar")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)


In [3]:
url = "jdbc:sqlserver://host.docker.internal:1433;databaseName=AdventureWorks2012;user=testlogin;password=testPa$$24;encrypt=true;trustServerCertificate=true" 

table = "Production.Product"
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

In [4]:
# connect to DB with Spark using JDBC connection to read the data
products_df = (
    spark.read
    .format("jdbc")
    .option("driver",driver)
    .option("url", url)
    .option("dbtable", table)
    .load()
)

products_df.show()

+---------+--------------------+-------------+--------+-----------------+------+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+--------------------+--------------+-------------------+-----------+----------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|MakeFlag|FinishedGoodsFlag| Color|SafetyStockLevel|ReorderPoint|StandardCost|ListPrice|Size|SizeUnitMeasureCode|WeightUnitMeasureCode|Weight|DaysToManufacture|ProductLine|Class|Style|ProductSubcategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|             rowguid|        ModifiedDate|
+---------+--------------------+-------------+--------+-----------------+------+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+--------------------+--------------+-------------------+-

### Data Analyzers section

In [6]:
from pydeequ.analyzers import *


analysisResult = (
    AnalysisRunner(spark)
    .onData(products_df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("Name"))
    .addAnalyzer(Mean("StandardCost"))
    .addAnalyzer(Maximum("Weight"))
    .addAnalyzer(Minimum("Weight"))
    .addAnalyzer(CountDistinct("Color"))
    .addAnalyzer(Compliance("Color_specified", "Color IS NOT NULL"))
    .run()
)

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+---------------+-------------+------------------+
| entity|       instance|         name|             value|
+-------+---------------+-------------+------------------+
| Column|Color_specified|   Compliance|0.5079365079365079|
| Column|         Weight|      Maximum|            1050.0|
| Column|         Weight|      Minimum|              2.12|
| Column|   StandardCost|         Mean|258.60296130952383|
| Column|           Name| Completeness|               1.0|
| Column|          Color|CountDistinct|               9.0|
|Dataset|              *|         Size|             504.0|
+-------+---------------+-------------+------------------+



### Data profiling section

##### Data profiling and Constraint suggestion sections were done using pydeequ documentation: https://pypi.org/project/pydeequ/1.0.1/. 
##### Nevertheless these blocks exited with errors related to incompatibility of some modules.

In [7]:
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(products_df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

Py4JJavaError: An error occurred while calling o96.run.
: com.amazon.deequ.analyzers.runners.MetricCalculationRuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 15) (e1224de42efc executor driver): java.lang.NoSuchMethodError: 'org.apache.spark.sql.types.StructType org.apache.spark.sql.types.StructType$.fromAttributes(scala.collection.Seq)'
	at org.apache.spark.sql.catalyst.expressions.aggregate.StatefulHyperloglogPlus.aggBufferSchema(StatefulHyperloglogPlus.scala:62)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$initializeAggregateFunctions$1(AggregationIterator.scala:110)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$initializeAggregateFunctions$1$adapted(AggregationIterator.scala:81)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.initializeAggregateFunctions(AggregationIterator.scala:81)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:118)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:106)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:123)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:97)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at com.amazon.deequ.analyzers.runners.MetricCalculationException$.wrapIfNecessary(MetricCalculationException.scala:74)
	at com.amazon.deequ.analyzers.Analyzers$.metricFromFailure(Analyzer.scala:508)
	at com.amazon.deequ.analyzers.StandardScanShareableAnalyzer.toFailureMetric(Analyzer.scala:221)
	at com.amazon.deequ.analyzers.StandardScanShareableAnalyzer.toFailureMetric(Analyzer.scala:201)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$6(AnalysisRunner.scala:335)
	at scala.collection.immutable.List.map(List.scala:297)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:335)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
	at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
	at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
	at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
	at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
	at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 15) (e1224de42efc executor driver): java.lang.NoSuchMethodError: 'org.apache.spark.sql.types.StructType org.apache.spark.sql.types.StructType$.fromAttributes(scala.collection.Seq)'
	at org.apache.spark.sql.catalyst.expressions.aggregate.StatefulHyperloglogPlus.aggBufferSchema(StatefulHyperloglogPlus.scala:62)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$initializeAggregateFunctions$1(AggregationIterator.scala:110)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$initializeAggregateFunctions$1$adapted(AggregationIterator.scala:81)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.initializeAggregateFunctions(AggregationIterator.scala:81)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:118)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:106)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:123)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:97)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.NoSuchMethodError: 'org.apache.spark.sql.types.StructType org.apache.spark.sql.types.StructType$.fromAttributes(scala.collection.Seq)'
	at org.apache.spark.sql.catalyst.expressions.aggregate.StatefulHyperloglogPlus.aggBufferSchema(StatefulHyperloglogPlus.scala:62)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$initializeAggregateFunctions$1(AggregationIterator.scala:110)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$initializeAggregateFunctions$1$adapted(AggregationIterator.scala:81)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.initializeAggregateFunctions(AggregationIterator.scala:81)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:118)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:106)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:123)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:97)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)


### Constraint Suggestions section

In [8]:
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(products_df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(suggestionResult)

Py4JError: An error occurred while calling None.com.amazon.deequ.suggestions.rules.CategoricalRangeRule. Trace:
py4j.Py4JException: Constructor com.amazon.deequ.suggestions.rules.CategoricalRangeRule([]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:180)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:197)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)



### Constraint Verification section

In [9]:
from pydeequ.checks import *
from pydeequ.verification import *


check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = (
    VerificationSuite(spark)
    .onData(products_df)
    .addCheck(
        check.isUnique("ProductID")
        .isNonNegative("ProductID")
        .isComplete("ReorderPoint")
        .isNonNegative("ReorderPoint")
        .hasCompleteness("ProductModelID", lambda x: x >= 0.54, "It should be above 0.54!")
        .hasMin("StandardCost", lambda x: x == 0)
        .isNonNegative("Weight")
        .isUnique("ProductID")
        .isContainedIn("Style", ["U ", "W ", "M "])
        .isContainedIn("Color", ["Black", "Silver", "Red", "Yellow", "Blue", "Multi", "Silver/Black", "White", "Grey", "Brown"])
        .isComplete("ModifiedDate")
    )
    .run()
)

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

Python Callback server started!
<pydeequ.checks.Check object at 0x7fb5b54d6450>
+------------+-----------+------------+--------------------+-----------------+------------------+
|       check|check_level|check_status|          constraint|constraint_status|constraint_message|
+------------+-----------+------------+--------------------+-----------------+------------------+
+------------+-----------+------------+--------------------+-----------------+------------------+



##### Report

In [10]:

from IPython.display import display, HTML

checkResult_df = checkResult_df.toPandas()

html_report = checkResult_df.to_html()
display(HTML(html_report))

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Review Check,Warning,Success,"UniquenessConstraint(Uniqueness(List(ProductID),None))",Success,
1,Review Check,Warning,Success,"ComplianceConstraint(Compliance(ProductID is non-negative,COALESCE(CAST(ProductID AS DECIMAL(20,10)), 0.0) >= 0,None))",Success,
2,Review Check,Warning,Success,"CompletenessConstraint(Completeness(ReorderPoint,None))",Success,
3,Review Check,Warning,Success,"ComplianceConstraint(Compliance(ReorderPoint is non-negative,COALESCE(CAST(ReorderPoint AS DECIMAL(20,10)), 0.0) >= 0,None))",Success,
4,Review Check,Warning,Success,"CompletenessConstraint(Completeness(ProductModelID,None))",Success,
5,Review Check,Warning,Success,"MinimumConstraint(Minimum(StandardCost,None))",Success,
6,Review Check,Warning,Success,"ComplianceConstraint(Compliance(Weight is non-negative,COALESCE(CAST(Weight AS DECIMAL(20,10)), 0.0) >= 0,None))",Success,
7,Review Check,Warning,Success,"UniquenessConstraint(Uniqueness(List(ProductID),None))",Success,
8,Review Check,Warning,Success,"ComplianceConstraint(Compliance(Style contained in U ,W ,M ,`Style` IS NULL OR `Style` IN ('U ','W ','M '),None))",Success,
9,Review Check,Warning,Success,"ComplianceConstraint(Compliance(Color contained in Black,Silver,Red,Yellow,Blue,Multi,Silver/Black,White,Grey,Brown,`Color` IS NULL OR `Color` IN ('Black','Silver','Red','Yellow','Blue','Multi','Silver/Black','White','Grey','Brown'),None))",Success,


In [12]:
import os

with open("constraint_verification_report.html", "w") as tr:
    tr.write(html_report)