In [1]:
import os

# Prepara ambiente

### Para este teste estou usando a versão spark-3.1.2-bin-hadoop3.2

In [2]:
os.environ['SPARK_HOME'] = '/c/spark-3.1.2-bin-hadoop3.2'

### instala o pydeequ 

https://github.com/awslabs/python-deequ

In [3]:
!pip install pydeequ
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [4]:
from pyspark.sql import SparkSession, Row

import pydeequ

spark = (SparkSession
            .builder
            .config("spark.jars.packages", pydeequ.deequ_maven_coord)
            .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
            .getOrCreate())

Please set env variable SPARK_VERSION


### Executando um teste para verificar se a sessão spark esta correta

In [5]:
df_test = spark.sparkContext.parallelize([
    Row(coluna1="Banana",  valor=1.50, quantidade=5),
    Row(coluna1="Maça",    valor=1.85, quantidade=6),
    Row(coluna1="Laranja", valor=3.00, quantidade=None)]).toDF()


df_test.toPandas()

Unnamed: 0,coluna1,valor,quantidade
0,Banana,1.5,5.0
1,Maça,1.85,6.0
2,Laranja,3.0,


### Agora que sabemos que nossa sessão spark esta correta, vamos importar um fonte de dados e criar nosso dataframe

## Importando dados 

Para este exemplo vou utilizar um parte dos dados de empresas brasileiras.
Estes dados são disponibilizados em .zip pelo governo federal

https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj

São vários zips, para facilitar salver um dos arquivos sem ./fonte_de_dados

In [6]:
from pyspark.sql.types import StructType,StructField, StringType

schema = StructType([ \
    StructField("cnpj_basico",StringType(),True), \
    StructField("razao_social",StringType(),True), \
    StructField("natureza_juridica",StringType(),True), \
    StructField("qualificacao_do_responsavel", StringType(), True), \
    StructField("capital_social", StringType(), True), \
    StructField("porte_da_empresa", StringType(), True),
    StructField("ente_federativo_responsavel", StringType(), True),
  ])


#detalhes sobre o layout em https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/arquivos/novolayoutdosdadosabertosdocnpj-dez2021.pdf

In [7]:
df_dados_de_empresa = spark.read.csv('./fonte_de_dados/',sep=';',schema=schema)
df_dados_de_empresa.show()


+-----------+--------------------+-----------------+---------------------------+--------------+----------------+---------------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_do_responsavel|capital_social|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------+-----------------+---------------------------+--------------+----------------+---------------------------+
|   00000000|  BANCO DO BRASIL SA|             2038|                         10|90000000000,00|              05|                       null|
|   00000001|ASSOCIACAO DE AMI...|             3999|                         16|          0,00|              05|                       null|
|   00000002|WM&R EMPREITEIRA ...|             2240|                         49|          0,00|              05|                       null|
|   00000003|CASA CARIDADE LUZ...|             3999|                         16|          0,00|              05|                       null|
|   00000004|

### Analisando o dataframe

In [11]:
df_dados_de_empresa.describe().show()

+-------+------------------+--------------------+-----------------+---------------------------+------------------+------------------+---------------------------+
|summary|       cnpj_basico|        razao_social|natureza_juridica|qualificacao_do_responsavel|    capital_social|  porte_da_empresa|ente_federativo_responsavel|
+-------+------------------+--------------------+-----------------+---------------------------+------------------+------------------+---------------------------+
|  count|           4494860|             4494860|          4494860|                    4494860|           4494860|           4494854|                       8019|
|   mean|2322903.3175302455|       3.021173095E8|2273.042814236706|          45.90194967357123|               3.4|3.0009984751451326|                       null|
| stddev|1333429.6524474057|4.2718016364251304E8| 524.648347834671|         10.781986313474368|2.1908902300206643|1.9679811195391674|                       null|
|    min|          00000000|

#### Vamos criar uma função para validar o CNPJ

In [60]:
from itertools import cycle

LENGTH_CNPJ = 14

def is_cnpj_valido(cnpj: str) -> bool:
    if len(cnpj) != LENGTH_CNPJ:
        return False

    if cnpj in (c * LENGTH_CNPJ for c in "1234567890"):
        return False

    cnpj_r = cnpj[::-1]
    for i in range(2, 0, -1):
        cnpj_enum = zip(cycle(range(2, 10)), cnpj_r[i:])
        dv = sum(map(lambda x: int(x[1]) * x[0], cnpj_enum)) * 10 % 11
        if cnpj_r[i - 1:i] != str(dv % 10):
            return False

    return True

