<a href="https://colab.research.google.com/github/nadjaguerra/SoulCode-Bootamp/blob/main/ETL_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **ETL com PySpark**


## **Solicitação**

ETL no conjunto de dados usando PySpark

Dicionário de Dados

*  RowNumber: Número da linha no conjunto de dados.
*  CustomerId: Identificação única do cliente.
*  Surname: Sobrenome do cliente.
*  CreditScore: Pontuação de crédito do cliente, uma medida de sua credibilidade financeira.
*  Geography: Localização geográfica do cliente (por exemplo, país ou região).
*  Gender: Gênero do cliente.
*  Age: Idade do cliente.
*  Tenure: Tempo que o cliente permaneceu como cliente (em anos).
*  Balance: Saldo na conta do cliente.
*  NumOfProducts: Número de produtos financeiros que o cliente possui.
*  HasCrCard: Indicação se o cliente possui um cartão de crédito (1 para "sim", 0 para "não").
*  IsActiveMember: Indicação se o cliente é um membro ativo (1 para "sim", 0 para "não").
*  EstimatedSalary: Salário estimado do cliente.
*  Exited: Indicação se o cliente encerrou sua conta ou não (1 para "sim", 0 para "não").

Fonte: https://www.kaggle.com/datasets/mervetorkan/churndataset

## **Infraestrutura**

In [None]:
# Abertura Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Instalação da biblioteca
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=d5659c9445a2e52ded97266acc3a644fa3869af78cf796d981176532a0a21e95
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# Abertura de biblioteca
from pyspark.sql import SparkSession    # utilizado para criação da sessão pyspark no colab
from pyspark.sql.functions import col, when

In [None]:
# Configurando ambiente
spark = (SparkSession.builder.master('local')                 # máquina local do colab
                             .appName('comandos_basicos')     # Nome da aplicação
                             .config('spark.ui.port', '4050') # Porta padrão do colab;
                             .getOrCreate())

In [None]:
# Checando o ambiente criado
spark

In [None]:
# Extração
df = (spark.read.format('csv')
                .option('delimiter', ',')
                .option('header', 'true')
                .option('inferschema', 'true')
                .load('/content/drive/MyDrive/BootCamp Análise de Dados/PySpark/churn.csv'))

In [None]:
# Visualizando a conjunto de dados
df.show()

## **Pré-Análise**

In [None]:
# Visualização do conjunto de dados
df.show()

In [None]:
# Visualização do cabeçalho
df.head()

In [None]:
# mesmos dados de cima porem em tabela
total_rows = df.count()
df.limit(1).show(total_rows + 1)

In [None]:
# Visualizando as últimas posições do DataFrame
# Não há um método tail() direto em PySpark, você pode usar count()
# para determinar o número total de linhas e então mostrar as últimas linhas
total_rows = df.count()
df.limit(5).show(total_rows - 5)

In [None]:
# Visualização do DataFrame de forma aleatória
df.sample(False, 0.1).show(4)  # 0.1 indica a fração das linhas a serem amostradas

In [None]:
# Verificando o tamanho do DataFrame (linhas, colunas)
(df.count(), len(df.columns))

In [None]:
# Verificando o tipo de dados de cada coluna
df.dtypes

In [None]:
# Contar as observações em cada coluna
column_counts = [(col, df.select(col).count()) for col in df.columns]

# Exibir os resultados
for col, count in column_counts:
    print(f"{col}: {count}")

In [None]:
# Informações detalhadas do conjunto de dados
df.printSchema()

## **Transformação**

In [None]:
# Visualização do conjunto de dados
df.show()

In [None]:
# Renomear as colunas
df = df \
    .withColumnRenamed("RowNumber", "numero_linha") \
    .withColumnRenamed("CustomerId", "id") \
    .withColumnRenamed("Surname", "sobrenome") \
    .withColumnRenamed("CreditScore", "pontuacao_credito") \
    .withColumnRenamed("Geography", "localizacao") \
    .withColumnRenamed("Gender", "genero") \
    .withColumnRenamed("Age", "idade") \
    .withColumnRenamed("Tenure", "tempo_permanencia") \
    .withColumnRenamed("Balance", "saldo") \
    .withColumnRenamed("NumOfProducts", "num_produtos") \
    .withColumnRenamed("HasCrCard", "tem_cartao_credito") \
    .withColumnRenamed("IsActiveMember", "membro_ativo") \
    .withColumnRenamed("EstimatedSalary", "salario_estimado") \
    .withColumnRenamed("Exited", "encerrou_conta")

