In [1]:
# Set SPARK_VERSION for this kernel before pydeequ is imported in the next cell.
# NOTE(review): presumably pydeequ reads this variable to select the matching
# Deequ Maven coordinate — confirm against the installed pydeequ version.
%env SPARK_VERSION=3.0.0

env: SPARK_VERSION=3.0.0


In [2]:
import pydeequ

from pyspark.sql import SparkSession, Row

# TODO create spark session with jdbc driver path

In [3]:
# --- JDBC connection settings and Spark session -----------------------------
# Credentials must not live in the notebook: the password is taken from the
# DQ_DB_PASSWORD environment variable and, when that is unset or empty, asked
# for interactively (the prompt is not echoed and is not stored in the file).
# For unattended "Run All" executions export DQ_DB_PASSWORD beforehand.
import os
from getpass import getpass

server_name = "jdbc:sqlserver://host.docker.internal"
database_name = "TRN"
jdbc_url = "{};databaseName={};trustServerCertificate=True;".format(server_name, database_name)

table_name = "hr.employees"
username = "DQTestUser"
password = os.environ.get("DQ_DB_PASSWORD") or getpass("Password for {}: ".format(username))
connection_details = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

# Spark session with the Deequ package on the classpath and the SQL Server
# JDBC driver jar added to the driver classpath.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("spark.driver.extraClassPath", "/home/jars/sqlserver/sqlserverjdbc.jar")
    .getOrCreate()
)

# Load the table over JDBC; `df` is the input of every cell below.
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_details)

In [4]:
# Print the column names, data types and nullability of the loaded table.
df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- salary: decimal(8,2) (nullable = true)
 |-- manager_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)



In [5]:
# Preview the first 20 rows without truncating long cell values.
df.show(n=20, truncate=False)

+-----------+-----------+----------+---------------------------------+------------+----------+------+--------+----------+-------------+
|employee_id|first_name |last_name |email                            |phone_number|hire_date |job_id|salary  |manager_id|department_id|
+-----------+-----------+----------+---------------------------------+------------+----------+------+--------+----------+-------------+
|100        |Steven     |King      |steven.king@sqltutorial.org      |515.123.4567|1987-06-17|4     |24000.00|null      |9            |
|101        |Neena      |Kochhar   |neena.kochhar@sqltutorial.org    |515.123.4568|1989-09-21|5     |17000.00|100       |9            |
|102        |Lex        |De Haan   |lex.de haan@sqltutorial.org      |515.123.4569|1993-01-13|5     |17000.00|100       |9            |
|103        |Alexander  |Hunold    |alexander.hunold@sqltutorial.org |590.423.4567|1990-01-03|9     |9000.00 |102       |6            |
|104        |Bruce      |Ernst     |bruce.ernst@

In [6]:
### Data Analyzers section
# TODO analyze data here
from pydeequ.analyzers import *

# Compute dataset- and column-level metrics for `df` in one analysis run.
# Each Completeness analyzer is given exactly one column: the previous
# "first_name,last_name" argument named a single, non-existent column, so no
# completeness was measured for either name column.
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(ApproxCountDistinct("employee_id"))
    .addAnalyzer(Completeness("employee_id"))
    .addAnalyzer(ApproxCountDistinct("job_id"))
    .addAnalyzer(Completeness("first_name"))
    .addAnalyzer(Completeness("last_name"))
    .addAnalyzer(MinLength("first_name"))
    .addAnalyzer(MinLength("last_name"))
    .addAnalyzer(Completeness("email"))
    .addAnalyzer(Completeness("phone_number"))
    .addAnalyzer(Completeness("hire_date"))
    .addAnalyzer(Completeness("job_id"))
    .addAnalyzer(Mean("salary"))
    .addAnalyzer(Completeness("salary"))
    # Share of rows satisfying the predicate; reported under instance "salary".
    .addAnalyzer(Compliance("salary", "salary > 0"))
    .addAnalyzer(Completeness("department_id"))
    .run()
)

# Successfully computed metrics as a Spark DataFrame (entity, instance, name, value).
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+-------------+-------------------+------------------+
| entity|     instance|               name|             value|
+-------+-------------+-------------------+------------------+
| Column|       job_id|ApproxCountDistinct|              19.0|
| Column|       job_id|       Completeness|               1.0|
| Column|       salary|         Compliance|0.9318181818181818|
| Column|  employee_id|ApproxCountDistinct|              44.0|
| Column|  employee_id|       Completeness|               1.0|
| Column|    hire_date|       Completeness|               1.0|
| Column|department_id|       Completeness|               1.0|
| Column|    last_name|          MinLength|               3.0|
| Column|        email|       Completeness|               1.0|
|Dataset|            *|               Size|              44.0|
| Column| phone_number|       Completeness|0.8636363636363636|
| Column|       salary|               Mean| 7327.278409090909|
| Column|       salary|       Completeness|            

In [7]:
# Fetch the analyzer success metrics as a pandas DataFrame so the notebook
# renders them as a rich table.
analysisResult_pandas_df = AnalyzerContext.successMetricsAsDataFrame(
    spark,
    analysisResult,
    pandas=True,
)
analysisResult_pandas_df

Unnamed: 0,entity,instance,name,value
0,Column,job_id,ApproxCountDistinct,19.0
1,Column,job_id,Completeness,1.0
2,Column,salary,Compliance,0.931818
3,Column,employee_id,ApproxCountDistinct,44.0
4,Column,employee_id,Completeness,1.0
5,Column,hire_date,Completeness,1.0
6,Column,department_id,Completeness,1.0
7,Column,last_name,MinLength,3.0
8,Column,email,Completeness,1.0
9,Dataset,*,Size,44.0


