In [1]:
import sys
!{sys.executable} -m pip install great_expectations[spark]==1.6.3
!{sys.executable} -m pip install typing_extensions==4.14.1 --upgrade

Collecting great_expectations==1.6.3 (from great_expectations[spark]==1.6.3)
  Downloading great_expectations-1.6.3-py3-none-any.whl.metadata (9.3 kB)
Collecting altair<5.0.0,>=4.2.1 (from great_expectations==1.6.3->great_expectations[spark]==1.6.3)
  Downloading altair-4.2.2-py3-none-any.whl.metadata (13 kB)
Collecting marshmallow<4.0.0,>=3.7.1 (from great_expectations==1.6.3->great_expectations[spark]==1.6.3)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting posthog>3 (from great_expectations==1.6.3->great_expectations[spark]==1.6.3)
  Downloading posthog-7.0.1-py3-none-any.whl.metadata (6.0 kB)
Collecting pydantic>=1.10.7 (from great_expectations==1.6.3->great_expectations[spark]==1.6.3)
  Downloading pydantic-2.12.4-py3-none-any.whl.metadata (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.9/89.9 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
Collecting tzlocal>=1.2 (from great_expectations==1.6.3->great_expectations[spark]==1.6

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *

# Spark configuration: cluster master URL, application name, and the
# driver/executor resource settings used by this lab.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Lab9_DataQuality_Example")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The Spark session is the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Read the CSV using its first line as the header. No schema is supplied here;
# the first printSchema() call shows the column types exactly as loaded.
csv_reader = spark.read.format("csv").option("header", "true")
spark_df = csv_reader.load("/home/jovyan/data/restaurants_quality_issues.csv")
spark_df.printSchema()

# Data conversion: cast every column to the type the quality checks work on.
column_types = {
    "rid": "int",
    "avg_price": "float",
    "rating": "float",
    "contact": "string",
}
for column_name, target_type in column_types.items():
    spark_df = spark_df.withColumn(column_name, col(column_name).cast(target_type))

# Show the schema after conversion, then the first 100 rows for inspection.
spark_df.printSchema()
spark_df.show(100)

root
 |-- rid: string (nullable = true)
 |-- avg_price: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- contact: string (nullable = true)

root
 |-- rid: integer (nullable = true)
 |-- avg_price: float (nullable = true)
 |-- rating: float (nullable = true)
 |-- contact: string (nullable = true)

+----+---------+------+--------------------+
| rid|avg_price|rating|             contact|
+----+---------+------+--------------------+
|   1|    58.98|  2.49|lbuckeridge0@indi...|
|   2|    60.32|  3.69|wstrickland1@goog...|
|   3|    61.94|  2.57|smiroy2@delicious...|
|   4|    77.15|  4.34|radamoli3@google....|
|   5|    36.39|  3.78|  cwiersma4@etsy.com|
|   6|    36.03|  3.23|fleteve5@wundergr...|
|   7|    31.24|  3.59|bkoschek6@spiegel.de|
|   8|    67.21|  3.69|   llansly7@sohu.com|
|   9|    37.06|  3.03|bmatton8@edublogs...|
|  10|    51.98|  2.15|     bdinis9@fda.gov|
|  11|    46.37| 11.86|   wdaysha@phpbb.com|
|  12|    89.75|  3.16|rbaddileyb@unesco...|
|  13|  

### Look for Available Expectations at https://greatexpectations.io/expectations/  

In [11]:
import great_expectations as gx
import json
# Obtain a Great Expectations data context.
context = gx.get_context()

# Define the individual expectations:
#   - rating must lie between 0 and 6
#   - contact must match the e-mail pattern below
#   - rid and contact must not be null
expectation_ratingc = gx.expectations.ExpectColumnValuesToBeBetween(
    column="rating",
    min_value=0,
    max_value=6,
)
expectation_contact = gx.expectations.ExpectColumnValuesToMatchRegex(
    column="contact",
    regex=r"^([a-z0-9_\.-]+)@([\da-z\.-]+)\.([a-z\.]{2,6})$",
)
expectation_rid = gx.expectations.ExpectColumnValuesToNotBeNull(column="rid")
expectation_contact_null = gx.expectations.ExpectColumnValuesToNotBeNull(column="contact")

# Register the suite with the context, then attach the expectations in order.
suite = context.suites.add(gx.ExpectationSuite(name="restaurants_suite"))
for expectation in (
    expectation_ratingc,
    expectation_contact,
    expectation_rid,
    expectation_contact_null,
):
    suite.add_expectation(expectation)

# Describe the data to validate:
# Spark data source -> dataframe asset -> whole-dataframe batch definition.
datasource = context.data_sources.add_spark(name="restaurants_spark")
asset = datasource.add_dataframe_asset(name="restaurants_asset")
bd = asset.add_batch_definition_whole_dataframe(name="restaurants_bd")

# Tie the batch definition to the suite and register the validation definition.
vd = gx.ValidationDefinition(name="restaurants_vd", data=bd, suite=suite)
context.validation_definitions.add(vd)

# The checkpoint runs the validation definition and then updates the Data Docs.
update_docs_action = gx.checkpoint.UpdateDataDocsAction(name="update_data_docs")
checkpoint = gx.Checkpoint(
    name="restaurants_checkpoint",
    validation_definitions=[vd],
    actions=[update_docs_action],
)
context.checkpoints.add(checkpoint)

# Run the checkpoint against the Spark dataframe and open the generated Data Docs.
results = checkpoint.run(batch_parameters={"dataframe": spark_df})
context.open_data_docs()

# Print the textual report, then parse it with json.loads so that it can be
# inspected as Python objects.
results_report = results.describe()
print(results_report)
json_report = json.loads(results_report)
print("***JSON Version***")
print(json_report)

Calculating Metrics:   0%|          | 0/31 [00:00<?, ?it/s]

{
    "success": false,
    "statistics": {
        "evaluated_validations": 1,
        "success_percent": 0.0,
        "successful_validations": 0,
        "unsuccessful_validations": 1
    },
    "validation_results": [
        {
            "success": false,
            "statistics": {
                "evaluated_expectations": 4,
                "successful_expectations": 0,
                "unsuccessful_expectations": 4,
                "success_percent": 0.0
            },
            "expectations": [
                {
                    "expectation_type": "expect_column_values_to_be_between",
                    "success": false,
                    "kwargs": {
                        "batch_id": "restaurants_spark-restaurants_asset",
                        "column": "rating",
                        "min_value": 0.0,
                        "max_value": 6.0
                    },
                    "result": {
                        "element_count": 865,
                  

In [30]:
# Summary: print one line per expectation with its type and unexpected-value count.
for validation_result in json_report.get('validation_results', []):
    for expectation_item in validation_result.get('expectations', []):
        # Fall back to an empty dict (a list has no .get()) so the lookup below
        # stays valid when 'result' is missing or null.
        result = expectation_item.get('result') or {}
        # 'Success' is printed when the result carries no unexpected_count.
        print(expectation_item['expectation_type'], result.get('unexpected_count', 'Success'))

expect_column_values_to_be_between 6
expect_column_values_to_match_regex 2
expect_column_values_to_not_be_null 2
expect_column_values_to_not_be_null 2


In [31]:
# Stop the Spark session (`spark`) that was created earlier in this notebook.
spark.stop()