In [1]:
%%configure -f
{
    "conf": {
        "spark.jars": "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.2/deequ-1.0.2.jar"
    }
}

In [2]:
// Load the Electronics partition of the public amazon-reviews-pds Parquet data as a DataFrame.
val dataset = {
  val electronicsReviewsPath = "s3://amazon-reviews-pds/parquet/product_category=Electronics/"
  spark.read.parquet(electronicsReviewsPath)
}

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
12,application_1578964032290_0012,spark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

dataset: org.apache.spark.sql.DataFrame = [marketplace: string, customer_id: string ... 13 more fields]


In [3]:
// Print the column names, types and nullability of the loaded reviews as a tree.
println(dataset.schema.treeString)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)



In [4]:
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}


In [5]:
// Compute a fixed set of Deequ metrics over `dataset` in a single analysis run.
// The whole expression sits inside braces so the notebook REPL does not try to
// execute `AnalysisRunner` on its own before the leading-dot calls are read.
val analysisResult: AnalyzerContext = {
  val runBuilder = AnalysisRunner
    .onData(dataset)
    .addAnalyzer(Size())                                              // row count
    .addAnalyzer(Completeness("review_id"))
    .addAnalyzer(ApproxCountDistinct("review_id"))                    // approximate distinct review_id count
    .addAnalyzer(Mean("star_rating"))
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) // rows matching the predicate, named "top star_rating"
    .addAnalyzer(Correlation("total_votes", "star_rating"))
    .addAnalyzer(Correlation("total_votes", "helpful_votes"))
  runBuilder.run()
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

