<a href="https://colab.research.google.com/github/pollianasilva/Soulcode/blob/main/Semana8_ETL_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **ETL com PySpark**

Escola: SoulCode Academy

Curso: Analista de Dados e Dashboard

Assunto: ETL

Professora: Franciane Rodrigues

Aluno (a): Polliana Silva

## **Solicitação**

Faça uma ETL no conjunto de dados usando PySpark

Dicionário de Dados

*  RowNumber: Número da linha no conjunto de dados.
*  CustomerId: Identificação única do cliente.
*  Surname: Sobrenome do cliente.
*  CreditScore: Pontuação de crédito do cliente, uma medida de sua credibilidade financeira.
*  Geography: Localização geográfica do cliente (por exemplo, país ou região).
*  Gender: Gênero do cliente.
*  Age: Idade do cliente.
*  Tenure: Tempo que o cliente permaneceu como cliente (em anos).
*  Balance: Saldo na conta do cliente.
*  NumOfProducts: Número de produtos financeiros que o cliente possui.
*  HasCrCard: Indicação se o cliente possui um cartão de crédito (1 para "sim", 0 para "não").
*  IsActiveMember: Indicação se o cliente é um membro ativo (1 para "sim", 0 para "não").
*  EstimatedSalary: Salário estimado do cliente.
*  Exited: Indicação se o cliente encerrou sua conta ou não (1 para "sim", 0 para "não").

Fonte: https://www.kaggle.com/datasets/mervetorkan/churndataset

## **Infraestrutura**

In [None]:
# Abertura Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Instalação da biblioteca
!pip install pyspark



In [None]:
# Abertura de biblioteca
from pyspark.sql import SparkSession    # utilizado para criação da sessão pyspark no colab
from pyspark.sql.functions import col, when

In [None]:
# Configurando ambiente
spark = (SparkSession.builder.master('local')                 # máquina local do colab
                             .appName('comandos_basicos')     # Nome da aplicação
                             .config('spark.ui.port', '4050') # Porta padrão do colab;
                             .getOrCreate())

In [None]:
# Checando o ambiente criado
spark

## **Extração**

In [None]:
# Extração
df = (spark.read.format('csv')
                .option('delimiter', ',')
                .option('header', 'true')
                .option('inferschema', 'true')
                .load('/content/drive/MyDrive/churn/churn.csv'))

In [None]:
# Visualizando a conjunto de dados
df.show()

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|     Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93

## **Pré-Análise**

In [None]:
# Visualização do conjunto de dados
df.show()

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|     Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93

In [None]:
# Visualização do cabeçalho
df.head()

Row(RowNumber=1, CustomerId=15634602, Surname='Hargrave', CreditScore=619, Geography='France', Gender='Female', Age=42, Tenure=2, Balance=0.0, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=101348.88, Exited=1)

In [None]:
# Visualizando as últimas posições do DataFrame
# Não há um método tail() direto em PySpark, você pode usar count()
# para determinar o número total de linhas e então mostrar as últimas linhas
total_rows = df.count()
df.limit(5).show(total_rows - 5) # Utilize + se quiser aparecer as primeiras posições

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [None]:
# Últimas posições
df.tail(5)

