# 03. Data Processing - PySpark

## 03.1. Importação das bibliotecas

### 03.1.1. Importando as bibliotecas nativas do Python

In [1]:
import sys

### 03.1.2. Importando as bibliotecas de terceiros

In [2]:
import pandas as pd
from pyspark.sql import SparkSession, types as T
from pyspark.sql.functions import *
# from ydata_profiling import ProfileReport

### 03.1.3. Importando os módulos locais do projeto

In [3]:
sys.path.append('..')
import functions.fn_stats_pyspark as fn_stats_pyspark
import params.consts as consts

## 03.2. Visão geral dos dados

### 03.2.0. Iniciando a sessão Spark

In [4]:
spark = SparkSession.builder.appName('spark').getOrCreate()

### 03.2.1. Habilitando a exibição de todas as colunas

In [5]:
pd.set_option('display.max_columns', None)

### 03.2.2. Armazenando o dataset em uma variável

In [6]:
df = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .option('sep', ',') \
    .load(consts.DATASET_RAW_COMPRESSED_PYSPARK)

### 03.2.3. Visualizando a quantidade de linhas e colunas do dataset

In [7]:
(df.count(), len(df.columns))

(881666, 12)

### 03.2.4. Exibindo o dataset

In [8]:
df.show(5)

+----------+-----+------+------------------+--------------+-------------------+-----------+----------+----------------+-----------+----------------------+--------+
|CustomerID|idade|  sexo|tempo_como_cliente|frequencia_uso|ligacoes_callcenter|dias_atraso|assinatura|duracao_contrato|total_gasto|meses_ultima_interacao|cancelou|
+----------+-----+------+------------------+--------------+-------------------+-----------+----------+----------------+-----------+----------------------+--------+
|  193382.0| 62.0|Female|              24.0|          30.0|                3.0|       13.0|     Basic|          Annual|      410.0|                  27.0|     1.0|
|  193383.0| 29.0|  Male|               7.0|          11.0|                7.0|       29.0|  Standard|          Annual|      598.0|                   5.0|     1.0|
|  193384.0| 21.0|  Male|              16.0|           9.0|                7.0|       23.0|   Premium|       Quarterly|      988.0|                   9.0|     1.0|
|  193385.0| 23.

### 03.2.5. Visualizando os detalhes do dataset

In [9]:
df.printSchema()

root
 |-- CustomerID: double (nullable = true)
 |-- idade: double (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tempo_como_cliente: double (nullable = true)
 |-- frequencia_uso: double (nullable = true)
 |-- ligacoes_callcenter: double (nullable = true)
 |-- dias_atraso: double (nullable = true)
 |-- assinatura: string (nullable = true)
 |-- duracao_contrato: string (nullable = true)
 |-- total_gasto: double (nullable = true)
 |-- meses_ultima_interacao: double (nullable = true)
 |-- cancelou: double (nullable = true)



### 03.2.6. Visualizando as estatísticas das colunas numéricas

In [10]:
fn_stats_pyspark.summary(df).show(truncate = False)

+-------+----------+--------+------+------------------+--------------+-------------------+-----------+----------+----------------+-----------+----------------------+--------+
|summary|CustomerID|idade   |sexo  |tempo_como_cliente|frequencia_uso|ligacoes_callcenter|dias_atraso|assinatura|duracao_contrato|total_gasto|meses_ultima_interacao|cancelou|
+-------+----------+--------+------+------------------+--------------+-------------------+-----------+----------+----------------+-----------+----------------------+--------+
|count  |881664.0  |881664.0|881664|881663.0          |881663.0      |881664.0           |881664.0   |881661    |881663          |881664.0   |881664.0              |881664.0|
|mean   |225398.67 |39.37   |NULL  |31.26             |15.81         |3.6                |12.97      |NULL      |NULL            |631.62     |14.48                 |0.57    |
|stddev |129531.85 |12.44   |NULL  |17.26             |8.59          |3.07               |8.26       |NULL      |NULL        

### 03.2.7. Visualizando as estatísticas da colunas categóricas

In [11]:
categorical_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, T.StringType)]

df.describe(categorical_columns).show()

+-------+------+----------+----------------+
|summary|  sexo|assinatura|duracao_contrato|
+-------+------+----------+----------------+
|  count|881664|    881661|          881663|
|   mean|  NULL|      NULL|            NULL|
| stddev|  NULL|      NULL|            NULL|
|    min|Female|     Basic|          Annual|
|    max|  Male|  Standard|       Quarterly|
+-------+------+----------+----------------+



### 03.2.8. Verificando as colunas com valores nulos

In [12]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----------+-----+----+------------------+--------------+-------------------+-----------+----------+----------------+-----------+----------------------+--------+
|CustomerID|idade|sexo|tempo_como_cliente|frequencia_uso|ligacoes_callcenter|dias_atraso|assinatura|duracao_contrato|total_gasto|meses_ultima_interacao|cancelou|
+----------+-----+----+------------------+--------------+-------------------+-----------+----------+----------------+-----------+----------------------+--------+
|         2|    2|   2|                 3|             3|                  2|          2|         5|               3|          2|                     2|       2|
+----------+-----+----+------------------+--------------+-------------------+-----------+----------+----------------+-----------+----------------------+--------+



## 03.3. Processamento dos dados

### 03.3.1. Corrigindo os tipos de dados das colunas

- **N/A**

### 03.3.2. Tratando as colunas com valores nulos/vazios

In [13]:
df = df.dropna()

### 03.3.3. Criando colunas de tempo/idade

- **N/A**

### 03.3.4. Mesclando colunas com muitas categorias

- **N/A**

### 03.3.5. Unificando colunas semelhantes

- **N/A**

### 03.3.6. Criando colunas derivadas de outras

- **N/A**

### 03.3.7. Tratando as colunas com outliers

- **N/A**

### 03.3.8. Excluindo as colunas que contém valores únicos

In [14]:
df = df.drop('CustomerID')

### 03.3.9. Excluindo as colunas que contém valores constantes

- **N/A**

### 03.3.10. Excluindo as colunas auxiliares

- **N/A**

### 03.3.11. Ordenando as colunas

In [15]:
df = df.select(
    'idade',
    'sexo',
    'total_gasto',
    'assinatura',
    'duracao_contrato',
    'tempo_como_cliente',
    'frequencia_uso',
    'ligacoes_callcenter',
    'dias_atraso',
    'meses_ultima_interacao',
    'cancelou'
)

## 03.4. Exportação dos dados

### 03.4.1. Salvando o dataset clean em parquet e compactado em snappy

In [16]:
df.write \
    .format('parquet') \
    .mode('overwrite') \
    .option('compression', 'snappy') \
    .save(consts.DATASET_CLEAN_PYSPARK)

### 03.4.2. Gerando um novo relatório Profile Report

In [17]:
# report_eda = ProfileReport(df)

# report_eda.to_file(consts.EDA_1)