* Master DAC, BDLE, 2022 
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr

# Data quality verification

https://github.com/awslabs/deequ

and its python-based version

https://github.com/awslabs/python-deequ

https://www.amazon.science/publications/automating-large-scale-data-quality-verification

An issue on how to use it on Colab:
https://github.com/awslabs/python-deequ/issues/26

https://medium.com/codex/how-to-check-data-quality-in-pyspark-8a882e45bc95


## setup

Downloads

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!rm spark-3.0.1-bin-hadoop2.7.tgz

In [None]:
import os
os.environ["SPARK_VERSION"] = "3.0.1"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"


In [None]:
! echo $SPARK_VERSION
! echo $SPARK_HOME

3.0.1
/content/spark-3.0.1-bin-hadoop2.7


In [None]:
!pip install -q findspark

In [None]:
!pip install -q pydeequ

In [None]:
import findspark
findspark.init()

In [None]:
# importing Spark and pydeequ
from pyspark.sql import SparkSession, Row
import pydeequ

# Memory settings must be passed to the builder, before the session (and its JVM)
# is created; calling setAll() on the running context's conf afterwards has no effect.
spark = SparkSession.builder\
.master("local")\
.appName("Colab")\
.config('spark.ui.port', '4050')\
.config("spark.driver.memory", "8g")\
.config("spark.executor.memory", "8g")\
.config("spark.jars.packages", pydeequ.deequ_maven_coord)\
.config("spark.jars.excludes", pydeequ.f2j_maven_coord)\
.getOrCreate()

In [None]:
spark

In [None]:
# import pydeequ

# spark = (SparkSession
#     .builder
#     .config("spark.jars.packages", pydeequ.deequ_maven_coord)
#     .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
#     .getOrCreate())

## usage

### examples from the official documentation

In [None]:
df = spark.sparkContext.parallelize([
            Row(a="foo", b=1, c=5),
            Row(a="bar", b=2, c=6),
            Row(a="baz", b=3, c=None)]).toDF()
df.show()

+---+---+----+
|  a|  b|   c|
+---+---+----+
|foo|  1|   5|
|bar|  2|   6|
|baz|  3|null|
+---+---+----+



#### analyzers

In [None]:
from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("b")) \
                    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+------------+-----+
| entity|instance|        name|value|
+-------+--------+------------+-----+
|Dataset|       *|        Size|  3.0|
| Column|       b|Completeness|  1.0|
+-------+--------+------------+-----+
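To make the two metrics above concrete, here is a plain-Python sketch (no Spark, not pydeequ code) that recomputes them on the same three toy rows: `Size` is the number of rows and `Completeness` is the fraction of non-null values in a column. The helper names `size` and `completeness` are hypothetical.

```python
# Plain-Python sketch of the Size and Completeness metrics (illustrative only).
rows = [
    {"a": "foo", "b": 1, "c": 5},
    {"a": "bar", "b": 2, "c": 6},
    {"a": "baz", "b": 3, "c": None},
]

def size(data):
    """Number of rows in the dataset."""
    return len(data)

def completeness(data, column):
    """Fraction of rows whose value in `column` is not null (None)."""
    non_null = sum(1 for row in data if row[column] is not None)
    return non_null / len(data)

print(size(rows))               # 3
print(completeness(rows, "b"))  # 1.0
print(completeness(rows, "c"))  # 0.6666666666666666
```

Column `c` has one null out of three rows, so its completeness is 2/3, whereas `b` is complete.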



#### profile

In [None]:
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

StandardProfiles for column: a: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 3
    },
    "histogram": [
        [
            "baz",
            1,
            0.3333333333333333
        ],
        [
            "foo",
            1,
            0.3333333333333333
        ],
        [
            "bar",
            1,
            0.3333333333333333
        ]
    ]
}
NumericProfiles for column: b: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "1",
            1,
            0.3333333333333333
        ],
        [
            "2",
            1,
            0.3333333333333333
        ],
        [
            "3",
            1,
           

#### constraint suggestion

In [None]:
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(suggestionResult)

{'constraint_suggestions': [{'constraint_name': 'CompletenessConstraint(Completeness(b,None))', 'column_name': 'b', 'current_value': 'Completeness: 1.0', 'description': "'b' is not null", 'suggesting_rule': 'CompleteIfCompleteRule()', 'rule_description': 'If a column is complete in the sample, we suggest a NOT NULL constraint', 'code_for_constraint': '.isComplete("b")'}, {'constraint_name': "ComplianceConstraint(Compliance('b' has no negative values,b >= 0,None))", 'column_name': 'b', 'current_value': 'Minimum: 1.0', 'description': "'b' has no negative values", 'suggesting_rule': 'NonNegativeNumbersRule()', 'rule_description': 'If we see only non-negative numbers in a column, we suggest a corresponding constraint', 'code_for_constraint': '.isNonNegative("b")'}, {'constraint_name': 'UniquenessConstraint(Uniqueness(List(b),None))', 'column_name': 'b', 'current_value': 'ApproxDistinctness: 1.0', 'description': "'b' is unique", 'suggesting_rule': 'UniqueIfApproximatelyUniqueRule()', 'rule_

#### constraint verification

In [None]:
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c")  \
        .isUnique("a")  \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

Python Callback server started!
+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
+------------+-----------+------------+--------------------+-----------------+--------------------+
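The result table above came out empty. As a cross-check, the plain-Python sketch below evaluates each constraint of the check by hand on the same three rows, using the metric definitions as I understand them from the Deequ documentation (for instance, uniqueness counts values that occur exactly once). It is an illustration, not how pydeequ evaluates constraints. By this reading, `hasMin("b", lambda x: x == 0)` and `isComplete("c")` should fail, since the minimum of `b` is 1 and `c` contains a null.

```python
# Hand evaluation of the "Review Check" constraints on the toy rows (illustrative only).
from collections import Counter

rows = [
    {"a": "foo", "b": 1, "c": 5},
    {"a": "bar", "b": 2, "c": 6},
    {"a": "baz", "b": 3, "c": None},
]

def column(name):
    return [row[name] for row in rows]

def completeness(name):
    values = column(name)
    return sum(v is not None for v in values) / len(values)

def uniqueness(name):
    # Fraction of rows whose value occurs exactly once in the column.
    counts = Counter(column(name))
    return sum(1 for v in column(name) if counts[v] == 1) / len(rows)

# Each entry: constraint description -> (metric value, assertion on that value)
constraints = {
    "hasSize(>= 3)": (len(rows), lambda x: x >= 3),
    "hasMin(b == 0)": (min(column("b")), lambda x: x == 0),
    "isComplete(c)": (completeness("c"), lambda x: x == 1.0),
    "isUnique(a)": (uniqueness("a"), lambda x: x == 1.0),
    "isContainedIn(a, foo/bar/baz)": (
        sum(v in {"foo", "bar", "baz"} for v in column("a")) / len(rows),
        lambda x: x == 1.0,
    ),
    "isNonNegative(b)": (
        sum(v >= 0 for v in column("b")) / len(rows),
        lambda x: x == 1.0,
    ),
}

results = {name: "Success" if check(value) else "Failure"
           for name, (value, check) in constraints.items()}
for name, status in results.items():
    print(f"{name}: {status}")
```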



### other examples

Suggested data sources 
 
https://github.com/sdf94/deequ-examples/tree/master/data

https://www.kaggle.com/austinreese/craigslist-carstrucks-data

https://www.kaggle.com/austinreese/usa-housing-listings

http://insideairbnb.com/get-the-data.html


# Remarks

*   In total I had to import 4 CSV files, three of which are very large. As a result, the full upload of these files can fail partway through, so I cannot retrieve 100% of the data in each of them. I therefore decided to work only with the data that was successfully uploaded for each file.




# Vehicles

   - I start with the 'vehicles.csv' dataset.
   - I download this file from: https://www.kaggle.com/austinreese/craigslist-carstrucks-data
   - It contains information about car sales.
   - The original file is 1.45 GB.
   - I upload 'vehicles.csv' manually and place it in the '/content/sample_data' directory.
   - Because the file is so large, it takes a while to upload to Colab.


In [85]:
dir="/content/sample_data"
os.listdir(dir)

['listings.csv',
 'vehicles.csv',
 'housing.csv',
 'df_30.csv',
 '.ipynb_checkpoints']

In [86]:
vehicles = spark.read.option("header", "True").option("delimiter", ",").format("csv").load(dir + "/" + "vehicles.csv")
vehicles.show(3)
vehicles.printSchema()

+----------+--------------------+------------+--------------------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+---------+-----------+------+-----+----+----+------------+
|        id|                 url|      region|          region_url|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| VIN|drive|size|type|paint_color|image_url|description|county|state| lat|long|posting_date|
+----------+--------------------+------------+--------------------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+---------+-----------+------+-----+----+----+------------+
|7222695916|https://prescott....|    prescott|https://prescott....| 6000|null|        null| null|     null|     null|null|    null|        null|        null|null| null|null|null|       null|     null|       null|  null|   az|null|null|        null|
|721

- We have a dataset whose individual columns are known and named, but no other schema information, such as data types, is available.

     - We will use the pyDeequ library to identify the data types of the columns.
     - To do this, we will use profiling.
     - Single-column profiling makes three passes over the data. In the first pass, the system computes the size of the data and runs data type detection on each column. It also computes the completeness and the approximate number of distinct values of each column of interest, using HyperLogLog sketches.
     - The profiling tasks of the second pass operate on the columns identified as having numeric types. For each of these columns, summary statistics are computed, such as the minimum, maximum, mean, standard deviation, and approximate quartiles.
     - In the third pass, the system computes the frequency distribution of values for columns whose cardinality is below a user-specified threshold.
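The three passes described above can be sketched in plain Python on a list of dictionaries whose values are all strings, as they are after reading a CSV without schema inference. This is a simplified illustration, not Deequ's implementation: it counts distinct values exactly instead of with a HyperLogLog sketch, uses simple regular expressions for type detection, runs pass 2 only when every non-null value is numeric, and skips quartiles. The function name `profile` and the `max_histogram_cardinality` parameter are hypothetical; the `"NullValue"` histogram key mirrors the one seen in Deequ's output.

```python
import re
import statistics

INTEGRAL = re.compile(r"^-?\d+$")
FRACTIONAL = re.compile(r"^-?\d+\.\d+$")

def detect_type(value):
    """Classify one raw string value; nulls count as Unknown."""
    if value is None:
        return "Unknown"
    if value.lower() in ("true", "false"):
        return "Boolean"
    if INTEGRAL.match(value):
        return "Integral"
    if FRACTIONAL.match(value):
        return "Fractional"
    return "String"

def profile(rows, column, max_histogram_cardinality=10):
    values = [row.get(column) for row in rows]
    # Pass 1: size, type counts, completeness, number of distinct values.
    type_counts = {t: 0 for t in ("Boolean", "Fractional", "Integral", "Unknown", "String")}
    for v in values:
        type_counts[detect_type(v)] += 1
    non_null = [v for v in values if v is not None]
    result = {
        "size": len(values),
        "completeness": len(non_null) / len(values),
        "numDistinctValues": len(set(non_null)),  # exact, where Deequ uses HyperLogLog
        "typeCounts": type_counts,
    }
    # Pass 2: summary statistics, only if every non-null value is numeric.
    if non_null and all(detect_type(v) in ("Integral", "Fractional") for v in non_null):
        numbers = [float(v) for v in non_null]
        result["min"] = min(numbers)
        result["max"] = max(numbers)
        result["mean"] = statistics.mean(numbers)
        result["stdDev"] = statistics.pstdev(numbers)
    # Pass 3: value histogram, only for low-cardinality columns.
    if result["numDistinctValues"] <= max_histogram_cardinality:
        histogram = {}
        for v in values:
            key = "NullValue" if v is None else v
            histogram[key] = histogram.get(key, 0) + 1
        result["histogram"] = histogram
    return result

example_rows = [{"b": "1"}, {"b": "2"}, {"b": "3"}, {"b": None}]
profile_b = profile(example_rows, "b")
print(profile_b)
```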



In [87]:
# Analyzers

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(vehicles) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("id")) \
                    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+------------+--------+
| entity|instance|        name|   value|
+-------+--------+------------+--------+
|Dataset|       *|        Size|441802.0|
| Column|      id|Completeness|     1.0|
+-------+--------+------------+--------+



We see that the 'vehicles' dataset contains 441,802 records and that the 'id' column is complete, i.e. it contains no null values, since the fraction of non-null values in 'id' is 1.

In [88]:
# Profiling
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(vehicles) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

StandardProfiles for column: drive: {
    "completeness": 0.6736592410174693,
    "approximateNumDistinctValues": 36,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 102,
        "Unknown": 144178,
        "String": 297522
    },
    "histogram": [
        [
            "2021-05-03T14:05:37-0700",
            1,
            2.263457385887796e-06
        ],
        [
            "2021-05-03T13:50:22-0700",
            1,
            2.263457385887796e-06
        ],
        [
            "rwd",
            58892,
            0.13329953236970407
        ],
        [
            "2021-05-03T15:20:21-0700",
            1,
            2.263457385887796e-06
        ],
        [
            " Adelsa",
            695,
            0.001573102883192018
        ],
        [
            "2021-05-03T12:15:32-0700",
            1,
            2.263457385887796e-06
        ],
        [
            "NullV

From this profile we can see, for example, that:

   - the type detected for each column is String ("dataType": "String"). Moreover, for each column the type counts contain several types; for example, for the 'drive' column we have: "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 102,
        "Unknown": 144178,
        "String": 297522}
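The numbers in the 'drive' profile are consistent with one another: the type counts add up to the dataset size of 441,802 rows, and the "Unknown" count matches the number of nulls implied by the completeness. The snippet below checks this arithmetic with the figures copied from the outputs above, and sketches how a single data type can be chosen from such counts. The `infer_type` rule is my reading of Deequ's type inference (any String value, or a mix of Boolean and numeric values, makes the column String); treat it as an assumption rather than a quote of Deequ's code.

```python
# Figures copied from the 'drive' profile and the Size metric above.
size = 441802
completeness = 0.6736592410174693
type_counts = {"Boolean": 0, "Fractional": 0, "Integral": 102,
               "Unknown": 144178, "String": 297522}

# Every row falls into exactly one type bucket.
assert sum(type_counts.values()) == size

# "Unknown" counts the null values: 1 - 144178 / 441802 gives back the completeness.
derived_completeness = 1 - type_counts["Unknown"] / size
print(derived_completeness)

def infer_type(counts):
    """Assumed inference rule: pick one type from the per-type counts."""
    if counts["Unknown"] == sum(counts.values()):
        return "Unknown"
    numeric = counts["Integral"] > 0 or counts["Fractional"] > 0
    if counts["String"] > 0 or (counts["Boolean"] > 0 and numeric):
        return "String"
    if counts["Boolean"] > 0:
        return "Boolean"
    if counts["Fractional"] > 0:
        return "Fractional"
    return "Integral"

print(infer_type(type_counts))  # String
```

Under this rule, a column that is mostly text but contains a few integers, such as 'drive' here, is reported as String.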

In [89]:
# Constraint suggestion

import json
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(vehicles) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(drive,None))",
      "column_name": "drive",
      "current_value": "Completeness: 0.6736592410174693",
      "description": "'drive' has less than 32% missing values",
      "suggesting_rule": "RetainCompletenessRule()",
      "rule_description": "If a column is incomplete in the sample, we model its completeness as a binomial variable, estimate a confidence interval and use this to define a lower bound for the completeness",
      "code_for_constraint": ".hasCompleteness(\"drive\", lambda x: x >= 0.67, \"It should be above 0.67!\")"
    },
    {
      "constraint_name": "CompletenessConstraint(Completeness(image_url,None))",
      "column_name": "image_url",
      "current_value": "Completeness: 0.9689182031769887",
      "description": "'image_url' has less than 4% missing values",
      "suggesting_rule": "RetainCompletenessRule()",
      "rule_description": "If a column is incomplet

The system recommends constraints for the dataset based on heuristics that exploit the profiling results.
   - We notice that the 'drive' column is incomplete in the data sample: its completeness is about 0.674, i.e. roughly 32.6% of its values are null. The system therefore derives a lower bound of 0.67 for the completeness of this column and suggests a 'hasCompleteness' constraint.
   - Constraint suggestion is very useful in our case, because the dataset has many columns, which makes it hard to define appropriate constraints manually. Deequ can automatically suggest useful constraints based on the data distribution.
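The rule description quoted above says that completeness is modeled as a binomial variable and that a confidence interval yields the lower bound. Here is a minimal sketch, assuming a normal-approximation (Wald) interval with z = 1.96 and a bound truncated to two decimals; under those assumptions it reproduces the thresholds suggested in this notebook (0.67 for 'drive', 0.96 for 'image_url'). The function names `completeness_lower_bound` and `describe` are hypothetical.

```python
import math

def completeness_lower_bound(p, n, z=1.96):
    """Lower end of a Wald confidence interval for a binomial proportion p
    estimated from n rows, truncated (rounded down) to two decimals."""
    lower = p - z * math.sqrt(p * (1 - p) / n)
    return math.floor(lower * 100) / 100

def describe(column, p, n):
    bound = completeness_lower_bound(p, n)
    # Missing-value percentage derived from the bound, truncated to an integer.
    missing_pct = int((1.0 - bound) * 100)
    return bound, f"'{column}' has less than {missing_pct}% missing values"

n_vehicles = 441802
print(describe("drive", 0.6736592410174693, n_vehicles))
print(describe("image_url", 0.9689182031769887, n_vehicles))
```

Truncating `(1.0 - 0.67) * 100` in floating point gives 32 rather than 33, which would explain why the suggested description reads "less than 32% missing values" even though about 32.6% of 'drive' is null.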

In [91]:
# Constraint verification

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(vehicles) \
    .addCheck(
        check.hasCompleteness("drive", lambda x: x >= 0.67, "It should be above 0.67!") \
        .hasCompleteness("image_url", lambda x: x >= 0.96, "It should be above 0.96!") \
        .hasCompleteness("posting_date", lambda x: x >= 0.94, "It should be above 0.94!") \
        .hasCompleteness("size", lambda x: x >= 0.27, "It should be above 0.27!") \
        .hasCompleteness("model", lambda x: x >= 0.95, "It should be above 0.95!") \
        .hasCompleteness("state", lambda x: x >= 0.94, "It should be above 0.94!") \
        .hasCompleteness("odometer", lambda x: x >= 0.95, "It should be above 0.95!") \
        .hasCompleteness("url", lambda x: x >= 0.97, "It should be above 0.97!") \
        .hasCompleteness("VIN", lambda x: x >= 0.6, "It should be above 0.6!") \
        .hasCompleteness("description", lambda x: x >= 0.96, "It should be above 0.96!") \
        .hasCompleteness("cylinders", lambda x: x >= 0.56, "It should be above 0.56!") \
        .hasCompleteness("price", lambda x: x >= 0.98, "It should be above 0.98!") \
        .hasCompleteness("year", lambda x: x >= 0.98, "It should be above 0.98!") \
        .hasCompleteness("fuel", lambda x: x >= 0.96, "It should be above 0.96!") \
        .isContainedIn("title_status", ["clean", "rebuilt", "salvage", "lien", "missing", " inexpensive cars", "parts only", " los ingresos en efectivo", " 528", " quality", " 325", " 150 ", " G3500 ", "-116.56152", " finance charges (if any]", "or", "40.753975", " Physical Damage of Liability Insurance", "47.209432"]) \
        .isComplete("id") \
        .hasCompleteness("long", lambda x: x >= 0.94, "It should be above 0.94!") \
        .hasCompleteness("lat", lambda x: x >= 0.94, "It should be above 0.94!") \
        .isUnique("id")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(30)

+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------

We can also display the verification results as a pandas DataFrame.
This makes the constraint names, which are sometimes long, easier to read.

In [92]:
print(f"Verification Run Status: {checkResult.status}")
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df



|   | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(drive,None)) | Success | |
| 1 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(image_url,... | Success | |
| 2 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(posting_da... | Success | |
| 3 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(size,None)) | Success | |
| 4 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(model,None)) | Success | |
| 5 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(state,None)) | Success | |
| 6 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(odometer,N... | Success | |
| 7 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(url,None)) | Success | |
| 8 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(VIN,None)) | Success | |
| 9 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(descriptio... | Success | |
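The pandas view above only shows the first rows. To list only the constraints that did not pass, the DataFrame returned with `pandas=True` can be filtered on its `constraint_status` column. The helper `failed_constraints` is hypothetical, and the small frame built here only mimics the column layout shown above; its rows are invented for the illustration.

```python
import pandas as pd

def failed_constraints(check_df):
    """Keep only the rows whose constraint did not succeed."""
    failed = check_df[check_df["constraint_status"] != "Success"]
    return failed[["constraint", "constraint_status", "constraint_message"]]

# Hypothetical rows with the same columns as checkResultsAsDataFrame(..., pandas=True).
example = pd.DataFrame({
    "check": ["Review Check"] * 2,
    "check_level": ["Warning"] * 2,
    "check_status": ["Warning"] * 2,
    "constraint": ["CompletenessConstraint(Completeness(drive,None))",
                   "UniquenessConstraint(Uniqueness(List(id),None))"],
    "constraint_status": ["Success", "Failure"],
    "constraint_message": ["", "hypothetical failure message"],
})
print(failed_constraints(example))
```

In the notebook, this would be `failed_constraints(checkResult_df)` run right after the cell above.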


I used the constraints suggested by the system as a basis
for designing declarative checks for the dataset.
The system suggested nearly thirty constraints,
and we use these suggestions to enrich the quality verification of our data.
Note that the constraint 'isUnique("id")' fails, because the ratio of unique values is about 0.977 rather than 1.
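For reference, Deequ distinguishes uniqueness (the fraction of values that occur exactly once) from distinctness (the fraction of distinct values). A small plain-Python illustration on a hypothetical list of ids shows how a few duplicated ids are enough to push uniqueness below 1 while the column stays complete:

```python
from collections import Counter

def uniqueness(values):
    """Fraction of values that occur exactly once."""
    counts = Counter(values)
    return sum(1 for v in values if counts[v] == 1) / len(values)

def distinctness(values):
    """Fraction of distinct values among all values."""
    return len(set(values)) / len(values)

ids = ["101", "102", "103", "104", "104"]  # hypothetical ids, one duplicated
print(uniqueness(ids))    # 0.6
print(distinctness(ids))  # 0.8
```

Both occurrences of the duplicated id stop counting as unique, which is why uniqueness drops faster than distinctness.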

We can also look at all the metrics that Deequ computed for this verification.

In [93]:
checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df

|   | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | url | Completeness | 0.977628 |
| 1 | Column | size | Completeness | 0.2757 |
| 2 | Column | description | Completeness | 0.968916 |
| 3 | Column | posting_date | Completeness | 0.949124 |
| 4 | Column | image_url | Completeness | 0.968918 |
| 5 | Column | cylinders | Completeness | 0.568137 |
| 6 | Column | long | Completeness | 0.944059 |
| 7 | Column | state | Completeness | 0.947766 |
| 8 | Column | price | Completeness | 0.98541 |
| 9 | Column | id | Uniqueness | 0.977089 |


PyDeequ lets us persist the metrics computed on DataFrames in a so-called MetricsRepository. In the following example, we show how to store the metrics in a file system and query them later.

In [94]:
# Initialize the metrics repository

from pydeequ.repository import *

metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
print(f'metrics_file path: {metrics_file}')
repository = FileSystemMetricsRepository(spark, metrics_file)

metrics_file path: /tmp/1667583183152-0/metrics.json


The MetricsRepository stores the metrics as JSON on the local disk.

Each set of computed metrics must be indexed by a result key, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.

In [95]:
key_tags = {'tag': 'vehicles'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

In [96]:
# Analyzers

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(vehicles) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("id")) \
                    .addAnalyzer(Completeness("price")) \
                    .addAnalyzer(ApproxCountDistinct("year")) \
                    .useRepository(repository) \
                    .saveOrAppendResult(resultKey) \
                    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+-------------------+------------------+
| entity|instance|               name|             value|
+-------+--------+-------------------+------------------+
|Dataset|       *|               Size|          441802.0|
| Column|      id|       Completeness|               1.0|
| Column|   price|       Completeness|0.9854097536905673|
| Column|    year|ApproxCountDistinct|            5127.0|
+-------+--------+-------------------+------------------+



We can then query the repository, for example to load every metric saved before the current time, which includes the results stored under our result key:

In [97]:
analysisResult_metRep = repository.load() \
                            .before(ResultKey.current_milli_time()) \
                            .getSuccessMetricsAsDataFrame()

analysisResult_metRep.show()

+-------+--------+-------------------+------------------+-------------+--------+
| entity|instance|               name|             value| dataset_date|     tag|
+-------+--------+-------------------+------------------+-------------+--------+
|Dataset|       *|               Size|          441802.0|1667583189590|vehicles|
| Column|      id|       Completeness|               1.0|1667583189590|vehicles|
| Column|   price|       Completeness|0.9854097536905673|1667583189590|vehicles|
| Column|    year|ApproxCountDistinct|            5127.0|1667583189590|vehicles|
+-------+--------+-------------------+------------------+-------------+--------+
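Conceptually, the repository is a store of metric records indexed by a result key (a millisecond timestamp plus tags) that can later be filtered by time, as in the query above. The sketch below mimics that idea with a JSON file and plain Python. The class name `JsonMetricsStore`, its methods, and its file layout are hypothetical and do not reproduce pydeequ's on-disk format; the two metric values are copied from the output above.

```python
import json
import os
import tempfile
import time

class JsonMetricsStore:
    """Toy metrics repository: one JSON file holding a list of result entries."""

    def __init__(self, path):
        self.path = path

    def _read(self):
        if not os.path.exists(self.path):
            return []
        with open(self.path) as f:
            return json.load(f)

    def save_or_append(self, timestamp_ms, tags, metrics):
        """Append one result (timestamp + tags + metric rows) to the file."""
        entries = self._read()
        entries.append({"dataset_date": timestamp_ms, "tags": tags, "metrics": metrics})
        with open(self.path, "w") as f:
            json.dump(entries, f)

    def load_before(self, timestamp_ms):
        """Flatten every metric saved strictly before `timestamp_ms` into rows."""
        rows = []
        for entry in self._read():
            if entry["dataset_date"] < timestamp_ms:
                for m in entry["metrics"]:
                    rows.append({**m, "dataset_date": entry["dataset_date"], **entry["tags"]})
        return rows

path = os.path.join(tempfile.mkdtemp(), "metrics.json")
store = JsonMetricsStore(path)
key_time = int(time.time() * 1000)  # plays the role of ResultKey.current_milli_time()
store.save_or_append(key_time, {"tag": "vehicles"}, [
    {"entity": "Dataset", "instance": "*", "name": "Size", "value": 441802.0},
    {"entity": "Column", "instance": "id", "name": "Completeness", "value": 1.0},
])
for row in store.load_before(key_time + 1):
    print(row)
```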



# Housing

   - I use the 'housing.csv' dataset.
   - I download this file from: https://www.kaggle.com/austinreese/usa-housing-listings
   - It contains information about housing listings (in particular, the monthly rent of each unit).
   - The original file is 558 MB.
   - I upload 'housing.csv' manually and place it in the '/content/sample_data' directory.


In [53]:
os.listdir(dir)

['listings.csv',
 'vehicles.csv',
 'housing.csv',
 'df_30.csv',
 '.ipynb_checkpoints']

In [54]:
housing = spark.read.option("header", "True").option("delimiter", ",").format("csv").load(dir + "/" + "housing.csv")
housing.show(3)
housing.printSchema()

+----------+--------------------+------------+--------------------+-----+---------+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+--------------------+--------------------+-------+--------+-----+
|        id|                 url|      region|          region_url|price|     type|sqfeet|beds|baths|cats_allowed|dogs_allowed|smoking_allowed|wheelchair_access|electric_vehicle_charge|comes_furnished|laundry_options|parking_options|           image_url|         description|    lat|    long|state|
+----------+--------------------+------------+--------------------+-----+---------+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+--------------------+--------------------+-------+--------+-----+
|7049044568|https://reno.crai...|reno / tahoe|https://reno.crai...| 1148|apartment|  1078|   3|    2|  

In [55]:
# Analyzers

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(housing) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("id")) \
                    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+------------+--------+
| entity|instance|        name|   value|
+-------+--------+------------+--------+
|Dataset|       *|        Size|325328.0|
| Column|      id|Completeness|     1.0|
+-------+--------+------------+--------+



We see that the 'housing' dataset contains 325,328 records and that the 'id' column is complete, i.e. it contains no null values, since the fraction of non-null values in 'id' is 1.

In [79]:
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(housing) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

StandardProfiles for column: image_url: {
    "completeness": 0.9999938523582353,
    "approximateNumDistinctValues": 156627,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 2,
        "String": 325326
    },
    "histogram": null
}
StandardProfiles for column: laundry_options: {
    "completeness": 0.7837905129592289,
    "approximateNumDistinctValues": 5,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 70339,
        "String": 254989
    },
    "histogram": [
        [
            "laundry in bldg",
            31314,
            0.09625362710864113
        ],
        [
            "no laundry on site",
            3226,
            0.009916146166330596
        ],
        [
            "NullValue",
            70339,
            0.21620948704077117

We notice that for the 'electric_vehicle_charge' column, the inferred type is 'Integral'.
For almost all other columns, the inferred type is 'String'.

In [70]:
# Constraint suggestion

import json
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(housing) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(image_url,None))",
      "column_name": "image_url",
      "current_value": "Completeness: 0.9999938523582353",
      "description": "'image_url' has less than 1% missing values",
      "suggesting_rule": "RetainCompletenessRule()",
      "rule_description": "If a column is incomplete in the sample, we model its completeness as a binomial variable, estimate a confidence interval and use this to define a lower bound for the completeness",
      "code_for_constraint": ".hasCompleteness(\"image_url\", lambda x: x >= 0.99, \"It should be above 0.99!\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('laundry_options' has value range 'w/d in unit', 'w/d hookups', 'laundry on site', 'laundry in bldg', 'no laundry on site',`laundry_options` IN ('w/d in unit', 'w/d hookups', 'laundry on site', 'laundry in bldg', 'no laundry on site'),None))",
      "column_name": "laundry

The system suggests completeness constraints for several columns.
For a given column, we check that the ratio of non-null values is greater than or equal to a threshold that we specify (using the hasCompleteness constraint).
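Several `isContainedIn` calls in the next cell are commented out; they pass a threshold lambda in addition to the allowed values. The idea behind them can be sketched as a compliance ratio: the fraction of rows whose value is null or belongs to the allowed set, compared with a threshold. Counting nulls as compliant is my understanding of Deequ's `isContainedIn` predicate, and it is consistent with the 'laundry_options' constraint succeeding despite about 22% nulls. The function `compliance` and the sample values are hypothetical.

```python
def compliance(values, allowed):
    """Fraction of values that are null or belong to `allowed`."""
    allowed = set(allowed)
    ok = sum(1 for v in values if v is None or v in allowed)
    return ok / len(values)

# Hypothetical sample of a 'beds' column read from CSV (all strings).
beds = ["2", "1", "3", "2", None, "4", "1", "2", "0", "1"]
ratio = compliance(beds, ["2", "1", "3"])
print(ratio)          # 0.8
print(ratio >= 0.93)  # False: this sample would fail a 0.93 threshold
```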

In [80]:
# Constraint verification

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(housing) \
    .addCheck(
        check.hasCompleteness("image_url", lambda x: x >= 0.99, "It should be above 0.99!") \
        .isContainedIn("laundry_options", ["w/d in unit", "w/d hookups", "laundry on site", "laundry in bldg", "no laundry on site"]) \
        #.isContainedIn("laundry_options", ["w/d in unit", "w/d hookups", "laundry on site", "laundry in bldg"], lambda x: x >= 0.98, "It should be above 0.98!") \
        .hasCompleteness("laundry_options", lambda x: x >= 0.78, "It should be above 0.78!") \
        .hasCompleteness("state", lambda x: x >= 0.99, "It should be above 0.99!") \
        .isContainedIn("electric_vehicle_charge", ["0", "1"]) \
        #.isContainedIn("electric_vehicle_charge", ["0"], lambda x: x >= 0.98, "It should be above 0.98!") \
        .isNonNegative("electric_vehicle_charge") \
        .hasCompleteness("electric_vehicle_charge", lambda x: x >= 0.99, "It should be above 0.99!") \
        .hasDataType("electric_vehicle_charge", ConstrainableDataTypes.Integral) \
        .isComplete("url") \
        .isUnique("url")\
        .hasCompleteness("sqfeet", lambda x: x >= 0.99, "It should be above 0.99!") \
        .hasCompleteness("description", lambda x: x >= 0.99, "It should be above 0.99!") \
        .isComplete("price") \
        .hasCompleteness("dogs_allowed", lambda x: x >= 0.99, "It should be above 0.99!") \
        #.isContainedIn("wheelchair_access", ["0"], lambda x: x >= 0.92, "It should be above 0.92!") \
        .hasCompleteness("wheelchair_access", lambda x: x >= 0.99, "It should be above 0.99!") \
        .isComplete("region_url") \
        .hasCompleteness("long", lambda x: x >= 0.99, "It should be above 0.99!") \
        .isComplete("id") \
        .isUnique("id") \
        #.isContainedIn("smoking_allowed", ["1", "0"], lambda x: x >= 0.99, "It should be above 0.99!") \
        .hasCompleteness("smoking_allowed", lambda x: x >= 0.99, "It should be above 0.99!") \
        .isComplete("region") \
        #.isContainedIn("baths", ["1", "2", "1.5"], lambda x: x >= 0.93, "It should be above 0.93!") \
        .hasCompleteness("baths", lambda x: x >= 0.99, "It should be above 0.99!") \
        #.isContainedIn("comes_furnished", ["0"], lambda x: x >= 0.95, "It should be above 0.95!") \
        .hasCompleteness("comes_furnished", lambda x: x >= 0.99, "It should be above 0.99!") \
        .hasCompleteness("cats_allowed", lambda x: x >= 0.99, "It should be above 0.99!") \
        #.isContainedIn("parking_options", ["off-street parking", "attached garage", "carport"], lambda x: x >= 0.9, "It should be above 0.9!") \
        .hasCompleteness("parking_options", lambda x: x >= 0.62, "It should be above 0.62!") \
        #.isContainedIn("type", ["apartment", "house"], lambda x: x >= 0.91, "It should be above 0.91!") \
        .hasCompleteness("type", lambda x: x >= 0.99, "It should be above 0.99!") \
        #.isContainedIn("beds", ["2", "1", "3"], lambda x: x >= 0.93, "It should be above 0.93!") \
        .hasCompleteness("beds", lambda x: x >= 0.99, "It should be above 0.99!")  \
        .hasMin("electric_vehicle_charge", lambda x: x == 0) )\
    .run()
    

<pydeequ.checks.Check object at 0x7f9a8973bad0>


In [81]:
# Display the verification results

print(f"Verification Run Status: {checkResult.status}")
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df



| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(image_url,... | Success | |
| 1 | Review Check | Warning | Warning | ComplianceConstraint(Compliance(laundry_option... | Success | |
| 2 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(laundry_op... | Success | |
| 3 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(state,None)) | Success | |
| 4 | Review Check | Warning | Warning | ComplianceConstraint(Compliance(electric_vehic... | Success | |
| 5 | Review Check | Warning | Warning | ComplianceConstraint(Compliance(electric_vehic... | Success | |
| 6 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(electric_v... | Success | |
| 7 | Review Check | Warning | Warning | AnalysisBasedConstraint(DataType(electric_vehi... | Success | |
| 8 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(url,None)) | Success | |
| 9 | Review Check | Warning | Warning | UniquenessConstraint(Uniqueness(List(url),None)) | Success | |


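All constraints succeeded except the one on the minimum of the column 'electric_vehicle_charge'. Taken at face value, this means the minimum of 'electric_vehicle_charge' is not 0. The non-negativity constraint also tells us that the column contains no negative values. Note, however, that `hasMin` can fail for a different reason. If the column was read as a string (for example, from a CSV loaded without schema inference), the constraint fails because of the column type rather than its values, as happens with 'AGE' further below. The constraint message tells which case applies.
We also know that the values of the columns 'id' and 'url' are unique.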
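For reference, `isUnique` asserts that the uniqueness metric equals 1.0. Uniqueness is the fraction of values that occur exactly once in the column. Below is a plain-Python sketch of that metric on hypothetical id columns, assuming the column has no nulls. Deequ computes the same quantity in Spark over the whole column.

```python
from collections import Counter

def uniqueness(values):
    """Fraction of values that occur exactly once (assumes no None values)."""
    counts = Counter(values)
    return sum(1 for c in counts.values() if c == 1) / len(values)

# Hypothetical id columns
print(uniqueness(["1", "2", "3"]))        # 1.0 -> isUnique would pass
print(uniqueness(["1", "2", "2", "3"]))   # 0.5 -> only "1" and "3" occur once, out of 4 values
```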

# Listing

   - I use the dataset 'listings.csv'.
   - I downloaded this file from http://insideairbnb.com/get-the-data.html.
   - It contains information about Airbnb listings in the city of Amsterdam.
   - The file is about 20 MB.
   - I upload 'listings.csv' manually and place it in the directory '/content/sample_data'.

In [82]:
os.listdir(dir)

['listings.csv',
 'vehicles.csv',
 'housing.csv',
 'df_30.csv',
 '.ipynb_checkpoints']

In [83]:
listing = spark.read.option("header", "True").option("delimiter", ",").format("csv").load(dir + "/" + "listings.csv")
listing.show(3)
listing.printSchema()

+--------------------+--------------------+--------------+------------+-----------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+------------------+--------------------+-------------------------+------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+-----------------+---------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------+----------------------+----------------------+----------------------+------

In [84]:
# Analyzers

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(listing) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("id")) \
                    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+------------+------------------+
| entity|instance|        name|             value|
+-------+--------+------------+------------------+
|Dataset|       *|        Size|           11905.0|
| Column|      id|Completeness|0.9999160016799664|
+-------+--------+------------+------------------+



The 'listing' dataset contains 11,905 records. The 'id' column is not fully complete: the fraction of non-null values in 'id' is 0.99992 rather than 1.
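Completeness is the ratio of non-null values to the number of rows. The number of missing ids can therefore be recovered from the two metrics in the table above. Here is a plain-Python sketch using only the values reported by the `AnalysisRunner`:

```python
# Metrics reported by the AnalysisRunner for the 'listing' dataset
size = 11905.0
id_completeness = 0.9999160016799664

# Completeness = (number of non-null values) / (number of rows)
non_null_ids = round(id_completeness * size)
missing_ids = int(size) - non_null_ids

print(f"non-null ids: {non_null_ids}, missing ids: {missing_ids}")
```

The result is a single missing id (11,904 non-null values out of 11,905).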

In [None]:
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(listing) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

StandardProfiles for column: bedrooms: {
    "completeness": 0.5531289374212516,
    "approximateNumDistinctValues": 309,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 63,
        "Integral": 4727,
        "Unknown": 5320,
        "String": 1795
    },
    "histogram": null
}
StandardProfiles for column: instant_bookable: {
    "completeness": 0.5360772784544309,
    "approximateNumDistinctValues": 811,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 916,
        "Integral": 2294,
        "Unknown": 5523,
        "String": 3172
    },
    "histogram": null
}
StandardProfiles for column: minimum_minimum_nights: {
    "completeness": 0.5745485090298194,
    "approximateNumDistinctValues": 408,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 86,
        "Integral": 

1.   According to the profile, the data type reported for every column of the 'listing' dataset is String. When the profiler counts the records of each type, several types also show up within the same column. For example, for the 'bedrooms' column: `"typeCounts": {"Boolean": 0, "Fractional": 63, "Integral": 4727, "Unknown": 5320, "String": 1795}`.
This means the column contains some values that parse as integers, some plain strings, and some fractional numbers. The 'Unknown' entries correspond to missing (null) values. Finding plain strings in a 'bedrooms' column is suspicious. One plausible cause, not verified here, is that some free-text fields contain commas or line breaks. Reading such a CSV without Spark's `multiLine` option can shift values across columns.
2.   Many values are also missing: the completeness of many columns is only around 0.5 (for instance 0.55 for 'bedrooms').
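The 'bedrooms' type counts can be cross-checked against the other metrics. The counts add up to the dataset size (11,905). The share of rows outside the 'Unknown' bucket reproduces the reported completeness. A plain-Python check, assuming 'Unknown' is the bucket for null values:

```python
# typeCounts reported by the column profiler for 'bedrooms'
type_counts = {"Boolean": 0, "Fractional": 63, "Integral": 4727,
               "Unknown": 5320, "String": 1795}
reported_completeness = 0.5531289374212516

total_rows = sum(type_counts.values())            # every row falls into exactly one bucket
non_null = total_rows - type_counts["Unknown"]    # assumption: "Unknown" counts null values
completeness = non_null / total_rows

print(total_rows, non_null, completeness, reported_completeness)
```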



In [None]:
# constraint suggestions

import json
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(listing) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Print the constraint suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(bedrooms,None))",
      "column_name": "bedrooms",
      "current_value": "Completeness: 0.5531289374212516",
      "description": "'bedrooms' has less than 46% missing values",
      "suggesting_rule": "RetainCompletenessRule()",
      "rule_description": "If a column is incomplete in the sample, we model its completeness as a binomial variable, estimate a confidence interval and use this to define a lower bound for the completeness",
      "code_for_constraint": ".hasCompleteness(\"bedrooms\", lambda x: x >= 0.54, \"It should be above 0.54!\")"
    },
    {
      "constraint_name": "CompletenessConstraint(Completeness(instant_bookable,None))",
      "column_name": "instant_bookable",
      "current_value": "Completeness: 0.5360772784544309",
      "description": "'instant_bookable' has less than 48% missing values",
      "suggesting_rule": "RetainCompletenessRule()",
      "rule_descr

Most of the constraints suggested by the system are completeness constraints. For a given column, such a constraint checks that the fraction of non-null values is greater than or equal to a threshold set for that column.
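The `rule_description` in the suggestion output says the completeness is modelled as a binomial variable, and a confidence interval gives the lower bound. The sketch below assumes a 95% normal-approximation interval (z = 1.96) whose lower end is rounded down to two decimals. These details are assumptions, not taken from the deequ source. With them, the sketch reproduces the thresholds 0.54, 0.52 and 0.56 used for 'bedrooms', 'instant_bookable' and 'minimum_minimum_nights' in the verification below.

```python
import math

def completeness_lower_bound(p: float, n: int, z: float = 1.96) -> float:
    """Lower end of a normal-approximation confidence interval for a binomial
    proportion p observed on n rows, rounded down to two decimals.
    Assumption: this mirrors how the suggested completeness thresholds are derived."""
    std_err = math.sqrt(p * (1 - p) / n)
    return math.floor((p - z * std_err) * 100) / 100

n_rows = 11905
for column, p in [("bedrooms", 0.5531289374212516),
                  ("instant_bookable", 0.5360772784544309),
                  ("minimum_minimum_nights", 0.5745485090298194)]:
    print(column, completeness_lower_bound(p, n_rows))
```

With 11,905 rows the interval is narrow (about ±0.009), so each threshold sits just below the observed completeness. The description "'bedrooms' has less than 46% missing values" is simply 1 − 0.54.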

In [65]:
# constraint verification

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(listing) \
    .addCheck(
        check.hasCompleteness("bedrooms", lambda x: x >= 0.54, "It should be above 0.54!") \
        .hasCompleteness("instant_bookable", lambda x: x >= 0.52, "It should be above 0.52!") \
        .hasCompleteness("minimum_minimum_nights", lambda x: x >= 0.56, "It should be above 0.56!") \
        .hasCompleteness("host_verifications", lambda x: x >= 0.55, "It should be above 0.55!") \
        .hasCompleteness("name", lambda x: x >= 0.73, "It should be above 0.73!") \
        .hasCompleteness("room_type", lambda x: x >= 0.56, "It should be above 0.56!") \
        .hasCompleteness("source", lambda x: x >= 0.73, "It should be above 0.73!") \
        .hasCompleteness("availability_30", lambda x: x >= 0.56, "It should be above 0.56!") \
        .hasCompleteness("listing_url", lambda x: x >= 0.86, "It should be above 0.86!") \
        .hasCompleteness("number_of_reviews_l30d", lambda x: x >= 0.55, "It should be above 0.55!") ) \
    .run()
    

<pydeequ.checks.Check object at 0x7f9a8a9ab950>


In [66]:
# Display the verification results

print(f"Verification Run Status: {checkResult.status}")
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df

Verification Run Status: Success


| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Review Check | Warning | Success | CompletenessConstraint(Completeness(bedrooms,N... | Success | |
| 1 | Review Check | Warning | Success | CompletenessConstraint(Completeness(instant_bo... | Success | |
| 2 | Review Check | Warning | Success | CompletenessConstraint(Completeness(minimum_mi... | Success | |
| 3 | Review Check | Warning | Success | CompletenessConstraint(Completeness(host_verif... | Success | |
| 4 | Review Check | Warning | Success | CompletenessConstraint(Completeness(name,None)) | Success | |
| 5 | Review Check | Warning | Success | CompletenessConstraint(Completeness(room_type,... | Success | |
| 6 | Review Check | Warning | Success | CompletenessConstraint(Completeness(source,None)) | Success | |
| 7 | Review Check | Warning | Success | CompletenessConstraint(Completeness(availabili... | Success | |
| 8 | Review Check | Warning | Success | CompletenessConstraint(Completeness(listing_ur... | Success | |
| 9 | Review Check | Warning | Success | CompletenessConstraint(Completeness(number_of_... | Success | |


In [67]:
# Display the metrics computed for this verification

checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df


| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | instant_bookable | Completeness | 0.536077 |
| 1 | Column | host_verifications | Completeness | 0.568669 |
| 2 | Column | bedrooms | Completeness | 0.553129 |
| 3 | Column | listing_url | Completeness | 0.870307 |
| 4 | Column | availability_30 | Completeness | 0.573373 |
| 5 | Column | number_of_reviews_l30d | Completeness | 0.563377 |
| 6 | Column | room_type | Completeness | 0.574381 |
| 7 | Column | source | Completeness | 0.747417 |
| 8 | Column | name | Completeness | 0.744393 |
| 9 | Column | minimum_minimum_nights | Completeness | 0.574549 |


All the completeness constraints passed. For example, the fraction of non-null values in 'bedrooms' is 0.553, above its threshold of 0.54.
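Each `hasCompleteness(column, assertion, hint)` constraint computes the completeness metric and passes it to the assertion (here `lambda x: x >= threshold`). Below is a plain-Python sketch of that evaluation step. It uses the metrics from the table above, rounded as displayed, and the thresholds from the check. It also shows how far each column is from failing.

```python
# Completeness metrics returned by successMetricsAsDataFrame (rounded as displayed)
metrics = {"bedrooms": 0.553129, "instant_bookable": 0.536077,
           "minimum_minimum_nights": 0.574549, "host_verifications": 0.568669,
           "name": 0.744393, "room_type": 0.574381, "source": 0.747417,
           "availability_30": 0.573373, "listing_url": 0.870307,
           "number_of_reviews_l30d": 0.563377}

# Thresholds passed to hasCompleteness in the check
thresholds = {"bedrooms": 0.54, "instant_bookable": 0.52,
              "minimum_minimum_nights": 0.56, "host_verifications": 0.55,
              "name": 0.73, "room_type": 0.56, "source": 0.73,
              "availability_30": 0.56, "listing_url": 0.86,
              "number_of_reviews_l30d": 0.55}

# Each constraint applies its assertion (x >= threshold) to the metric
results = {col: metrics[col] >= t for col, t in thresholds.items()}
margins = {col: round(metrics[col] - t, 4) for col, t in thresholds.items()}

print(all(results.values()))
print(min(margins, key=margins.get), min(margins.values()))
```

Every margin is only about 0.01 to 0.02, as expected from thresholds derived from the data itself. A small drop in completeness in a future version of the file would therefore trigger a warning.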

# DF_30

   - I use the dataset 'df_30.csv'.
   - I downloaded this file from https://github.com/sdf94/deequ-examples/tree/master/data.
   - The file is about 505 MB.
   - I upload 'df_30.csv' manually and place it in the directory '/content/sample_data'.

In [58]:
os.listdir(dir)

['listings.csv',
 'vehicles.csv',
 'housing.csv',
 'df_30.csv',
 '.ipynb_checkpoints']

In [68]:
df_30 = spark.read.option("header", "True").option("delimiter", ",").format("csv").load(dir + "/" + "df_30.csv")
df_30.printSchema()

root
 |-- year: string (nullable = true)
 |-- SAMPLE: string (nullable = true)
 |-- SERIAL: string (nullable = true)
 |-- CBSERIAL: string (nullable = true)
 |-- NUMPREC: string (nullable = true)
 |-- HHWT: string (nullable = true)
 |-- HHTYPE: string (nullable = true)
 |-- CLUSTER: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- STATEFIP: string (nullable = true)
 |-- COUNTYFIP: string (nullable = true)
 |-- DENSITY: string (nullable = true)
 |-- CITY12: string (nullable = true)
 |-- CITYPOP: string (nullable = true)
 |-- STRATA: string (nullable = true)
 |-- CNTRY: string (nullable = true)
 |-- GQ: string (nullable = true)
 |-- OWNERSHP: string (nullable = true)
 |-- OWNERSHPD: string (nullable = true)
 |-- MORTGAGE: string (nullable = true)
 |-- TAXINCL: string (nullable = true)
 |-- INSINCL: string (nullable = true)
 |-- RENT: string (nullable = true)
 |-- RENTGRS: string (nullable = true)
 |-- HHINCOME: string (nullable = true)
 |-- VALUEH: string (nullable = t

In [69]:
from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df_30) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("AGE")) \
                    .addAnalyzer(ApproxCountDistinct("AGE")) \
                    .addAnalyzer(Mean("AGE")) \
                    .addAnalyzer(Compliance("AGE", "AGE > 0")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+-------------------+--------+
| entity|instance|               name|   value|
+-------+--------+-------------------+--------+
| Column|     AGE|         Compliance|     1.0|
|Dataset|       *|               Size|435761.0|
| Column|     AGE|       Completeness|     1.0|
| Column|     AGE|ApproxCountDistinct|    85.0|
+-------+--------+-------------------+--------+



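The 'df_30' dataset contains 435,761 records. There are approximately 85 distinct age values, all ages are greater than 0 (compliance 1.0), and no age is missing (completeness 1.0). Note that the `Mean("AGE")` analyzer does not appear in the output. `successMetricsAsDataFrame` only lists metrics that were computed successfully, and the mean most likely failed because 'AGE' was read as a string (see the schema above).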
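Each analyzer above reduces a column to a single number. Below is a plain-Python sketch of the quantities behind `Completeness` and `Compliance`, plus an exact distinct count, on a hypothetical sample of ages. The sketch assumes a null value counts as non-compliant. Deequ computes these over the whole DataFrame in Spark, and `ApproxCountDistinct` uses a HyperLogLog sketch rather than the exact count shown here.

```python
def completeness(values):
    """Fraction of non-null values."""
    return sum(v is not None for v in values) / len(values)

def compliance(values, predicate):
    """Fraction of rows satisfying the predicate (assumption: nulls are non-compliant)."""
    return sum(v is not None and predicate(v) for v in values) / len(values)

# Hypothetical sample of ages, stored as strings like the CSV columns of df_30
ages = ["34", "16", "96", None, "52"]
numeric_ages = [int(a) if a is not None else None for a in ages]

print(completeness(numeric_ages))                        # 4 non-null values out of 5
print(compliance(numeric_ages, lambda a: a > 0))         # 4 rows satisfy AGE > 0 out of 5
print(len({a for a in numeric_ages if a is not None}))   # exact number of distinct ages
```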

In [61]:
# constraint suggestions

import json
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df_30) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Print the constraint suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(CITYPOP,None))",
      "column_name": "CITYPOP",
      "current_value": "Completeness: 1.0",
      "description": "'CITYPOP' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"CITYPOP\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('CITYPOP' has no negative values,CITYPOP >= 0,None))",
      "column_name": "CITYPOP",
      "current_value": "Minimum: 0.0",
      "description": "'CITYPOP' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constraint": ".isNonNegative(\"CITYPOP\")"
    },
    {
      "constraint_name": "AnalysisBasedConstraint(DataTy

In [62]:
# Profiling
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df_30) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

NumericProfiles for column: CITYPOP: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 754,
    "dataType": "Fractional",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 435761,
        "Integral": 0,
        "Unknown": 0,
        "String": 0
    },
    "histogram": null,
    "kll": "None",
    "mean": 4322.078295212284,
    "maximum": 86227.0,
    "minimum": 0.0,
    "sum": 1883393160.0,
    "stdDev": 16303.881597615262,
    "approxPercentiles": []
}
NumericProfiles for column: DIVINYR: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 4,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 435761,
        "Unknown": 0,
        "String": 0
    },
    "histogram": [
        [
            "8",
            103,
            0.00023636810086262883
        ],
        [
            "0",
            169917,
            0.3899

This time, on the 'df_30' dataset, some columns are of type 'Integral' and others of type 'Fractional'. Every column is a string in the Spark schema, but the profiler infers these numeric types from the values. This lets it compute metrics such as the mean, maximum and minimum on these columns.
For example, for the column 'AGE' we get: `{"mean": 41.707312035726005, "maximum": 96.0, "minimum": 16.0, "sum": 18174420.0, "stdDev": 16.334429111369666}`.
A histogram of the distribution of age values is also computed.
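The numeric profile metrics are consistent with each other. Since 'AGE' is complete, the mean is just the sum divided by the number of rows (435,761, the dataset size found earlier). A quick plain-Python check using only the values reported above:

```python
import math

# AGE statistics from the column profile (completeness = 1.0, so every row counts)
n_rows = 435761
age_sum = 18174420.0
reported_mean = 41.707312035726005

mean = age_sum / n_rows
print(mean, math.isclose(mean, reported_mean, rel_tol=1e-9))
```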

In [63]:
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df_30) \
    .addCheck(
        check.hasDataType("AGE", ConstrainableDataTypes.Integral) \
        .hasMin("AGE", lambda x: x == 0) \
        .isNonNegative("AGE") \
        .isComplete("AGE") \
     ) \
    .run()


In [64]:
# Display the verification results

print(f"Verification Run Status: {checkResult.status}")
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df



| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Review Check | Warning | Warning | AnalysisBasedConstraint(DataType(AGE,None),<fu... | Success | |
| 1 | Review Check | Warning | Warning | MinimumConstraint(Minimum(AGE,None)) | Failure | Expected type of column AGE to be one of (Long... |
| 2 | Review Check | Warning | Warning | ComplianceConstraint(Compliance(AGE is non-neg... | Success | |
| 3 | Review Check | Warning | Warning | CompletenessConstraint(Completeness(AGE,None)) | Success | |


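The verification results show three things about the 'AGE' column. Its values are integral (the data-type constraint succeeded), it is complete (no missing values), and it contains no negative ages. The minimum constraint, however, failed. Its message ("Expected type of column AGE to be one of (Long...") shows that it did not fail on the value itself. It failed because 'AGE' is stored as a string in the DataFrame, which `hasMin` cannot evaluate. The profile above does report a minimum of 16, so the minimum of 'AGE' is indeed not 0.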
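Deequ rejects the string column outright. A minimum taken on text would be misleading anyway, because strings compare character by character rather than numerically. Here is a plain-Python illustration on hypothetical values; casting the list to `int` plays the role of casting the column:

```python
# Hypothetical ages stored as text, as in df_30 where every column is a string
ages_as_text = ["9", "16", "41", "80"]
ages_as_int = [int(a) for a in ages_as_text]   # equivalent of casting the column

print(min(ages_as_text), max(ages_as_text))    # 16 9   (character-by-character order)
print(min(ages_as_int), max(ages_as_int))      # 9 80   (numeric order)
```

In Spark, one way to make the check work is to cast the column before running it, for example `df_30.withColumn("AGE", col("AGE").cast("int"))` with `col` imported from `pyspark.sql.functions`. Another is to read the CSV with `.option("inferSchema", "true")`. Given the profile's minimum of 16, `hasMin("AGE", lambda x: x == 0)` should still fail on the cast column, while an assertion such as `lambda x: x >= 16` should pass.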