analysisResult: com.amazon.deequ.analyzers.runners.AnalyzerContext = AnalyzerContext(Map(Completeness(review_id,None) -> DoubleMetric(Column,Completeness,review_id,Success(1.0)), ApproxCountDistinct(review_id,None) -> DoubleMetric(Column,ApproxCountDistinct,review_id,Success(3010972.0)), Correlation(total_votes,star_rating,None) -> DoubleMetric(Mutlicolumn,Correlation,total_votes,star_rating,Success(-0.034510979965387656)), Size(None) -> DoubleMetric(Dataset,Size,*,Success(3120938.0)), Mean(star_rating,None) -> DoubleMetric(Column,Mean,star_rating,Success(4.036143941340712)), Compliance(top star_rating,star_rating >= 4.0,None) -> DoubleMetric(Column,Compliance,top star_rating,Success(0.7494070692849394)), Correlation(total_votes,helpful_votes,None) -> DoubleMetric(Mutlicolumn,Correlatio...

In [6]:
// Turn the successfully computed analyzer metrics into a DataFrame (entity, instance, name, value).
val metrics: org.apache.spark.sql.DataFrame =
  AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

metrics: org.apache.spark.sql.DataFrame = [entity: string, instance: string ... 2 more fields]


In [7]:
// Display the first 20 metric rows; long cell values are truncated.
metrics.show(20, truncate = true)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+-------------------+--------------------+
|     entity|            instance|               name|               value|
+-----------+--------------------+-------------------+--------------------+
|     Column|           review_id|       Completeness|                 1.0|
|     Column|           review_id|ApproxCountDistinct|           3010972.0|
|Mutlicolumn|total_votes,star_...|        Correlation|-0.03451097996538...|
|    Dataset|                   *|               Size|           3120938.0|
|     Column|         star_rating|               Mean|   4.036143941340712|
|     Column|     top star_rating|         Compliance|  0.7494070692849394|
|Mutlicolumn|total_votes,helpf...|        Correlation|  0.9936463809903864|
+-----------+--------------------+-------------------+--------------------+



In [8]:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}


In [9]:
// Declare the data-quality expectations for the reviews, then verify them against `dataset`.
// Everything is wrapped in braces so the notebook REPL reads the leading-dot chains as one expression.
val verificationResult: VerificationResult = {
  val reviewCheck = Check(CheckLevel.Error, "Review Check")
    .hasSize(_ >= 3000000)            // at least 3 million rows
    .hasMin("star_rating", _ == 1.0)  // smallest rating is exactly 1.0
    .hasMax("star_rating", _ == 5.0)  // largest rating is exactly 5.0
    .isComplete("review_id")          // review_id is never NULL
    .isUnique("review_id")            // review_id has no duplicates
    .isComplete("marketplace")        // marketplace is never NULL
    .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR")) // only these marketplace codes
    .isNonNegative("year")            // year has no negative values
  // Compute the required metrics and evaluate every constraint of the check.
  VerificationSuite()
    .onData(dataset)
    .addCheck(reviewCheck)
    .run()
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

verificationResult: com.amazon.deequ.VerificationResult = VerificationResult(Error,Map(Check(Error,Review Check,List(SizeConstraint(Size(None)), MinimumConstraint(Minimum(star_rating,None)), MaximumConstraint(Maximum(star_rating,None)), CompletenessConstraint(Completeness(review_id,None)), UniquenessConstraint(Uniqueness(List(review_id))), CompletenessConstraint(Completeness(marketplace,None)), ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,`marketplace` IS NULL OR `marketplace` IN ('US','UK','DE','JP','FR'),None)), ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)))) -> CheckResult(Check(Error,Review Check,List(SizeConstraint(Size(None)), MinimumConstraint(Minimum(star_rating,None)), MaximumConstraint(Maximum(star_rating,None))...

In [10]:
// One row per evaluated constraint, with the check and constraint statuses and messages.
val resultDataFrame: org.apache.spark.sql.DataFrame =
  VerificationResult.checkResultsAsDataFrame(spark, verificationResult)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

resultDataFrame: org.apache.spark.sql.DataFrame = [check: string, check_level: string ... 4 more fields]


In [11]:
// Show the first 20 constraint results without truncating the long constraint descriptions.
resultDataFrame.show(numRows = 20, truncate = false)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check       |check_level|check_status|constraint                                                                                                                                         |constraint_status|constraint_message                                                 |
+------------+-----------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|Review Check|Error      |Error       |SizeConstraint(Size(None))                                                                                                                 

In [12]:
// Show the metrics that the verification run computed, without truncating cell values.
{
  val verificationMetrics = VerificationResult.successMetricsAsDataFrame(spark, verificationResult)
  verificationMetrics.show(numRows = 20, truncate = false)
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------------------------------------+------------+------------------+
|entity |instance                               |name        |value             |
+-------+---------------------------------------+------------+------------------+
|Column |review_id                              |Completeness|1.0               |
|Column |review_id                              |Uniqueness  |0.9926566948782706|
|Dataset|*                                      |Size        |3120938.0         |
|Column |star_rating                            |Maximum     |5.0               |
|Column |star_rating                            |Minimum     |1.0               |
|Column |year is non-negative                   |Compliance  |1.0               |
|Column |marketplace contained in US,UK,DE,JP,FR|Compliance  |1.0               |
|Column |marketplace                            |Completeness|1.0               |
+-------+---------------------------------------+------------+------------------+



# Automated Constraint Suggestion

In [13]:
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._


In [14]:
// Profile the columns of `dataset` and let Deequ derive candidate constraints
// from those profiles using its default rule set.
val suggestionResult = {
  ConstraintSuggestionRunner()
    .onData(dataset)
    .addConstraintRules(Rules.DEFAULT)
    .run()
}

// Flatten the per-column suggestions into (column, description, code) rows so the
// suggested constraints can be inspected as a Spark Dataset.
val suggestionDataFrame = {
  val suggestionRows = for {
    (columnName, columnSuggestions) <- suggestionResult.constraintSuggestions.toSeq
    suggestion <- columnSuggestions
  } yield (columnName, suggestion.description, suggestion.codeForConstraint)
  suggestionRows.toDS()
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Unable to map type DateType
suggestionResult: com.amazon.deequ.suggestions.ConstraintSuggestionResult = ConstraintSuggestionResult(Map(review_id -> StandardColumnProfile(review_id,1.0,3010972,String,true,Map(Boolean -> 0, Fractional -> 0, Integral -> 0, Unknown -> 0, String -> 3120938),None), customer_id -> NumericColumnProfile(customer_id,1.0,2170036,Integral,true,Map(Boolean -> 0, Fractional -> 0, Integral -> 3120938, Unknown -> 0, String -> 0),None,Some(2.880603268895954E7),Some(5.3096582E7),Some(10005.0),Some(8.9901842048216E13),Some(1.5415072111267326E7),Some(Stream(10005.0, ?))), review_date -> StandardColumnProfile(review_date,1.0,5898,Unknown,false,Map(),None), helpful_votes -> NumericColumnProfile(helpful_votes,1.0,872,Integral,false,Map(),None,Some(1.865194053838942),Some(12786.0),Some(0.0),Some(5821155....
suggestionDataFrame: org.apache.spark.sql.Dataset[(String, String, String)] = [_1: string, _2: string ... 1 more field]


In [15]:
// Show the first 20 suggested constraints with their full descriptions and code snippets.
suggestionDataFrame.show(numRows = 20, truncate = false)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-------------------------------------------------+--------------------------------------------------------------------------------+
|_1             |_2                                               |_3                                                                              |
+---------------+-------------------------------------------------+--------------------------------------------------------------------------------+
|review_id      |'review_id' is not null                          |.isComplete("review_id")                                                        |
|customer_id    |'customer_id' is not null                        |.isComplete("customer_id")                                                      |
|customer_id    |'customer_id' has type Integral                  |.hasDataType("customer_id", ConstrainableDataTypes.Integral)                    |
|customer_id    |'customer_id' has no negative values             |.isNonNegative("customer_id")          