# ST IT Cloud - Data and Analytics Test LV.4

Esse teste deve avaliar alguns conceitos de big data e a qualidade técnica na manipulacão de dados, otimização de performance, trabalho com arquivos grandes e tratamento de qualidade.

## Passo a passo

-Disponibilizamos aqui 2 cases para serem desenvolvidos, leia os enunciados dos problemas, desenvolver os programas, utilizando a **stack definida durante o processo seletivo**, para entregar os dados de acordo com os requisitos descritos abaixo.

**Faz parte dos critérios de avaliacão a pontualidade da entrega. Implemente até onde for possível dentro do prazo acordado.**

**Os dados de pessoas foram gerados de forma aleatória, utilizando a biblioteca FakerJS, FakerJS-BR e Faker**

LEMBRE-SE: A entrega deve conter TODOS os passos para o avaliador executar o programa (keep it simple).


# TESTE PRÁTICO

**Problema 1**: Você está recebendo o arquivo 'dados_cadastrais_fake.csv' que contem dados cadastrais de clientes, mas para que análises ou relatórios sejam feitos é necessário limpar e normalizar os dados. Além disso, existe uma coluna com o número de cpf e outra com cnpj, você precisará padronizar deixando apenas dígitos em formato string (sem caracteres especiais), implementar uma forma de verificar se tais documentos são válidos sendo que a informação deve se adicionada ao dataframe em outras duas novas colunas.

Após a normalização, gere reports que respondam as seguintes perguntas:
- Quantos clientes temos nessa base?
- Qual a média de idade dos clientes?
- Quantos clientes nessa base pertencem a cada estado?
- Quantos CPFs válidos e inválidos foram encontrados?
- Quantos CNPJs válidos e inválidos foram encontrados?

Ao final gere um arquivo no formato csv e um outro arquivo no formato parquet chamado (problema1_normalizado), eles serão destinados para pessoas distintas.


In [176]:
from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import initcap, regexp_replace, ltrim, rtrim, col, udf

In [151]:
# STARTS SPARK SESSION
spark = SparkSession.builder.appName("PROGRAMA1").getOrCreate()

In [152]:
# LOADS CSV
df = spark.read.option("header", True).option("delimiter", ";").option("inferSchema", True).csv("dados_cadastrais_fake.csv")
df.show()

+------------------+-----+-------------------+----------------+--------------+------------------+
|             nomes|idade|             cidade|          estado|           cpf|              cnpj|
+------------------+-----+-------------------+----------------+--------------+------------------+
|    Dennis Daniels|   31|         ACRELÂNDIA|              AC|   97566536800|    06589184909526|
|       Leah Becker|   42|        ÁGUA BRANCA|              AL|425.263.807-07|25.673.336/2350-20|
|        Sally Ford|   18|           ALVARÃES|              AM|   34647754103|    26543101702989|
|    Colleen Duncan|   21|     SERRA DO NAVIO|              AP|252.531.560-03|19.062.080/5100-98|
|   Jeff Stephenson|   73|             ABAÍRA|              BA|   49668886542|    97794530015384|
|     Sydney Curtis|   85|            ABAIARA|              CE|506.202.907-49|29.476.298/0856-78|
|    Kelly Matthews|   44|           Brasília|distrito federal|   39154836808|    24709301957761|
|         Juan Ruiz|

In [153]:
#IDENTIFIES DIRTY DATA FOR NAMES
df.filter(
    ~col("nomes").rlike("^([A-Z]([-']?[a-z]+))([ ]([A-Z]([-']?[a-z]+)[.]?)?([A-Z]+)?)*$")
).show(10, False)

+----------------------+-----+-------------------+------+--------------+------------------+
|nomes                 |idade|cidade             |estado|cpf           |cnpj              |
+----------------------+-----+-------------------+------+--------------+------------------+
|Mr. Andrew Chapman MD |55   |ABDON BATISTA      |SC    |645.976.852-86|26.848.890/5356-10|
|Mrs. Yesenia Barrett  |88   |ÁGUA BRANCA        |AL    |746.102.105-25|50.800.865/6979-77|
|Dr. Jennifer Williams |36   |SERRA DO NAVIO     |AP    |443.310.261-08|48.607.469/0454-82|
|Mr. Jesse Fletcher    |21   |ABADIA DOS DOURADOS|MG    |812.821.490-08|48.249.914/7290-01|
|Mr. Derrick Walker III|67   |SERRA DO NAVIO     |AP    |867.098.157-22|72.983.983/2334-06|
|Mr. Charles Schwartz  |47   |ÁGUA BRANCA        |PB    |01473369290   |78060778010570    |
|Mrs. Suzanne Peters   |19   |ACRELÂNDIA         |AC    |81433726181   |29592536264459    |
|Mrs. Angela Hernandez |34   |AFONSO CLÁUDIO     |ES    |967.654.794-81|78.156.6

In [154]:
# CLEAN NAMES

# Capitalize all names
df = df.withColumn("nomes", initcap(col("nomes")))