#### Agora vamos criar uma nova coluna que vai receber o resultado da validação do CNPJ

In [69]:
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import udf

is_cnpj_valido_lambda = udf(lambda x: is_cnpj_valido(x), StringType())
df_dados_de_empresa = df_dados_de_empresa.withColumn("cnpj_valido",is_cnpj_valido_lambda(col('cnpj_basico')))

+-----------+
|cnpj_valido|
+-----------+
|      false|
+-----------+



### Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:

In [25]:
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df_dados_de_empresa) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("cnpj_basico")) \
    .addAnalyzer(Completeness("razao_social")) \
    .addAnalyzer(Completeness("natureza_juridica")) \
    .addAnalyzer(Completeness("qualificacao_do_responsavel")) \
    .addAnalyzer(Completeness("capital_social")) \
    .addAnalyzer(Completeness("porte_da_empresa")) \
    .addAnalyzer(Completeness("ente_federativo_responsavel")) \
    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------------------+------------+--------------------+
| entity|            instance|        name|               value|
+-------+--------------------+------------+--------------------+
| Column|qualificacao_do_r...|Completeness|                 1.0|
| Column|    porte_da_empresa|Completeness|  0.9999986651419621|
| Column|        razao_social|Completeness|                 1.0|
| Column|   natureza_juridica|Completeness|                 1.0|
|Dataset|                   *|        Size|           4494860.0|
| Column|         cnpj_basico|Completeness|                 1.0|
| Column|      capital_social|Completeness|                 1.0|
| Column|ente_federativo_r...|Completeness|0.001784037767583...|
+-------+--------------------+------------+--------------------+



Com base em uma analise inicial do dataframe, já podemos observar que existem problemas nos campos de porte_da_empresa e ente_federativo_responsavel

In [28]:
print(f'Registros com ente_federativo_responsavel igual a null: {df_dados_de_empresa.filter(df_dados_de_empresa.ente_federativo_responsavel.isNull()).count()}')
print(f'Registros com porte_da_empresa igual a null: {df_dados_de_empresa.filter(df_dados_de_empresa.porte_da_empresa.isNull()).count()}')

Registros com ente_federativo_responsavel igual a null: 4486841
Registros com porte_da_empresa igual a null: 6


### Defining and running tests for data

```
After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.
```

### Algumas validações que vamos adicionar

```
CNPJ preenchido e unico

Razão Social preenchido

Código do porte da empresa precisa ser algum código válido:
    00  – NÃO INFORMADO 
    01  -  MICRO EMPRESA 
    03 - EMPRESA DE PEQUENO PORTE 
    05 - DEMAIS
    
Se o CNPJ é válido de acordo com a função que criamos    
```

In [73]:
from pydeequ.checks import *
from pydeequ.verification import *
check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark)\
    .onData(df_dados_de_empresa)\
    .addCheck(check.hasSize(lambda x: x >= 3)\
        .isComplete("cnpj_basico")\
        .isComplete("razao_social")\
        .isUnique("cnpj_basico")\
        .isContainedIn("porte_da_empresa", ["00", "01", "03", "05"]) \ 
        .isContainedIn("cnpj_valido", ['true']))\
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.toPandas()

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Review Check,Warning,Warning,SizeConstraint(Size(None)),Success,
1,Review Check,Warning,Warning,CompletenessConstraint(Completeness(cnpj_basic...,Success,
2,Review Check,Warning,Warning,CompletenessConstraint(Completeness(razao_soci...,Success,
3,Review Check,Warning,Warning,UniquenessConstraint(Uniqueness(List(cnpj_basi...,Success,
4,Review Check,Warning,Warning,ComplianceConstraint(Compliance(porte_da_empre...,Success,
5,Review Check,Warning,Warning,ComplianceConstraint(Compliance(cnpj_valido co...,Failure,Value: 0.0 does not meet the constraint requir...


## Conclusão

```
Conseguimos importar uma fonte de dados e fazer uma analise e validação inicial. Isso pode contribuir para a qualidade dos dados que estamos trabalhando.
```