# ST IT Cloud - Data and Analytics Test LV.3

Esse teste deve avaliar a qualidade técnica na manipulacão de dados.

## Passo a passo

- Disponibilizamos aqui 1 case para ser solucionado, leia os enunciados dos problemas, desenvolva os programas, utilizando a **stack definida durante o processo seletivo**, para entregar os dados de acordo com os requisitos descritos abaixo.


**Faz parte dos critérios de avaliacão a pontualidade da entrega. Implemente até onde for possível dentro do prazo acordado.**

**Os dados de pessoas foram gerados de forma aleatória, utilizando a biblioteca FakerJS, FakerJS-BR e Faker**

LEMBRE-SE: A entrega deve conter TODOS os passos para o avaliador executar o programa (keep it simple).


# TESTE PRÁTICO

**Problema**: Você está recebendo o arquivo 'dados_cadastrais_fake.csv' que contem dados cadastrais de clientes, mas para que análises ou relatórios sejam feitos é necessário limpar e normalizar os dados. Além disso, existe uma coluna com o número de cpf e outra com cnpj, você precisará padronizar deixando apenas dígitos em formato string (sem caracteres especiais), implementar uma forma de verificar se tais documentos são válidos sendo que a informação deve se adicionada ao dataframe em outras duas novas colunas.

Após a normalização, gere reports que respondam as seguintes perguntas:
- Quantos clientes temos nessa base?
- Qual a média de idade dos clientes?
- Quantos clientes nessa base pertencem a cada estado?
- Quantos CPFs válidos e inválidos foram encontrados?
- Quantos CNPJs válidos e inválidos foram encontrados?

Ao final gere um arquivo no formato csv e um outro arquivo no formato parquet chamado (problema1_normalizado), eles serão destinados para pessoas distintas.

In [272]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, udf
from pyspark.sql.functions import regexp_replace
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

In [273]:
spark = SparkSession.builder.appName("teste").getOrCreate()

In [274]:
## Função de validação do dígito do CPF
def valida_cpf(cpf):
	pesos_dv_1 = [10, 9, 8, 7, 6, 5, 4, 3, 2]
	pesos_dv_2 = [11, 10, 9, 8, 7, 6, 5, 4, 3, 2]

	cpf_len = len(cpf)
	cpf_sem_dv = cpf[:-2]
	cpf_origin_dv_1 = cpf[9]
	cpf_origin_dv_2 = cpf[10]


	### Validacao primeiro digito verificador
	soma_dv_1 = sum([peso * int(n) for peso, n in zip(pesos_dv_1, cpf_sem_dv)])
	resto_dv_1 = soma_dv_1 % 11
	dv_1 = 0 if resto_dv_1 in [0, 1] else 11 - resto_dv_1
	dv_1 = str(dv_1)

	### Validacao segundo digito verificador
	cpf_dv_1 = f"{cpf_sem_dv}{dv_1}"
	soma_dv_2 = sum([peso * int(n) for (peso, n) in zip(pesos_dv_2, cpf_dv_1)])
	resto_dv_2 = soma_dv_2 % 11
	dv_2 = 0 if resto_dv_2 in [0,1] else 11 - resto_dv_2
	dv_2 = str(dv_2)

	cpf_validado = f"{cpf_sem_dv}{dv_1}{dv_2}"
	# print(cpf_validado)

	if ((cpf_len == 11) and (dv_1 == cpf_origin_dv_1) and (dv_2 == cpf_origin_dv_2)):
            return "Válido"
	else:
            return "Inválido"

In [275]:
## Função de validação do dígito do CNPJ
def valida_cnpj(cnpj):
	pesos_dv_1 = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
	pesos_dv_2 = [6, 5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]

	cnpj_len = len(cnpj)
	cnpj_sem_dv = cnpj[:-2]
	cnpj_origin_dv_1 = cnpj[12]
	cnpj_origin_dv_2 = cnpj[13]


	### Validacao primeiro digito verificador
	soma_dv_1 = sum([peso * int(n) for peso, n in zip(pesos_dv_1, cnpj_sem_dv)])
	resto_dv_1 = soma_dv_1 % 11
	dv_1 = 0 if resto_dv_1 in [0, 1] else 11 - resto_dv_1
	dv_1 = str(dv_1)

	### Validacao segundo digito verificador
	cnpj_dv_1 = f"{cnpj_sem_dv}{dv_1}"
	
	soma_dv_2 = sum([peso * int(n) for (peso, n) in zip(pesos_dv_2, cnpj_dv_1)])
	resto_dv_2 = soma_dv_2 % 11
	dv_2 = 0 if resto_dv_2 in [0,1] else 11 - resto_dv_2
	dv_2 = str(dv_2)

	cnpj_validado = f"{cnpj_sem_dv}{dv_1}{dv_2}"

	if ((cnpj_len == 14) and (dv_1 == cnpj_origin_dv_1) and (dv_2 == cnpj_origin_dv_2)):
		return "Válido" 
	else:
		return "Inválido"