# Take out prefixes (Mr., Dr., Miss, (...))
df = df.withColumn("nomes", regexp_replace(col("nomes"), "^[A-Z][a-z]*[.][ ]", ""))
df = df.withColumn("nomes", regexp_replace(col("nomes"), "^Miss[ ]", ""))

In [155]:
#IDENTIFIES DIRTY DATA FOR AGES
df.filter(
    (col("idade").cast("int").isNull()) | (col("idade").cast("int") <= 0) | (col("idade").cast("int") > 100)
).show(10, False)

+-----+-----+------+------+---+----+
|nomes|idade|cidade|estado|cpf|cnpj|
+-----+-----+------+------+---+----+
+-----+-----+------+------+---+----+



In [156]:
# CLEAN AGES

In [157]:
#IDENTIFIES DIRTY DATA FOR CITIES
df.filter(
    ~col("cidade").rlike("^[\\p{L}]+([ ]([A-Z]['])?[\\p{L}]+)*$")
).show(10, False)

+-----+-----+------+------+---+----+
|nomes|idade|cidade|estado|cpf|cnpj|
+-----+-----+------+------+---+----+
+-----+-----+------+------+---+----+



In [158]:
# CLEAN CITIES

# Capitalize all cities
df = df.withColumn("cidade", initcap(col("cidade")))

In [159]:
#IDENTIFIES DIRTY DATA FOR STATES
states_list = ["AC", "AL", "AP", "AM", "BA", "CE", "DF", "ES", "GO", "MA", "MT", "MS", "MG", "PA", "PB", "PR", "PE", "PI", "RJ", "RN", "RS", "RO", "RR", "SC", "SP", "SE", "TO"]
df.filter(
    ~col("estado").isin(states_list)
).show(10, False)

+-----------------+-----+----------+----------------+--------------+------------------+
|nomes            |idade|cidade    |estado          |cpf           |cnpj              |
+-----------------+-----+----------+----------------+--------------+------------------+
|Kelly Matthews   |44   |Brasília  |distrito federal|39154836808   |24709301957761    |
|Michelle Sullivan|45   |Brasília  |distrito federal|603.384.507-26|18.355.115/6493-65|
|Cindy Brown      |28   |Brasília  |distrito federal|11561438537   |89158131394480    |
|Patricia Meyer Md|31   |Brasília  |distrito federal|432.435.813-38|68.879.919/2769-85|
|Christopher Young|38   |Brasília  |distrito federal|24594975984   |71848458367870    |
|Elizabeth Miller |79   |Brasília  |distrito federal|723.509.900-36|41.178.451/8696-87|
|Barbara Leon Md  |73   |Adamantina|sao  paulo      |05086323231   |98992168836002    |
|Stephen Torres   |48   |Adamantina|sao  paulo      |922.545.994-70|51.006.075/0099-24|
|Kelli Poole      |22   |Adamant

In [160]:
# CLEAN STATES

# Remove spaces
df = df.withColumn("estado", ltrim(rtrim(col("estado"))))

# Change name to initials
df = df.withColumn("estado", regexp_replace(col("estado"), "distrito federal", "DF"))
df = df.withColumn("estado", regexp_replace(col("estado"), "sao  paulo", "SP"))
df = df.withColumn("estado", regexp_replace(col("estado"), "são  paulo", "SP"))
df = df.withColumn("estado", regexp_replace(col("estado"), "rio de  janeiro", "RJ"))
df = df.withColumn("estado", regexp_replace(col("estado"), "MINAS GERAI", "MG"))
df = df.withColumn("estado", regexp_replace(col("estado"), "MGs", "MG"))

In [161]:
#IDENTIFIES DIRTY DATA FOR CPF

# Only digits
df = df.withColumn("cpf", regexp_replace(col("cpf"), "[^0-9]", ""))

df.filter(
    ~col("cpf").rlike("^[\\d]{11}$")
).show(10, False)

+-----+-----+------+------+---+----+
|nomes|idade|cidade|estado|cpf|cnpj|
+-----+-----+------+------+---+----+
+-----+-----+------+------+---+----+



In [27]:
# CLEAN CPF

In [163]:
#IDENTIFIES DIRTY DATA FOR CNPJ

# Only digits
df = df.withColumn("cnpj", regexp_replace(col("cnpj"), "[^0-9]", ""))

df.filter(
    ~col("cnpj").rlike("^[\\d]{14}$")
).show(10, False)

+-----+-----+------+------+---+----+
|nomes|idade|cidade|estado|cpf|cnpj|
+-----+-----+------+------+---+----+
+-----+-----+------+------+---+----+



In [164]:
# How many clients do we have?
df.count()

10000

In [166]:
# What is the clients ages average?
df.agg({"idade": "avg"}).show()

+----------+
|avg(idade)|
+----------+
|   53.7831|
+----------+



In [172]:
# How many clients by state?
df.groupBy("estado").count().show(30)

