In [44]:
from typing import List
from pydeequ.analyzers import _AnalyzerObject
from pydeequ.scala_utils import to_scala_seq
from pyspark.sql import SparkSession, DataFrame


class IncreamentalAnalysis:
    def __init__(self, analyzer: List[_AnalyzerObject] = []):
        self.analyzer = analyzer

    def addAnalyzer(self, analyzer: _AnalyzerObject):
        return IncreamentalAnalysis(self.analyzer + [analyzer])

    def generate(self, spark: SparkSession):
        return list(
            map(lambda analyzer: analyzer._set_jvm(spark._jvm)._analyzer_jvm, self.analyzer)
        )


class IcreamentalRunner:
    def __init__(self, spark: SparkSession, analyzers: List):
        self.spark = spark
        self._jsparkSession = spark._jsparkSession
        self._jvm = self.spark._jvm
        if isinstance(analyzers,List):
            self.analysis = self._jvm.com.amazon.deequ.analyzers.Analysis(
                to_scala_seq(self._jvm, analyzers)
            )
        else:
            self.analysis = analyzers
        self.analysisRunner = (
            spark._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner
        )

    def run(self,df, state, firstTime: bool = True):
        #state = self._jvm.com.amazon.deequ.analyzers.HdfsStateProvider(
        #    self._jsparkSession, statePath, 10, False
        #)
        storageLevelOfGroupedDataForMultiplePasses = getattr(
            self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner, "run$default$5"
        )()
        if firstTime:
            print("First Run")
            saveStatesWith = self._jvm.scala.Some(state)
            aggregateWith = getattr(
                self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner,
                "run$default$3",
            )()
        else:
            print("Incremental Run")
            aggregateWith = self._jvm.scala.Some(state)
            saveStatesWith = getattr(
                self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner,
                "run$default$4",
            )()
        print(saveStatesWith,aggregateWith)
        return self.analysisRunner.run(
            df._jdf,
            self.analysis,
            aggregateWith,
            saveStatesWith,
            storageLevelOfGroupedDataForMultiplePasses,
        )


In [1]:
import os
os.environ['PYSPARK_PYTHON']='python'

from pyspark.sql import SparkSession

spark = (SparkSession.builder.config("spark.jars","deequ-2.0.1-spark-3.2.jar").getOrCreate())

In [3]:
day1 = spark.createDataFrame([
  (1, "Thingy A", "awesome thing.", "high", 0),
  (2, "Thingy B", None, "low", 0),
  (3, None, None, None, 5)]).toDF(*['id','name','description','demand','quantity'])

In [4]:
from pydeequ.analyzers import *

analysis = (IncreamentalAnalysis()
 .addAnalyzer(Size())
 .addAnalyzer(Completeness("id"))
 .addAnalyzer(Completeness("name"))
 .addAnalyzer(Completeness("description"))
 .generate(spark)
)

analysisRunner = IcreamentalRunner(spark,analysis)

In [69]:
state = spark._jvm.com.amazon.deequ.analyzers.InMemoryStateProvider()

In [5]:
data = analysisRunner.run(day1,state)

First Run


In [6]:
AnalyzerContext.successMetricsAsDataFrame(spark, data).drop('entity').show()

+-----------+------------+------------------+
|   instance|        name|             value|
+-----------+------------+------------------+
|          *|        Size|               3.0|
|         id|Completeness|               1.0|
|       name|Completeness|0.6666666666666666|
|description|Completeness|0.3333333333333333|
+-----------+------------+------------------+





In [7]:
day2 = spark.createDataFrame([
  (4, "Thingy D", None, "low", 10),
  (5, "Thingy E", 'awesome', "high", 12)
]).toDF(*['id','name','description','demand','quantity'])

In [70]:
state = spark._jvm.com.amazon.deequ.analyzers.InMemoryStateProvider()

In [8]:
data = analysisRunner.run(day2,state,False)

Incremental Run


In [9]:
AnalyzerContext.successMetricsAsDataFrame(spark, data).drop('entity').show()

+-----------+------------+-----+
|   instance|        name|value|
+-----------+------------+-----+
|          *|        Size|  5.0|
|         id|Completeness|  1.0|
|       name|Completeness|  0.8|
|description|Completeness|  0.4|
+-----------+------------+-----+



In [2]:
deDF = spark.createDataFrame([
  (1, "ManufacturerA", "DE"),
  (2, "ManufacturerB", "DE")]).toDF(*['id','name','country'])

usDF = spark.createDataFrame([
  (3, "ManufacturerD", "US"),
  (4, "ManufacturerE", "US"),
  (5, "ManufacturerF", "US")]).toDF(*['id','name','country'])

cnDF = spark.createDataFrame([
  (6, "ManufacturerG", "CN"),
  (7, "ManufacturerH", "CN")]).toDF(*['id','name','country'])

In [60]:
from pydeequ.checks import *

check = (Check(spark, CheckLevel.Warning, "Review Check")
        .isComplete("name")
        .containsURL("name", lambda x : x == 0)
        .isContainedIn("country", ["DE", "US", "CN"])
        )

In [61]:
analysis =  spark._jvm.com.amazon.deequ.analyzers.Analysis(check._Check.requiredAnalyzers().toSeq())

In [62]:
print(analysis)