In [8]:
### Data profiling section
# TODO profile data here
from pydeequ.profiles import *

# Profile every column of `df`.
result = (
    ColumnProfilerRunner(spark)
    .onData(df)
    .run()
)


def _format_profile(name, profile):
    """Return the text report for one column profile.

    Completeness, approximate distinct count and data type are always
    included; mean/max/min/sum are appended only when the profile's
    dataType is 'Integral'.
    """
    parts = [
        "Column '{}':\n ".format(name),
        "\tcompleteness: {}\n".format(profile.completeness),
        "\tapproximate number of distinct values: {}\n".format(profile.approximateNumDistinctValues),
        "\tdatatype: {}\n".format(profile.dataType),
    ]
    if profile.dataType == 'Integral':
        parts += [
            "\tmean: {}\n".format(profile.mean),
            "\tmax: {}\n".format(profile.maximum),
            "\tmin: {}\n".format(profile.minimum),
            "\tsum: {}\n".format(profile.sum),
        ]
    return "".join(parts)


for col, profile in result.profiles.items():
    print(_format_profile(col, profile))


Column 'first_name':
 	completeness: 1.0
	approximate number of distinct values: 39
	datatype: String

Column 'employee_id':
 	completeness: 1.0
	approximate number of distinct values: 44
	datatype: Integral
	mean: 146.4318181818182
	max: 210.0
	min: 100.0
	sum: 6443.0

Column 'hire_date':
 	completeness: 1.0
	approximate number of distinct values: 39
	datatype: Unknown

Column 'manager_id':
 	completeness: 0.9772727272727273
	approximate number of distinct values: 10
	datatype: Integral
	mean: 119.09302325581395
	max: 205.0
	min: 100.0
	sum: 5121.0

Column 'phone_number':
 	completeness: 0.8636363636363636
	approximate number of distinct values: 36
	datatype: String

Column 'email':
 	completeness: 1.0
	approximate number of distinct values: 45
	datatype: String

Column 'department_id':
 	completeness: 1.0
	approximate number of distinct values: 11
	datatype: Integral
	mean: 6.818181818181818
	max: 11.0
	min: 1.0
	sum: 300.0

Column 'job_id':
 	completeness: 1.0
	approximate number of

In [9]:
### Constraint Suggestions section
# TODO find meaningful constraints here
from pydeequ.suggestions import *
from pyspark.sql import Row

# Run the constraint suggestion engine on `df` with the DEFAULT() rule set.
suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())
    .run()
)

# Turn each entry under 'constraint_suggestions' into a Row, then list the
# suggestions ordered by column name and current value.
suggdf = spark.createDataFrame(
    [Row(**suggestion) for suggestion in suggestionResult['constraint_suggestions']]
)
(
    suggdf
    .sort("column_name", "current_value")
    .select('column_name', 'current_value', 'description', 'code_for_constraint')
    .show(truncate=False)
)

+-------------+--------------------------------------+---------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+
|column_name  |current_value                         |description                                                                                              |code_for_constraint                                                                                                             |
+-------------+--------------------------------------+---------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+
|department_id|Completeness: 1.0                     |'department_id' is not null                                                 

In [14]:
### Constraint Verification section
# TODO check selected constraints here and beautify the report
from pydeequ.checks import *
from pydeequ.verification import *

# Warning-level check bundling the constraints selected for review.
check = Check(spark, CheckLevel.Warning, "Review Check")

# Each constraint is registered exactly once (the job_id minimum used to be
# added twice). The id columns are validated with hasMin(... >= 1); the
# previously commented-out isNonNegative calls on them were dropped because
# comments interleaved with line continuations in a call chain are fragile.
checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check
        .hasMin("job_id", lambda x: x >= 1)
        .hasMin("department_id", lambda x: x >= 1)
        .hasMin("employee_id", lambda x: x >= 1)
        .hasMin("manager_id", lambda x: x >= 1)
        .isComplete("department_id")
        .isComplete("email")
        .isUnique("employee_id")
        .isComplete("employee_id")
        .isComplete("first_name")
        .isComplete("hire_date")
        .isComplete("job_id")
        .isUnique("last_name")
        .isComplete("last_name")
        .hasCompleteness("phone_number", lambda x: x >= 0.75, "It should be above 0.75!")
        .isComplete("salary")
        .isNonNegative("salary")
        .hasMinLength("first_name", lambda x: x == 3)
        .hasMinLength("last_name", lambda x: x == 3)
    )
    .run()
)

# One row per constraint with its status and message.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

<pydeequ.checks.Check object at 0x7f49e040c8e0>
+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|check       |check_level|check_status|constraint                                                                                                      |constraint_status|constraint_message|
+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------+-----------------+------------------+
+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------+-----------------+------------------+



In [15]:
# Metrics computed while verifying the check, returned as a pandas DataFrame
# so the notebook renders them as a rich table.
checkResult_pandas_df = VerificationResult.successMetricsAsDataFrame(
    spark,
    checkResult,
    pandas=True,
)
checkResult_pandas_df

Unnamed: 0,entity,instance,name,value
0,Column,employee_id,Uniqueness,1.0
1,Column,job_id,Minimum,1.0
2,Column,job_id,Completeness,1.0
3,Column,employee_id,Minimum,100.0
4,Column,employee_id,Completeness,1.0
5,Column,hire_date,Completeness,1.0
6,Column,salary is non-negative,Compliance,1.0
7,Column,last_name,Uniqueness,1.0
8,Column,department_id,Minimum,1.0
9,Column,department_id,Completeness,1.0