In [276]:
## Leitura do arquivo csv
df = spark.read.format("csv").options(header = 'true', inferSchema = 'true', sep = ';').load("dados_cadastrais_fake.csv")


In [277]:
## Remoção de pontos, vírgulas e barras dos cnpjs
df_trata_cnpj = df.withColumn("cnpj",regexp_replace(col("cnpj"), "[\\.,\\-,\\/]", ""))

In [278]:
## Remoção de pontos, vírgulas e barras dos cpfs
df_tratado = df_trata_cnpj.withColumn("cpf",regexp_replace(col("cpf"), "[\\.,\\-,\\/]", ""))

In [279]:
## Padronização dos estados
df_estado = df_tratado.withColumn('estado', regexp_replace('estado', 'são  paulo', 'SP'))
df_estado = df_estado.withColumn('estado', regexp_replace('estado', 'sao  paulo', 'SP'))
df_estado = df_estado.withColumn('estado', regexp_replace('estado', 'MINAS GERAIs', 'MG'))
df_estado = df_estado.withColumn('estado', regexp_replace('estado', 'MINAS GERAI', 'MG'))
df_estado = df_estado.withColumn('estado', regexp_replace('estado', 'distrito federal', 'DF'))
df_estado = df_estado.withColumn('estado', regexp_replace('estado', 'rio de  janeiro ', 'RJ'))

In [280]:
## Padronização dos estados e cidades - Primeira letra maiúscula
df_trata_nome = df_estado.withColumn("nomes", f.initcap(f.col("nomes")))
df_trata_cidade = df_trata_nome.withColumn("cidade", f.initcap(f.col("cidade")))

In [281]:
## Chamada a função que valida o cpf
call_valida_cpf = udf(lambda x: valida_cpf(x), StringType())
df_cpf_validado = df_trata_cidade.withColumn("StatusCPF", call_valida_cpf(df_trata_cidade.cpf))

In [282]:
## Chamada a função que valida o cnpj
call_valida_cnpj = udf(lambda x: valida_cnpj(x), StringType())
df_final = df_cpf_validado.withColumn("StatusCNPJ", call_valida_cnpj(df_cpf_validado.cnpj))

## Quantos clientes tem na base?
- Fiz o calculo baseado na quantidade de números de cpf.

In [286]:
df_final.select(countDistinct("cpf").alias("Qtde Clientes")).show()

+-------------+
|Qtde Clientes|
+-------------+
|        10000|
+-------------+



## Qual a media de idade dos clientes?

In [287]:
df_final.agg(f.avg(col('idade')).alias("Media idade")).show()

+-----------+
|Media idade|
+-----------+
|    53.7831|
+-----------+



## Quantos clientes nessa base pertencem a cada estado?

In [288]:
df_final.groupBy('estado').count().show(50)

+------+-----+
|estado|count|
+------+-----+
|    SC|  370|
|    RO|  370|
|    PI|  370|
|    AM|  371|
|    RR|  370|
|    GO|  371|
|    TO|  370|
|    MT|  370|
|    SP|  370|
|    ES|  371|
|    PB|  370|
|    RS|  370|
|    MS|  370|
|    AL|  371|
|    MG|  370|
|    PA|  370|
|    BA|  371|
|    SE|  370|
|    PE|  370|
|    CE|  371|
|    RN|  370|
|    RJ|  370|
|    MA|  371|
|    AC|  371|
|    DF|  371|
|    PR|  370|
|    AP|  371|
+------+-----+



## Quantos CPFs válidos e inválidos foram encontrados?

In [289]:
df_final.groupBy('StatusCPF').count().show()

+---------+-----+
|StatusCPF|count|
+---------+-----+
|   Válido|10000|
+---------+-----+



## Quantos CNPJs válidos e inválidos foram encontrados?

In [290]:
df_final.groupBy('StatusCNPJ').count().show()

+----------+-----+
|StatusCNPJ|count|
+----------+-----+
|    Válido|10000|
+----------+-----+



In [295]:
## Salvando o arquivo em formato csv
df_final.write.option("header",True).mode("overwrite").csv("clientes_csv")

In [296]:
## Salvando o arquivo em formato parquet
df_final.write.option("header",True).mode("overwrite").parquet("clientes_parquet")