In [1]:
import os

# Prepara ambiente

### Para este teste estou usando a versão spark-3.1.2-bin-hadoop3.2

In [2]:
os.environ['SPARK_HOME'] = '/c/spark-3.1.2-bin-hadoop3.2'

### instala o pydeequ 

https://github.com/awslabs/python-deequ

In [4]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession, Row
spark = (SparkSession
            .builder
            .getOrCreate())

22/01/06 16:58:07 WARN Utils: Your hostname, DBC-0001023 resolves to a loopback address: 127.0.1.1; using 172.18.208.1 instead (on interface eth3)
22/01/06 16:58:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/01/06 16:58:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Executando um teste para verificar se a sessão spark esta correta

In [4]:
df_test = spark.sparkContext.parallelize([
    Row(coluna1="Banana",  valor=1.50, quantidade=5),
    Row(coluna1="Maça",    valor=1.85, quantidade=6),
    Row(coluna1="Laranja", valor=3.00, quantidade=None)]).toDF()


df_test.toPandas()

                                                                                

Unnamed: 0,coluna1,valor,quantidade
0,Banana,1.5,5.0
1,Maça,1.85,6.0
2,Laranja,3.0,


### Agora que sabemos que nossa sessão spark esta correta, vamos importar um fonte de dados e criar nosso dataframe

## Importando dados 

Para este exemplo vou utilizar um parte dos dados de empresas brasileiras.
Estes dados são disponibilizados em .zip pelo governo federal

https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj

São vários zips, para facilitar salver um dos arquivos sem ./fonte_de_dados

In [7]:
from pyspark.sql.types import StructType,StructField, StringType

schema = StructType([ \
    StructField("cnpj_basico",StringType(),True), \
    StructField("razao_social",StringType(),True), \
    StructField("natureza_juridica",StringType(),True), \
    StructField("qualificacao_do_responsavel", StringType(), True), \
    StructField("capital_social", StringType(), True), \
    StructField("porte_da_empresa", StringType(), True),
    StructField("ente_federativo_responsavel", StringType(), True),
  ])


#detalhes sobre o layout em https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/arquivos/novolayoutdosdadosabertosdocnpj-dez2021.pdf

In [8]:
df_dados_de_empresa = spark.read.csv('./fonte_de_dados/',sep=';',schema=schema)
df_dados_de_empresa.show()




+---------------------+--------------------+--------------------+---------------------------+--------------+----------------+---------------------------+
|          cnpj_basico|        razao_social|   natureza_juridica|qualificacao_do_responsavel|capital_social|porte_da_empresa|ente_federativo_responsavel|
+---------------------+--------------------+--------------------+---------------------------+--------------+----------------+---------------------------+
| PK   �zSmU�...|                null|                null|                       null|          null|            null|                       null|
| ����Mr�|� H�x�...|                null|                null|                       null|          null|            null|                       null|
| ,y+3��::���4��...|                null|                null|                       null|          null|            null|                       null|
| ?���U�f�m�`Ɏ�tX�...|                null|                null|             



### Analisando o dataframe

In [9]:
df_dados_de_empresa.describe().show()



+-------+--------------------+--------------------+--------------------+---------------------------+--------------------+--------------------+---------------------------+
|summary|         cnpj_basico|        razao_social|   natureza_juridica|qualificacao_do_responsavel|      capital_social|    porte_da_empresa|ente_federativo_responsavel|
+-------+--------------------+--------------------+--------------------+---------------------------+--------------------+--------------------+---------------------------+
|  count|              530545|              185662|               65729|                      23732|                8604|                3164|                       1164|
|   mean|           21881.425|   5.261654135338346|   6.571428571428571|          7.769230769230769|   5.888888888888889|                 8.0|                        9.0|
| stddev|   391311.5359487437|   8.003453059740613|    9.33143237944212|         10.017932638971695|   2.666666666666667|  1.4142135623730951|   

#### Vamos criar uma função para validar o CNPJ

In [10]:
from itertools import cycle

LENGTH_CNPJ = 14

def is_cnpj_valido(cnpj: str) -> bool:
    if len(cnpj) != LENGTH_CNPJ:
        return False

    if cnpj in (c * LENGTH_CNPJ for c in "1234567890"):
        return False

    cnpj_r = cnpj[::-1]
    for i in range(2, 0, -1):
        cnpj_enum = zip(cycle(range(2, 10)), cnpj_r[i:])
        dv = sum(map(lambda x: int(x[1]) * x[0], cnpj_enum)) * 10 % 11
        if cnpj_r[i - 1:i] != str(dv % 10):
            return False

    return True

#### Agora vamos criar uma nova coluna que vai receber o resultado da validação do CNPJ

In [11]:
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import udf

is_cnpj_valido_lambda = udf(lambda x: is_cnpj_valido(x), StringType())
df_dados_de_empresa = df_dados_de_empresa.withColumn("cnpj_valido",is_cnpj_valido_lambda(col('cnpj_basico')))