+------+-----+
|estado|count|
+------+-----+
|    SC|  370|
|    RO|  370|
|    PI|  370|
|    AM|  371|
|    RR|  370|
|    GO|  371|
|    TO|  370|
|    MT|  370|
|    SP|  370|
|    ES|  371|
|    PB|  370|
|    RS|  370|
|    MS|  370|
|    AL|  371|
|    MG|  370|
|    PA|  370|
|    BA|  371|
|    SE|  370|
|    PE|  370|
|    CE|  371|
|    RN|  370|
|    RJ|  370|
|    MA|  371|
|    AC|  371|
|    DF|  371|
|    PR|  370|
|    AP|  371|
+------+-----+



In [177]:
# How many valid and invalid CPF found?

def cpf_is_valid(cpf):
    cpf_numbers = [int(char) for char in cpf]
    
    # Validates CPF all same digits
    if cpf_numbers == cpf_numbers[::-1]:
        return False

    #  Validates last two digits
    for i in range(9, 11):
        value = sum((cpf_numbers[num] * ((i+1) - num) for num in range(0, i)))
        digit = ((value * 10) % 11) % 10
        if digit != cpf_numbers[i]:
            return False
    return True

cpf_is_valid_udf = udf(lambda x: cpf_is_valid(x), BooleanType())
df.withColumn("cpf_valid", cpf_is_valid_udf(col("cpf"))).groupBy("cpf_valid").count().show()

+---------+-----+
|cpf_valid|count|
+---------+-----+
|     true| 9999|
|    false|    1|
+---------+-----+



In [178]:
# How many valid and invalid CNPJ found?

def cnpj_is_valid(cnpj):
    cnpj_numbers = [int(char) for char in cnpj]
    
    if len(cnpj_numbers) != 14:
        return False
    
    # Validates CNPJ all same digits
    if cnpj_numbers == cnpj_numbers[::-1]:
        return False

    #  Validates last two digits
    prod = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
    beginning =  cnpj_numbers[:12]
    while len(beginning) < 14:
        r = sum([x*y for (x, y) in zip(beginning, prod)]) % 11
        if r > 1:
            f = 11 - r
        else:
            f = 0
        beginning.append(f)
        prod.insert(0, 6)
    
    if beginning != cnpj_numbers:
        return False
        
    return True

cnpj_is_valid_udf = udf(lambda x: cnpj_is_valid(x), BooleanType())
df.withColumn("cnpj_valid", cnpj_is_valid_udf(col("cnpj"))).groupBy("cnpj_valid").count().show()

+----------+-----+
|cnpj_valid|count|
+----------+-----+
|      true|10000|
+----------+-----+



In [179]:
df.show()

+------------------+-----+-------------------+------+-----------+--------------+
|             nomes|idade|             cidade|estado|        cpf|          cnpj|
+------------------+-----+-------------------+------+-----------+--------------+
|    Dennis Daniels|   31|         Acrelândia|    AC|97566536800|06589184909526|
|       Leah Becker|   42|        Água Branca|    AL|42526380707|25673336235020|
|        Sally Ford|   18|           Alvarães|    AM|34647754103|26543101702989|
|    Colleen Duncan|   21|     Serra Do Navio|    AP|25253156003|19062080510098|
|   Jeff Stephenson|   73|             Abaíra|    BA|49668886542|97794530015384|
|     Sydney Curtis|   85|            Abaiara|    CE|50620290749|29476298085678|
|    Kelly Matthews|   44|           Brasília|    DF|39154836808|24709301957761|
|         Juan Ruiz|   39|     Afonso Cláudio|    ES|22688119648|02420338147900|
|      Brian Thomas|   26|    Abadia De Goiás|    GO|47475484084|70723419110335|
|        Sara Ayers|   62|  

**Problema 2**: Você deverá implementar um programa, para ler, tratar e particionar os dados.

O arquivo fonte está disponível em `https://st-it-cloud-public.s3.amazonaws.com/people-v2_1E6.csv.gz`

### Data Quality

- Higienizar e homogenizar o formato da coluna `document`
- Detectar através da coluna `document` se o registro é de uma Pessoa Física ou Pessoa Jurídica, adicionando uma coluna com essa informação
- Higienizar e homogenizar o formato da coluna `birthDate`
- Existem duas colunas nesse dataset que em alguns registros estão trocadas. Quais são essas colunas? 
- Corrigir os dados com as colunas trocadas
- Além desses pontos, existem outras tratamentos para homogenizar esse dataset. Aplique todos que conseguir.

### Agregação dos dados

- Quais são as 5 PF que mais gastaram (`totalSpent`)? 
- Qual é o valor de gasto médio por estado (`state`)?
- Qual é o valor de gasto médio por `jobArea`?
- Qual é a PF que gastou menos (`totalSpent`)?
- Quantos nomes e documentos repetidos existem nesse dataset?
- Quantas linhas existem nesse dataset?

### Particionamento de dados tratados com as regras descritas em `DATA QUALITY`

- Particionar em arquivos PARQUET por estado (`state`)
- Particionar em arquivos CSV por ano/mes/dia de nascimento (`birthDate`)