[Row(RowNumber=9996, CustomerId=15606229, Surname='Obijiaku', CreditScore=771, Geography='France', Gender='Male', Age=39, Tenure=5, Balance=0.0, NumOfProducts=2, HasCrCard=1, IsActiveMember=0, EstimatedSalary=96270.64, Exited=0),
 Row(RowNumber=9997, CustomerId=15569892, Surname='Johnstone', CreditScore=516, Geography='France', Gender='Male', Age=35, Tenure=10, Balance=57369.61, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=101699.77, Exited=0),
 Row(RowNumber=9998, CustomerId=15584532, Surname='Liu', CreditScore=709, Geography='France', Gender='Female', Age=36, Tenure=7, Balance=0.0, NumOfProducts=1, HasCrCard=0, IsActiveMember=1, EstimatedSalary=42085.58, Exited=1),
 Row(RowNumber=9999, CustomerId=15682355, Surname='Sabbatini', CreditScore=772, Geography='Germany', Gender='Male', Age=42, Tenure=3, Balance=75075.31, NumOfProducts=2, HasCrCard=1, IsActiveMember=0, EstimatedSalary=92888.52, Exited=1),
 Row(RowNumber=10000, CustomerId=15628319, Surname='Walker', CreditS

In [None]:
# Visualização do DataFrame de forma aleatória
df.sample(False, 0.2).show(6)  # 0.2 indica a fração das linhas a serem amostradas

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|       15|  15600882|    Scott|        635|    Spain|Female| 35|     7|      0.0|            2|        1|             1|       65951.65|     0|
|       17|  15737452|    Romeo|        653|  Germany|  Male| 58|     1|132602.88|            1|        1|             0|        5097.67|     1|
|       18|  15788218|Henderson|        549|    Spain|Female| 24|     9|      0.0|            2|        1|             1|       14

In [None]:
# Verificando o tamanho do DataFrame (linhas, colunas)
print((df.count(), len(df.columns)))

(10000, 14)


In [None]:
# Verificando o tipo de dados de cada coluna
df.dtypes

[('RowNumber', 'int'),
 ('CustomerId', 'int'),
 ('Surname', 'string'),
 ('CreditScore', 'int'),
 ('Geography', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Tenure', 'int'),
 ('Balance', 'double'),
 ('NumOfProducts', 'int'),
 ('HasCrCard', 'int'),
 ('IsActiveMember', 'int'),
 ('EstimatedSalary', 'double'),
 ('Exited', 'int')]

In [None]:
# Contar as observações em cada coluna
column_counts = [(col, df.select(col).count()) for col in df.columns]

# Exibir os resultados
for col, count in column_counts:
    print(f"{col}: {count}")

RowNumber: 10000
CustomerId: 10000
Surname: 10000
CreditScore: 10000
Geography: 10000
Gender: 10000
Age: 10000
Tenure: 10000
Balance: 10000
NumOfProducts: 10000
HasCrCard: 10000
IsActiveMember: 10000
EstimatedSalary: 10000
Exited: 10000


In [None]:
# Informações detalhadas do conjunto de dados
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



## **Transformação**

In [None]:
# Visualização do conjunto de dados
df.show()

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|     Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93

### **Renomeando colunas**

In [None]:
# Renomear as colunas
df = df \
    .withColumnRenamed("RowNumber", "numero_linha") \
    .withColumnRenamed("CustomerId", "id") \
    .withColumnRenamed("Surname", "sobrenome") \
    .withColumnRenamed("CreditScore", "pontuacao_credito") \
    .withColumnRenamed("Geography", "localizacao") \
    .withColumnRenamed("Gender", "genero") \
    .withColumnRenamed("Age", "idade") \
    .withColumnRenamed("Tenure", "tempo_permanencia") \
    .withColumnRenamed("Balance", "saldo") \
    .withColumnRenamed("NumOfProducts", "num_produtos") \
    .withColumnRenamed("HasCrCard", "tem_cartao_credito") \
    .withColumnRenamed("IsActiveMember", "membro_ativo") \
    .withColumnRenamed("EstimatedSalary", "salario_estimado") \
    .withColumnRenamed("Exited", "encerrou_conta")

In [None]:
# Visualização do conjunto de dados
df.show()

+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|numero_linha|      id|sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|           1|15634602| Hargrave|              619|     France|Female|   42|                2|      0.0|           1|                 1|           1|       101348.88|             1|
|           2|15647311|     Hill|              608|      Spain|Female|   41|                1| 83807.86|           1|                 0|           1|       112542.58|             0|
|           3|15619304|     Onio|              502|     France|Female|   42|              

### **Valores nulos**

In [None]:
# Verificar valores nulos em cada coluna
for column in df.columns:
    null_count = df.filter(df[column].isNull()).count()
    print(f"{column}: {null_count} nulos")

numero_linha: 0 nulos
id: 0 nulos
sobrenome: 0 nulos
pontuacao_credito: 0 nulos
localizacao: 0 nulos
genero: 0 nulos
idade: 0 nulos
tempo_permanencia: 0 nulos
saldo: 0 nulos
num_produtos: 0 nulos
tem_cartao_credito: 0 nulos
membro_ativo: 0 nulos
salario_estimado: 0 nulos
encerrou_conta: 0 nulos


In [None]:
# Antes de eliminar de vez, por favor filtre os dados
# Eliminação de dados nulos - como não tem deixei comentado
#df.dropna()

### **Valores únicos e Duplicados**

In [None]:
# Verificando se os dados são únicos na coluna 'id'
# Pelo fato de ter aparecido 'false', significa que temos dados duplicados
print("A coluna id possui valores únicos? Resposta:", df.select("id").distinct().count() == df.count())

A coluna id possui valores únicos? Resposta: True


In [None]:
# Verificando se os dados são únicos na coluna 'sobrenome'
# Pelo fato de ter aparecido 'false', significa que temos dados duplicados
print("A coluna sobrenome possui valores únicos? Resposta:", df.select("sobrenome").distinct().count() == df.count())

A coluna sobrenome possui valores únicos? Resposta: False


In [None]:
# Visualizando dados duplicados no conjunto de dados
df.orderBy("sobrenome").show()

+------------+--------+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|numero_linha|      id| sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+------------+--------+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|        3272|15708791|     Abazu|              584|      Spain|  Male|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|        5110|15576156|     Abazu|              710|      Spain|Female|   28|                6|      0.0|           1|                 1|           0|        48426.98|             0|
|         842|15737792|     Abbie|              818|     France|Female|   31|        

In [None]:
# Verificando a quantidade de sobrenomes repetidos na base de dados em ordem decrescente
df.groupBy("sobrenome").count().orderBy("count", ascending=False).show()

+---------+-----+
|sobrenome|count|
+---------+-----+
|    Smith|   32|
|    Scott|   29|
|   Martin|   29|
|   Walker|   28|
|    Brown|   26|
| Genovese|   25|
|      Yeh|   25|
|     Shih|   25|
|   Wright|   24|
|  Maclean|   24|
|       Ma|   23|
|  Fanucci|   23|
|   Wilson|   23|
|    White|   23|
|       Lu|   22|
|      Chu|   22|
|     Wang|   22|
|    Moore|   22|
|  Johnson|   22|
|      Sun|   21|
+---------+-----+
only showing top 20 rows



In [None]:
# Verificando a quantidade de sobrenomes repetidos na base de dados
df.groupBy("sobrenome").count().show()

+------------+-----+
|   sobrenome|count|
+------------+-----+
|       Tyler|    4|
|     Palermo|   12|
|      Piccio|   13|
|    Lazareva|    3|
|  Kambinachi|    5|
|       Virgo|    2|
| Baryshnikov|    2|
|     Wofford|    1|
|      Lavrov|    2|
|   Bezrukova|    2|
|      Avdeev|    1|
|      Clunie|    1|
|      Duigan|    2|
|    Sokolova|    2|
|      Azarov|    1|
|    Rawlings|    1|
|         Zox|    1|
|       Rubeo|    1|
|      Arbour|    1|
|Rapuluchukwu|    1|
+------------+-----+
only showing top 20 rows



In [None]:
# Filtrando pelo nome
df.filter(df["sobrenome"] == "Piccio").show()

+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|numero_linha|      id|sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|         128|15782688|   Piccio|              625|    Germany|  Male|   56|                0|148507.24|           1|                 1|           0|        46824.08|             1|
|        1146|15601688|   Piccio|              546|     France|  Male|   28|                8|      0.0|           1|                 1|           0|       159254.29|             0|
|        3815|15642093|   Piccio|              646|     France|  Male|   30|              

In [None]:
# Salvando o filtro da familia Piccio
df_filtro1 = df.filter(df["sobrenome"] == "Piccio")

In [None]:
# Mostrando resultado do filtro
df_filtro1.show()

+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|numero_linha|      id|sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|         128|15782688|   Piccio|              625|    Germany|  Male|   56|                0|148507.24|           1|                 1|           0|        46824.08|             1|
|        1146|15601688|   Piccio|              546|     France|  Male|   28|                8|      0.0|           1|                 1|           0|       159254.29|             0|
|        3815|15642093|   Piccio|              646|     France|  Male|   30|              

In [None]:
#Filtrar as linhas onde encerrou_conta é igual a 0
encerrou_conta_0 = df.filter(df.encerrou_conta == 0)

In [None]:
# Filtrar as linhas onde encerrou_conta é igual a 1
encerrou_conta_1 = df.filter(df.encerrou_conta == 1)

In [None]:
# Verificando o tamanho do DataFrame (linhas, colunas)
print((df.count(), len(df.columns)))

(10000, 14)


In [None]:
# Sugestão do Davi
# Converter o DataFrame do PySpark para um DataFrame do pandas
df_pandas = df.toPandas()

# Fazer a cópia
dfback_pandas = df_pandas.copy()

# Converter de volta para o PySpark, se necessário
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
dfback = spark.createDataFrame(dfback_pandas)

# backup
dfback = df.cache()
# verificando o tipo
print(type(dfback))

# arquivo atual
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
# mostrando o conjunto
dfback.show()

+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|numero_linha|      id|sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+------------+--------+---------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|           1|15634602| Hargrave|              619|     France|Female|   42|                2|      0.0|           1|                 1|           1|       101348.88|             1|
|           2|15647311|     Hill|              608|      Spain|Female|   41|                1| 83807.86|           1|                 0|           1|       112542.58|             0|
|           3|15619304|     Onio|              502|     France|Female|   42|              

In [None]:
# Eliminando valores duplicados na coluna sobrenome
df = df.dropDuplicates(['sobrenome'])

In [None]:
# Checando o tamanho do DataFrame (linhas, colunas)
print((df.count(), len(df.columns)))

(2932, 14)


### **Seleção de Colunas Importantes**

In [None]:
df.show()

+------------+--------+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|numero_linha|      id| sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+------------+--------+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|        3272|15708791|     Abazu|              584|      Spain|  Male|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|         842|15737792|     Abbie|              818|     France|Female|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|        2538|15723706|    Abbott|              573|     France|Female|   33|        

In [None]:
# Simulação
df.drop("numero_linha", "id").show()

+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|      Spain|  Male|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|     Abbie|              818|     France|Female|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|    Abbott|              573|     France|Female|   33|                0| 90124.64|           1|                 1|           0|       137476.71|             0|
|  Abdullah|              802|    

In [None]:
# Aplicação
df = df.drop("numero_linha", "id")

In [None]:
# Verificação
df.show()

+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|      Spain|  Male|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|     Abbie|              818|     France|Female|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|    Abbott|              573|     France|Female|   33|                0| 90124.64|           1|                 1|           0|       137476.71|             0|
|  Abdullah|              802|    

In [None]:
# Selecionando colunas
df.select("genero","pontuacao_credito", "localizacao", "saldo").show()

+------+-----------------+-----------+---------+
|genero|pontuacao_credito|localizacao|    saldo|
+------+-----------------+-----------+---------+
|  Male|              584|      Spain| 85534.83|
|Female|              818|     France|186796.37|
|Female|              573|     France| 90124.64|
|Female|              802|     France| 92887.06|
|Female|              562|    Germany| 117153.0|
|  Male|              544|     France|      0.0|
|Female|              560|     France|108883.29|
|  Male|              755|     France|104817.41|
|  Male|              661|    Germany|122552.48|
|Female|              630|      Spain|135483.17|
|  Male|              690|     France|      0.0|
|Female|              702|    Germany| 98775.23|
|  Male|              708|     France|      0.0|
|Female|              651|    Germany|133432.59|
|  Male|              791|     France|      0.0|
|Female|              506|     France|      0.0|
|  Male|              579|      Spain|118680.57|
|  Male|            

In [None]:
# Caso queira usar: Selecionando colunas
df_selec = df.select("genero","pontuacao_credito", "localizacao", "saldo")

In [None]:
# Verificando
df_selec.show()

+------+-----------------+-----------+---------+
|genero|pontuacao_credito|localizacao|    saldo|
+------+-----------------+-----------+---------+
|  Male|              584|      Spain| 85534.83|
|Female|              818|     France|186796.37|
|Female|              573|     France| 90124.64|
|Female|              802|     France| 92887.06|
|Female|              562|    Germany| 117153.0|
|  Male|              544|     France|      0.0|
|Female|              560|     France|108883.29|
|  Male|              755|     France|104817.41|
|  Male|              661|    Germany|122552.48|
|Female|              630|      Spain|135483.17|
|  Male|              690|     France|      0.0|
|Female|              702|    Germany| 98775.23|
|  Male|              708|     France|      0.0|
|Female|              651|    Germany|133432.59|
|  Male|              791|     France|      0.0|
|Female|              506|     France|      0.0|
|  Male|              579|      Spain|118680.57|
|  Male|            

### **Tradução de Categorias**

In [None]:
df.show()

+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|      Spain|  Male|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|     Abbie|              818|     France|Female|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|    Abbott|              573|     France|Female|   33|                0| 90124.64|           1|                 1|           0|       137476.71|             0|
|  Abdullah|              802|    

A expressão rdd.map(lambda x: x[0]).collect() em PySpark faz o seguinte:

*  RDD (rdd): Representa um Resilient Distributed Dataset, uma estrutura de dados fundamental em PySpark, que permite operações distribuídas.

*  map(lambda x: x[0]): Aplica uma função lambda a cada elemento do RDD, onde lambda x: x[0] extrai o primeiro elemento de cada tupla ou lista x.

*  collect(): Coleta todos os elementos do RDD de volta para o driver (o ambiente principal onde o código é executado), retornando uma lista de Python contendo todos os elementos processados pelo map.

Portanto, rdd.map(lambda x: x[0]).collect() retorna uma lista com o primeiro elemento de cada elemento do RDD rdd

In [None]:
# Verificação de valores unicos
sorted(df.select("localizacao").distinct().rdd.map(lambda x: x[0]).collect())

['France', 'Germany', 'Spain']

In [None]:
# Verificação de valores unicos
sorted(df.select("genero").distinct().rdd.map(lambda x: x[0]).collect())

['Female', 'Male']

In [None]:
# Simulação
df.withColumn("localizacao", when(df["localizacao"] == "France", "França") \
                            .when(df["localizacao"] == "Germany", "Alemanha")\
                            .when(df["localizacao"] == "Spain", "Espanha")\
                            .otherwise(df["localizacao"])).show()

+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|  Male|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|     Abbie|              818|     França|Female|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|    Abbott|              573|     França|Female|   33|                0| 90124.64|           1|                 1|           0|       137476.71|             0|
|  Abdullah|              802|    

In [None]:
# Aplicação
df = df.withColumn("localizacao", when(df["localizacao"] == "France", "França") \
                            .when(df["localizacao"] == "Germany", "Alemanha")\
                            .when(df["localizacao"] == "Spain", "Espanha")\
                            .otherwise(df["localizacao"]))

In [None]:
# Simulação
df.withColumn("genero", when(df["genero"] == "Female", "Feminino") \
                         .when(df["genero"] == "Male", "Masculino") \
                         .otherwise(df["genero"])).show()

+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|   genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|Masculino|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|     Abbie|              818|     França| Feminino|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|    Abbott|              573|     França| Feminino|   33|                0| 90124.64|           1|                 1|           0|       137476.71|             0|
|  Abdullah|    

In [None]:
# Aplicação
df = df.withColumn("genero", when(df["genero"] == "Female", "Feminino") \
                         .when(df["genero"] == "Male", "Masculino") \
                         .otherwise(df["genero"]))

In [None]:
# Verificação geral
df.show()

+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|   genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|Masculino|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|     Abbie|              818|     França| Feminino|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|    Abbott|              573|     França| Feminino|   33|                0| 90124.64|           1|                 1|           0|       137476.71|             0|
|  Abdullah|    

### **Verificação de inconsistências**

In [None]:
# Iterando sobre as colunas do DataFrame
for column, dtype in df.dtypes:
    print(f"Valores únicos na coluna '{column}':")

    # Verificando se a coluna contém valores inteiros
    if dtype == 'int':
        valores_unicos = sorted(df.select(column).distinct().rdd.map(lambda x: x[0]).collect())
        print(valores_unicos)

    # Verificando se a coluna contém valores de ponto flutuante
    elif dtype == 'double':
        valores_unicos = sorted(df.select(column).distinct().rdd.map(lambda x: x[0]).collect())
        print(valores_unicos)

    # Verificando se a coluna contém valores de texto (string)
    elif dtype == 'string':
        valores_unicos = sorted(df.select(column).distinct().rdd.map(lambda x: x[0]).collect())
        print(valores_unicos)

    print("-" * 50)  # Linha de separação

Valores únicos na coluna 'sobrenome':
['Abazu', 'Abbie', 'Abbott', 'Abdullah', 'Abdulov', 'Abel', 'Abernathy', 'Abramov', 'Abramova', 'Abramovich', 'Abramowitz', 'Abrego', 'Abron', 'Achebe', 'Adams', 'Adamson', 'Afamefula', 'Afamefuna', 'Afanasyev', 'Afanasyeva', 'Agafonova', 'Aguirre', 'Ah Mouy', 'Ahern', 'Ahmed', 'Aiken', 'Aikenhead', 'Ainsworth', 'Aitken', 'Ajuluchukwu', 'Akabueze', 'Akeroyd', 'Akhtar', 'Akobundu', 'Aksakova', 'Aksenov', 'Aksenova', 'Aksyonov', 'Aksyonova', 'Akubundu', 'Akudinobi', 'Alaniz', 'Alderete', 'Aldrich', 'Aldridge', 'Aleksandrova', 'Alekseeva', 'Alekseyeva', 'Aleshire', 'Alexander', 'Alexandrov', 'Alexandrova', 'Alexeeva', 'Alexeieva', 'Alexeyeva', 'Algarin', 'Algeranoff', 'Ali', 'Aliyev', 'Aliyeva', 'Allan', 'Allard', 'Allardyce', 'Allen', 'Alley', 'Alleyne', 'Allingham', 'Allnutt', 'Allsop', 'Alvares', 'Alvarez', 'Amadi', 'Amaechi', 'Amechi', 'Amies', 'Amos', 'Ampt', 'Anayochukwu', 'Anayolisa', 'Andersen', 'Anderson', 'Andreev', 'Andrejew', 'Andrews', 'A

### **Alteração de Categoria**

In [None]:
df.show()

+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|   genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|Masculino|   32|                9| 85534.83|           1|                 0|           0|       169137.24|             0|
|     Abbie|              818|     França| Feminino|   31|                1|186796.37|           1|                 0|           0|       178252.63|             0|
|    Abbott|              573|     França| Feminino|   33|                0| 90124.64|           1|                 1|           0|       137476.71|             0|
|  Abdullah|    

In [None]:
# Verificação
df.withColumn("tem_cartao_credito", when(df["tem_cartao_credito"] == 0, "Não")
                                   .when(df["tem_cartao_credito"] == 1, "Sim")
                                   .otherwise(df["tem_cartao_credito"])).show()

+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|   genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|Masculino|   32|                9| 85534.83|           1|               Não|           0|       169137.24|             0|
|     Abbie|              818|     França| Feminino|   31|                1|186796.37|           1|               Não|           0|       178252.63|             0|
|    Abbott|              573|     França| Feminino|   33|                0| 90124.64|           1|               Sim|           0|       137476.71|             0|
|  Abdullah|    

In [None]:
# Com função
def substituir_valores(df, coluna):
    return df.withColumn(coluna, when(df[coluna] == 0, "Não")
                                .when(df[coluna] == 1, "Sim")
                                .otherwise(df[coluna]))

# Aplicação
df = substituir_valores(df, "tem_cartao_credito")
df = substituir_valores(df, "membro_ativo")
df = substituir_valores(df, "encerrou_conta")

In [None]:
df.show()

+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|   genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|Masculino|   32|                9| 85534.83|           1|               Não|         Não|       169137.24|           Não|
|     Abbie|              818|     França| Feminino|   31|                1|186796.37|           1|               Não|         Não|       178252.63|           Não|
|    Abbott|              573|     França| Feminino|   33|                0| 90124.64|           1|               Sim|         Não|       137476.71|           Não|
|  Abdullah|    

### **Tipos de Dados**

In [None]:
# Verificando o tipo de dados de cada coluna
df.dtypes

[('sobrenome', 'string'),
 ('pontuacao_credito', 'int'),
 ('localizacao', 'string'),
 ('genero', 'string'),
 ('idade', 'int'),
 ('tempo_permanencia', 'int'),
 ('saldo', 'double'),
 ('num_produtos', 'int'),
 ('tem_cartao_credito', 'string'),
 ('membro_ativo', 'string'),
 ('salario_estimado', 'double'),
 ('encerrou_conta', 'string')]

In [None]:
# Informações detalhadas do conjunto de dados
df.printSchema()

root
 |-- sobrenome: string (nullable = true)
 |-- pontuacao_credito: integer (nullable = true)
 |-- localizacao: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- idade: integer (nullable = true)
 |-- tempo_permanencia: integer (nullable = true)
 |-- saldo: double (nullable = true)
 |-- num_produtos: integer (nullable = true)
 |-- tem_cartao_credito: string (nullable = true)
 |-- membro_ativo: string (nullable = true)
 |-- salario_estimado: double (nullable = true)
 |-- encerrou_conta: string (nullable = true)



No PySpark, você pode trabalhar com vários tipos de dados para representar diferentes tipos de informações. Alguns dos tipos de dados comuns disponíveis no PySpark incluem:

*  String: Para armazenar dados de texto.
*  Integer: Para armazenar números inteiros.
*  Double: Para armazenar números decimais de precisão dupla (64 bits).
*  Float: Para armazenar números decimais de precisão simples (32 bits).
*  Boolean: Para armazenar valores booleanos (True ou False).
*  DateType: Para armazenar datas.

In [None]:
df.show()

+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|   genero|idade|tempo_permanencia|    saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+---------+-----+-----------------+---------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|Masculino|   32|                9| 85534.83|           1|               Não|         Não|       169137.24|           Não|
|     Abbie|              818|     França| Feminino|   31|                1|186796.37|           1|               Não|         Não|       178252.63|           Não|
|    Abbott|              573|     França| Feminino|   33|                0| 90124.64|           1|               Sim|         Não|       137476.71|           Não|
|  Abdullah|    

In [None]:
# Convertendo tipos de dados necessários
df = df.withColumn("saldo", df["saldo"].cast("float"))
df = df.withColumn("salario_estimado", df["salario_estimado"].cast("float"))

In [None]:
# verificação
df.dtypes

[('sobrenome', 'string'),
 ('pontuacao_credito', 'int'),
 ('localizacao', 'string'),
 ('genero', 'string'),
 ('idade', 'int'),
 ('tempo_permanencia', 'int'),
 ('saldo', 'float'),
 ('num_produtos', 'int'),
 ('tem_cartao_credito', 'string'),
 ('membro_ativo', 'string'),
 ('salario_estimado', 'float'),
 ('encerrou_conta', 'string')]

In [None]:
df.printSchema()

root
 |-- sobrenome: string (nullable = true)
 |-- pontuacao_credito: integer (nullable = true)
 |-- localizacao: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- idade: integer (nullable = true)
 |-- tempo_permanencia: integer (nullable = true)
 |-- saldo: float (nullable = true)
 |-- num_produtos: integer (nullable = true)
 |-- tem_cartao_credito: string (nullable = true)
 |-- membro_ativo: string (nullable = true)
 |-- salario_estimado: float (nullable = true)
 |-- encerrou_conta: string (nullable = true)



### **Integridade dos dados**

In [None]:
# Instalação do pacote pandera
!pip install pandera



In [None]:
# Importando Biblioteca
import pandas as pd
import pandera as pa

In [None]:
# Coletando os dados do PySpark DataFrame para Pandas
df_pandas = df.toPandas()

In [None]:
# Vendo o nosso conjunto de dados
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2932 entries, 0 to 2931
Data columns (total 12 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   sobrenome           2932 non-null   object 
 1   pontuacao_credito   2932 non-null   int32  
 2   localizacao         2932 non-null   object 
 3   genero              2932 non-null   object 
 4   idade               2932 non-null   int32  
 5   tempo_permanencia   2932 non-null   int32  
 6   saldo               2932 non-null   float32
 7   num_produtos        2932 non-null   int32  
 8   tem_cartao_credito  2932 non-null   object 
 9   membro_ativo        2932 non-null   object 
 10  salario_estimado    2932 non-null   float32
 11  encerrou_conta      2932 non-null   object 
dtypes: float32(2), int32(4), object(6)
memory usage: 206.3+ KB


In [None]:
# Definindo o esquema de validação com Pandera
schema = pa.DataFrameSchema({
    'sobrenome': pa.Column(pa.String),
    'pontuacao_credito': pa.Column(pa.Int32),
    'localizacao': pa.Column(pa.String),
    'genero': pa.Column(pa.String),
    'idade': pa.Column(pa.Int32),
    'tempo_permanencia': pa.Column(pa.Int32),
    'saldo': pa.Column(pa.Float32),
    'num_produtos': pa.Column(pa.Int32),
    'tem_cartao_credito': pa.Column(pa.String),
    'membro_ativo': pa.Column(pa.String),
    'salario_estimado': pa.Column(pa.Float32),
    'encerrou_conta': pa.Column(pa.String)
})

In [None]:
# Validar o DataFrame
schema.validate(df_pandas)

Unnamed: 0,sobrenome,pontuacao_credito,localizacao,genero,idade,tempo_permanencia,saldo,num_produtos,tem_cartao_credito,membro_ativo,salario_estimado,encerrou_conta
0,Abazu,584,Espanha,Masculino,32,9,85534.828125,1,Não,Não,169137.234375,Não
1,Abbie,818,França,Feminino,31,1,186796.375000,1,Não,Não,178252.625000,Não
2,Abbott,573,França,Feminino,33,0,90124.640625,1,Sim,Não,137476.703125,Não
3,Abdullah,802,França,Feminino,60,3,92887.062500,1,Sim,Não,39473.628906,Sim
4,Abdulov,562,Alemanha,Feminino,31,9,117153.000000,1,Sim,Sim,108675.007812,Não
...,...,...,...,...,...,...,...,...,...,...,...,...
2927,Zubarev,749,França,Feminino,26,6,0.000000,2,Não,Sim,34948.769531,Não
2928,Zubareva,592,França,Masculino,42,1,147249.296875,2,Sim,Sim,63023.019531,Não
2929,Zuev,730,França,Masculino,39,1,116537.601562,1,Não,Não,145679.593750,Não
2930,Zuyev,621,Espanha,Masculino,53,9,170491.843750,1,Sim,Não,35588.070312,Sim


In [None]:
# Validar o DataFrame do Pandas
validated_df = schema.validate(df_pandas)

In [None]:
# Criar DataFrame do Pandas validado como PySpark DataFrame
spark_df = spark.createDataFrame(validated_df)

In [None]:
# Verificando a conversão
spark_df.show()

+----------+-----------------+-----------+---------+-----+-----------------+--------------+------------+------------------+------------+----------------+--------------+
| sobrenome|pontuacao_credito|localizacao|   genero|idade|tempo_permanencia|         saldo|num_produtos|tem_cartao_credito|membro_ativo|salario_estimado|encerrou_conta|
+----------+-----------------+-----------+---------+-----+-----------------+--------------+------------+------------------+------------+----------------+--------------+
|     Abazu|              584|    Espanha|Masculino|   32|                9|  85534.828125|           1|               Não|         Não|   169137.234375|           Não|
|     Abbie|              818|     França| Feminino|   31|                1|    186796.375|           1|               Não|         Não|      178252.625|           Não|
|    Abbott|              573|     França| Feminino|   33|                0|  90124.640625|           1|               Sim|         Não|   137476.703125|  

### **Carregamento**

In [None]:
spark_df.write.format('csv').save('/content/drive/MyDrive/churn/churn_tratado2')

### **Visualização de Dados**

In [None]:
# Visualizando estatisticas descritivas para dados numéricos
spark_df.summary().show()

+-------+---------+-----------------+-----------+---------+------------------+------------------+-----------------+------------------+------------------+------------+------------------+--------------+
|summary|sobrenome|pontuacao_credito|localizacao|   genero|             idade| tempo_permanencia|            saldo|      num_produtos|tem_cartao_credito|membro_ativo|  salario_estimado|encerrou_conta|
+-------+---------+-----------------+-----------+---------+------------------+------------------+-----------------+------------------+------------------+------------+------------------+--------------+
|  count|     2932|             2932|       2932|     2932|              2932|              2932|             2932|              2932|              2932|        2932|              2932|          2932|
|   mean|     NULL|652.1289222373806|       NULL|     NULL| 38.98806275579809|4.9566848567530695|75829.90900249404|1.5368349249658937|              NULL|        NULL|100174.63712055524|          N

In [None]:
# Instalando PlotLy
!pip install plotly



In [None]:
# Abertura de pacotes
import plotly.graph_objs as go
import plotly.io as pio
from plotly.subplots import make_subplots

In [None]:
# Coletando os dados do PySpark DataFrame para Pandas DataFrame para Plotly
df_pandas = spark_df.toPandas()

In [None]:
# Criando o layout do dashboard com 4 gráficos
fig = make_subplots(rows=2, cols=2, subplot_titles=("Distribuição de Idades", "Saldo Médio por Gênero",
                                                   "Proporção de Clientes Ativos", "Distribuição de Pontuação de Crédito"),
                    specs=[[{}, {}], [{"type": "pie"}, {}]])

# Gráfico 1: Distribuição de Idades
fig.add_trace(
    go.Histogram(x=df_pandas['idade'], name='Idade'),
    row=1, col=1
)

# Gráfico 2: Saldo Médio por Gênero
saldo_por_genero = df_pandas.groupby('genero')['saldo'].mean().reset_index()
fig.add_trace(
    go.Bar(x=saldo_por_genero['genero'], y=saldo_por_genero['saldo'], name='Saldo Médio'),
    row=1, col=2
)

# Gráfico 3: Proporção de Clientes Ativos
ativos_counts = df_pandas['membro_ativo'].value_counts()
fig.add_trace(
    go.Pie(labels=ativos_counts.index, values=ativos_counts.values, name='Clientes Ativos'),
    row=2, col=1
)

# Gráfico 4: Distribuição de Pontuação de Crédito
fig.add_trace(
    go.Box(y=df_pandas['pontuacao_credito'], name='Pontuação de Crédito'),
    row=2, col=2
)

# Atualizando layout e configurações
fig.update_layout(
    title='Dashboard Interativo com Plotly',
    showlegend=True,
    height=700,
    width=1300,
)

# Exibindo o dashboard interativo
pio.show(fig)