In [None]:
# Visualização do conjunto de dados
df.show()

### **Valores nulos**

In [None]:
# Verificar valores nulos em cada coluna
for column in df.columns:
    null_count = df.filter(df[column].isNull()).count()
    print(f"{column}: {null_count} nulos")

In [None]:
# Antes de eliminar de vez, por favor filtre os dados
# Eliminação de dados nulos - como não tem deixei comentado
#df.dropna()

### **Valores Únicos e Duplicados**

In [None]:
# Verificando se os dados são únicos na coluna 'id'
# Pelo fato de ter aparecido 'false', significa que temos dados duplicados
print("A coluna id possui valores únicos? Resposta:", df.select("id").distinct().count() == df.count())

In [None]:
# Verificando se os dados são únicos na coluna 'sobrenome'
# Pelo fato de ter aparecido 'false', significa que temos dados duplicados
print("A coluna sobrenome possui valores únicos? Resposta:", df.select("sobrenome").distinct().count() == df.count())

In [None]:
# Visualizando dados duplicados no conjunto de dados
df.orderBy("sobrenome").show()

In [None]:
# Verificando a quantidade de sobrenomes repetidos na base de dados em ordem decrescente
df.groupBy("sobrenome").count().orderBy("count", ascending=False).show()

In [None]:
# Verificando a quantidade de sobrenomes repetidos na base de dados
df.groupBy("sobrenome").count().show()

In [None]:
# Filtrando pelo nome
df.filter(df["sobrenome"] == "Piccio").show()

In [None]:
# Salvando o filtro da familia Piccio
df_filtro1 = df.filter(df["sobrenome"] == "Piccio")

In [None]:
# Mostrando resultado do filtro
df_filtro1.show()

In [None]:
# Verificando o tamanho do DataFrame (linhas, colunas)
print((df.count(), len(df.columns)))

In [None]:
# Eliminando valores duplicados na coluna sobrenome
df = df.dropDuplicates(['sobrenome'])

In [None]:
# Checando o tamanho do DataFrame (linhas, colunas)
print((df.count(), len(df.columns)))

In [None]:
#Filtrar as linhas onde encerrou_conta é igual a 0
encerrou_conta_0 = df.filter(df.encerrou_conta == 0)

In [None]:
# Filtrar as linhas onde encerrou_conta é igual a 1
encerrou_conta_1 = df.filter(df.encerrou_conta == 1)

In [None]:
# Simulação
df.drop("numero_linha", "id").show()

In [None]:
# Aplicação
df = df.drop("numero_linha", "id")

In [None]:
# Verificação
df.show()

In [None]:
# Selecionando colunas
df.select("genero","pontuacao_credito", "localizacao", "saldo").show()

In [None]:
# Caso queira usar: Selecionando colunas
df_selec = df.select("genero","pontuacao_credito", "localizacao", "saldo")

In [None]:
# Verificando
df_selec.show()

### **Tradução de Categorias**

In [None]:
df.show()

A expressão rdd.map(lambda x: x[0]).collect() em PySpark faz o seguinte:

*  RDD (rdd): Representa um Resilient Distributed Dataset, uma estrutura de dados fundamental em PySpark, que permite operações distribuídas.

*  map(lambda x: x[0]): Aplica uma função lambda a cada elemento do RDD, onde lambda x: x[0] extrai o primeiro elemento de cada tupla ou lista x.

*  collect(): Coleta todos os elementos do RDD de volta para o driver (o ambiente principal onde o código é executado), retornando uma lista de Python contendo todos os elementos processados pelo map.

Portanto, rdd.map(lambda x: x[0]).collect() retorna uma lista com o primeiro elemento de cada elemento do RDD rdd

In [None]:
# Verificação de valores unicos
sorted(df.select("localizacao").distinct().rdd.map(lambda x: x[0]).collect())

In [None]:
# Verificação de valores unicos
sorted(df.select("genero").distinct().rdd.map(lambda x: x[0]).collect())