Install everything necessary to make spark work. :)

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [4]:
import findspark
findspark.init()

In [5]:
!pip install pydeequ

Collecting pydeequ
  Downloading pydeequ-1.0.1-py3-none-any.whl (36 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.0.1


In [6]:
from pyspark.sql import SparkSession, Row
import pydeequ

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())
df = spark.read.option("header","true").csv('/content/drive/MyDrive/Colab Notebooks/df_30.csv')

Please set env variable SPARK_VERSION


In [7]:
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- SAMPLE: string (nullable = true)
 |-- SERIAL: string (nullable = true)
 |-- CBSERIAL: string (nullable = true)
 |-- NUMPREC: string (nullable = true)
 |-- HHWT: string (nullable = true)
 |-- HHTYPE: string (nullable = true)
 |-- CLUSTER: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- STATEFIP: string (nullable = true)
 |-- COUNTYFIP: string (nullable = true)
 |-- DENSITY: string (nullable = true)
 |-- CITY12: string (nullable = true)
 |-- CITYPOP: string (nullable = true)
 |-- STRATA: string (nullable = true)
 |-- CNTRY: string (nullable = true)
 |-- GQ: string (nullable = true)
 |-- OWNERSHP: string (nullable = true)
 |-- OWNERSHPD: string (nullable = true)
 |-- MORTGAGE: string (nullable = true)
 |-- TAXINCL: string (nullable = true)
 |-- INSINCL: string (nullable = true)
 |-- RENT: string (nullable = true)
 |-- RENTGRS: string (nullable = true)
 |-- HHINCOME: string (nullable = true)
 |-- VALUEH: string (nullable = t

In [8]:
df = df.select(['YEAR','REGION','DENSITY','CNTRY','NCOUPLES','gender','AGE','EDUCD_NAME','occupation','DEGFIELD_NAME'])

## Metrics Computation

In [12]:
from pydeequ.analyzers import AnalysisRunner, AnalyzerContext, ApproxCountDistinct, Completeness, Compliance, Mean, Size

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("AGE")) \
                    .addAnalyzer(ApproxCountDistinct("AGE")) \
                    .addAnalyzer(Mean("AGE")) \
                    .addAnalyzer(Compliance("AGE", "AGE > 0")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+-------------------+--------+
| entity|instance|               name|   value|
+-------+--------+-------------------+--------+
| Column|     AGE|         Compliance|     1.0|
|Dataset|       *|               Size|483568.0|
| Column|     AGE|       Completeness|     1.0|
| Column|     AGE|ApproxCountDistinct|    85.0|
+-------+--------+-------------------+--------+



In [13]:
from pydeequ.profiles import ColumnProfilerRunner

result = ColumnProfilerRunner(spark) \
            .onData(df) \
            .run()

In [14]:
for col, profile in result.profiles.items():
    print(f'Column \'{col}\'')
    print('\t',f'completeness: {profile.completeness}')
    print('\t',f'approximate number of distinct values: {profile.approximateNumDistinctValues}')
    print('\t',f'datatype: {profile.dataType}')

Column 'CNTRY'
	 completeness: 1.0
	 approximate number of distinct values: 1
	 datatype: Integral
Column 'DENSITY'
	 completeness: 1.0
	 approximate number of distinct values: 3784
	 datatype: Fractional
Column 'AGE'
	 completeness: 1.0
	 approximate number of distinct values: 85
	 datatype: Integral
Column 'YEAR'
	 completeness: 1.0
	 approximate number of distinct values: 9
	 datatype: Integral
Column 'REGION'
	 completeness: 1.0
	 approximate number of distinct values: 9
	 datatype: Fractional
Column 'NCOUPLES'
	 completeness: 1.0
	 approximate number of distinct values: 6
	 datatype: Integral
Column 'DEGFIELD_NAME'
	 completeness: 1.0
	 approximate number of distinct values: 38
	 datatype: String
Column 'occupation'
	 completeness: 1.0
	 approximate number of distinct values: 479
	 datatype: String
Column 'EDUCD_NAME'
	 completeness: 1.0
	 approximate number of distinct values: 25
	 datatype: String
Column 'gender'
	 completeness: 1.0
	 approximate number of distinct values: 2
	 d

## Constraint Suggestion

In [15]:
from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

In [16]:
for sugg in suggestionResult['constraint_suggestions']:
    print(f"Constraint suggestion for \'{sugg['column_name']}\': {sugg['description']}")
    print(f"The corresponding Python code is: {sugg['code_for_constraint']}\n")

Constraint suggestion for 'CNTRY': 'CNTRY' has value range '840'
The corresponding Python code is: .isContainedIn("CNTRY", ["840"])

Constraint suggestion for 'CNTRY': 'CNTRY' is not null
The corresponding Python code is: .isComplete("CNTRY")

Constraint suggestion for 'CNTRY': 'CNTRY' has no negative values
The corresponding Python code is: .isNonNegative("CNTRY")

Constraint suggestion for 'CNTRY': 'CNTRY' has type Integral
The corresponding Python code is: .hasDataType("CNTRY", ConstrainableDataTypes.Integral)

Constraint suggestion for 'DENSITY': 'DENSITY' is not null
The corresponding Python code is: .isComplete("DENSITY")

Constraint suggestion for 'DENSITY': 'DENSITY' has no negative values
The corresponding Python code is: .isNonNegative("DENSITY")

Constraint suggestion for 'DENSITY': 'DENSITY' has type Fractional
The corresponding Python code is: .hasDataType("DENSITY", ConstrainableDataTypes.Fractional)

Constraint suggestion for 'AGE': 'AGE' has value range '20', '19', '21'

## Constraint Verification

In [22]:
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationResult, VerificationSuite

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasDataType("AGE", ConstrainableDataTypes.Integral) \
        .hasMin("AGE", lambda x: x == 0) \
        .isNonNegative("AGE") \
        .isComplete("AGE") \
     ) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+------------+-----------+------------+----------------------------------------------------------------------------------------------------------+-----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|check       |check_level|check_status|constraint                                                                                                |constraint_status|constraint_message                                                                                                                                                                     |
+------------+-----------+------------+----------------------------------------------------------------------------------------------------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------

## Repository

In [18]:
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
from pydeequ.analyzers import AnalysisRunner, ApproxCountDistinct

metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
repository = FileSystemMetricsRepository(spark, metrics_file)
key_tags = {'tag': 'Age'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(ApproxCountDistinct('AGE')) \
    .useRepository(repository) \
    .saveOrAppendResult(resultKey) \
    .run()

In [19]:
resultKey2 = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

In [20]:
AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(ApproxCountDistinct('AGE')) \
    .useRepository(repository) \
    .saveOrAppendResult(resultKey2) \
    .run()

JavaObject id=o517

In [21]:
analysisResult_metRep = repository.load() \
                            .before(ResultKey.current_milli_time()) \
                            .getSuccessMetricsAsDataFrame()

analysisResult_metRep.show()

+------+--------+-------------------+-----+-------------+---+
|entity|instance|               name|value| dataset_date|tag|
+------+--------+-------------------+-----+-------------+---+
|Column|     AGE|ApproxCountDistinct| 85.0|1651284773492|Age|
|Column|     AGE|ApproxCountDistinct| 85.0|1651284778783|Age|
+------+--------+-------------------+-----+-------------+---+