Analysis(Vector(Completeness(name,None), PatternMatch(name,(https?|ftp)://[^\s/$.?#].[^\s]*,None), Compliance(country contained in DE,US,CN,`country` IS NULL OR `country` IN ('DE','US','CN'),None)))


In [63]:
analysisRunner = IcreamentalRunner(spark,analysis)

In [90]:
deState = spark._jvm.com.amazon.deequ.analyzers.InMemoryStateProvider()
usState = spark._jvm.com.amazon.deequ.analyzers.InMemoryStateProvider()
cnState = spark._jvm.com.amazon.deequ.analyzers.InMemoryStateProvider()

In [91]:
deData = analysisRunner.run(deDF,deState)

First Run
Some() None


In [92]:
from pydeequ.analyzers import *

AnalyzerContext.successMetricsAsDataFrame(spark, deData).drop('entity').show()

+--------------------+------------+-----+
|            instance|        name|value|
+--------------------+------------+-----+
|                name|Completeness|  1.0|
|                name|PatternMatch|  0.0|
|country contained...|  Compliance|  1.0|
+--------------------+------------+-----+



In [93]:
usData = analysisRunner.run(usDF,usState)

AnalyzerContext.successMetricsAsDataFrame(spark, usData).drop('entity').show()

First Run
Some() None
+--------------------+------------+-----+
|            instance|        name|value|
+--------------------+------------+-----+
|                name|Completeness|  1.0|
|                name|PatternMatch|  0.0|
|country contained...|  Compliance|  1.0|
+--------------------+------------+-----+



In [94]:
cnData = analysisRunner.run(cnDF,cnState)

AnalyzerContext.successMetricsAsDataFrame(spark, cnData).drop('entity').show()

First Run
Some() None
+--------------------+------------+-----+
|            instance|        name|value|
+--------------------+------------+-----+
|                name|Completeness|  1.0|
|                name|PatternMatch|  0.0|
|country contained...|  Compliance|  1.0|
+--------------------+------------+-----+



In [71]:
java_analysisRunner = spark._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner

In [77]:
saveStatesWith = getattr(
                spark._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner,
                "runOnAggregatedStates$default$4",
            )()
storageLevelOfGroupedDataForMultiplePasses = getattr(
                spark._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner,
                "runOnAggregatedStates$default$5",
            )()
metricsRepository = getattr(
                spark._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner,
                "runOnAggregatedStates$default$6",
            )()
saveOrAppendResultsWithKey = getattr(
                spark._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunner,
                "runOnAggregatedStates$default$7",
            )()

In [81]:
from pydeequ.scala_utils import *

In [84]:
aggData = java_analysisRunner.runOnAggregatedStates(
                    usDF._jdf.schema(),
                    analysis,
                    to_scala_seq(spark._jvm, [deState,usState,cnState]),
                    saveStatesWith,
                    storageLevelOfGroupedDataForMultiplePasses,
                    metricsRepository,
                    saveOrAppendResultsWithKey
                )

In [86]:
AnalyzerContext.successMetricsAsDataFrame(spark, aggData).drop('entity').show()

+--------------------+------------+-----+
|            instance|        name|value|
+--------------------+------------+-----+
|                name|Completeness|  1.0|
|                name|PatternMatch|  0.0|
|country contained...|  Compliance|  1.0|
+--------------------+------------+-----+



In [88]:
updatedUS = spark.createDataFrame([
  (3, "ManufacturerDNew", "US"),
  (4, None, "US"),
  (5, "ManufacturerFNew http://clickme.com", "US")]).toDF(*['id','name','country'])

In [89]:
updatedUsState = spark._jvm.com.amazon.deequ.analyzers.InMemoryStateProvider()

In [96]:
updatedUsData = analysisRunner.run(updatedUS,updatedUsState)

AnalyzerContext.successMetricsAsDataFrame(spark, updatedUsData).drop('entity').show()

First Run
Some(Completeness(name,None) => NumMatchesAndCount(2,3)
PatternMatch(name,(https?|ftp)://[^\s/$.?#].[^\s]*,None) => NumMatchesAndCount(1,3)
Compliance(country contained in DE,US,CN,`country` IS NULL OR `country` IN ('DE','US','CN'),None) => NumMatchesAndCount(3,3)
) None
+--------------------+------------+------------------+
|            instance|        name|             value|
+--------------------+------------+------------------+
|                name|Completeness|0.6666666666666666|
|                name|PatternMatch|0.3333333333333333|
|country contained...|  Compliance|               1.0|
+--------------------+------------+------------------+



In [97]:
aggData = java_analysisRunner.runOnAggregatedStates(
                    usDF._jdf.schema(),
                    analysis,
                    to_scala_seq(spark._jvm, [deState,updatedUsState,cnState]),
                    saveStatesWith,
                    storageLevelOfGroupedDataForMultiplePasses,
                    metricsRepository,
                    saveOrAppendResultsWithKey
                )

In [99]:
AnalyzerContext.successMetricsAsDataFrame(spark, aggData).drop('entity').show()

+--------------------+------------+-------------------+
|            instance|        name|              value|
+--------------------+------------+-------------------+
|                name|Completeness| 0.8571428571428571|
|                name|PatternMatch|0.14285714285714285|
|country contained...|  Compliance|                1.0|
+--------------------+------------+-------------------+

