
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>


# Construir um Pipeline de Engenharia de Recursos

Nesta demonstração, construiremos um pipeline de engenharia de recursos para gerenciar o carregamento de dados, imputação e transformação. O pipeline será aplicado nos conjuntos de treinamento, teste e validação, com os resultados exibidos. A etapa final envolve salvar o pipeline em disco para uso futuro, garantindo uma preparação de dados eficiente e consistente para tarefas de aprendizado de máquina.

**Objetivos de Aprendizagem:**

*Ao final desta demonstração, você será capaz de:*

* Criar um pipeline de preparação de dados e engenharia de recursos com múltiplas etapas.
* Criar um pipeline com tarefas para imputação e transformação de dados.
* Aplicar um pipeline de preparação de dados e engenharia de recursos em um conjunto de treinamento/modelagem e um conjunto de retenção.
* Exibir os resultados da transformação.
* Salvar um pipeline de preparação de dados e engenharia de recursos para uso futuro potencial.


## Requisitos

Por favor, revise os seguintes requisitos antes de iniciar a lição:

* Para executar este notebook, você precisa usar uma das seguintes versões do Databricks: **13.3.x-cpu-ml-scala2.12**


## Configuração da Sala de Aula

Antes de iniciar a demonstração, execute o script de configuração da sala de aula fornecido. Este script definirá as variáveis de configuração necessárias para a demonstração. Execute a célula a seguir:

In [0]:
%run ../Includes/Classroom-Setup-01

**Outras Convenções:**

Durante esta demonstração, vamos nos referir ao objeto `DA`. Este objeto, fornecido pela Databricks Academy, contém **variáveis como seu nome de usuário, nome do catálogo, nome do esquema, diretório de trabalho e locais do conjunto de dados**. Execute o bloco de código abaixo para visualizar esses detalhes:

In [0]:
print(f"Username:          {DA.username}")
print(f"Catalog Name:      {DA.catalog_name}")
print(f"Schema Name:       {DA.schema_name}")
print(f"Working Directory: {DA.paths.working_dir}")
print(f"Dataset Location:  {DA.paths.datasets}")


## Preparação de Dados

Antes de construir o pipeline, garantiremos a consistência no conjunto de dados convertendo colunas Inteiro e Booleano para o tipo de dados Double e lidando com valores ausentes em colunas numéricas e de string dentro do conjunto de dados **`Telco`**. Estes são os passos que seguiremos nesta seção.

1. Carregar conjunto de dados

1. Dividir conjunto de dados em conjuntos de treino e teste

1. Convertendo Colunas Inteiro e Booleano para Double

1. Lidando com Valores Ausentes

  * Colunas Numéricas

  * Colunas de String


### Carregar Conjunto de Dados

In [0]:
from pyspark.sql.functions import when, col

# Carregar conjunto de dados
dataset_path = f"{DA.paths.datasets}/telco/telco-customer-churn-missing.csv"
telco_df = spark.read.csv(dataset_path, header="true", inferSchema="true", multiLine="true", escape='"')

# Selecionar colunas de interesse
telco_df = telco_df.select("gender", "SeniorCitizen", "Partner", "tenure", "InternetService", "Contract", "PaperlessBilling", "PaymentMethod", "TotalCharges", "Churn")

Pré-processamento rápido
* `SeniorCitizen` como `boolean`
* `TotalCharges` como `double`

In [0]:
# substitua valores "null" com Null
# para coluna em telco_df.colunas:
# telco_df = telco_df.withColumn(coluna, quando(col(coluna) == "null", None).caso contrário(col(coluna)))

# limpar colunas
telco_df = telco_df.withColumn("SeniorCitizen", when(col("SeniorCitizen")==1, True).otherwise(False))
telco_df = telco_df.withColumn("TotalCharges", col("TotalCharges").cast("double"))

display(telco_df)

### Divisão Treino / Teste

In [0]:
train_df, test_df = telco_df.randomSplit([.8, .2], seed=42)

### Transformar Conjunto de Dados

In [0]:
from pyspark.sql.types import IntegerType, BooleanType, StringType, DoubleType
from pyspark.sql.functions import col, count, when


# Obtenha uma lista de colunas inteiras e booleanas
integer_cols = [column.name for column in train_df.schema.fields if (column.dataType == IntegerType() or column.dataType == BooleanType())]

# Faça um loop pelas colunas inteiras para converter cada uma em dupla
for column in integer_cols:
    train_df = train_df.withColumn(column, col(column).cast("double"))
    test_df = test_df.withColumn(column, col(column).cast("double"))

string_cols = [c.name for c in train_df.schema.fields if c.dataType == StringType()]
num_cols = [c.name for c in train_df.schema.fields if c.dataType == DoubleType()]

# Obtenha uma lista de colunas com valores ausentes
# Numérico
num_missing_values_logic = [count(when(col(column).isNull(),column)).alias(column) for column in num_cols]
row_dict_num = train_df.select(num_missing_values_logic).first().asDict()
num_missing_cols = [column for column in row_dict_num if row_dict_num[column] > 0]

# String
string_missing_values_logic = [count(when(col(column).isNull(),column)).alias(column) for column in string_cols]
row_dict_string = train_df.select(string_missing_values_logic).first().asDict()
string_missing_cols = [column for column in row_dict_string if row_dict_string[column] > 0]

print(f"Colunas numéricas com valores ausentes: {num_missing_cols}")
print(f"Colunas de string com valores ausentes: {string_missing_cols}")


## Criar um Pipeline

Define um pipeline Spark ML para pré-processar um conjunto de dados, incluindo a indexação de colunas categóricas, a imputação de valores ausentes, o dimensionamento de recursos numéricos, a codificação one-hot em recursos categóricos e a montagem do vetor de recurso final para aprendizado de máquina.

Neste pipeline Spark ML, pré-processamos um conjunto de dados para prever a rotatividade de clientes em uma empresa de telecomunicações **`telco`**. O pipeline inclui as seguintes etapas-chave:

* **Converter Colunas Categóricas em Índices Numéricos:**
Esta etapa converte colunas categóricas em índices numéricos, permitindo que o modelo processe dados categóricos.

* **Imputar Valores Ausentes:**
O Imputer é usado para preencher valores ausentes em **colunas numéricas com valores ausentes (por exemplo, `tenure`, `TotalCharges`) usando a estratégia `mean`**, garantindo que o conjunto de dados esteja completo e pronto para análise.
**Valores categóricos ausentes serão automaticamente codificados como uma categoria separada.**

* **VectorAssembler e RobustScaler:**
Essas etapas combinam colunas numéricas relevantes em um vetor de recurso e, em seguida, dimensionam os recursos para reduzir a sensibilidade a outliers.

* **Executar Codificação One Hot em Variável Categórica:**
Esta etapa converte as colunas categóricas indexadas em vetores esparsos binários, permitindo que o modelo processe dados categóricos de forma eficaz.

* **Pipeline:**
Todas essas etapas são encapsuladas em um Pipeline, fornecendo uma maneira conveniente e reproduzível de pré-processar os dados para tarefas de aprendizado de máquina.


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, RobustScaler, StringIndexer, OneHotEncoder


# Imputer (estratégia de média para todos os double/numéricos)
to_impute = num_missing_cols
imputer = Imputer(inputCols=to_impute, outputCols=to_impute, strategy='mode')

# Escala numérica
numerical_assembler = VectorAssembler(inputCols=num_cols, outputCol="numerical_assembled")
numerical_scaler = RobustScaler(inputCol="numerical_assembled", outputCol="numerical_scaled")

# String/Cat Indexer (irá codificar missing/null como índice separado)
string_cols_indexed = [c + '_index' for c in string_cols]
string_indexer = StringIndexer(inputCols=string_cols, outputCols=string_cols_indexed, handleInvalid="keep")

# OHE categóricas
ohe_cols = [column + '_ohe' for column in string_cols]
one_hot_encoder = OneHotEncoder(inputCols=string_cols_indexed, outputCols=ohe_cols, handleInvalid="keep")

# Assembler (Todos)
feature_cols = ["numerical_scaled"] + ohe_cols
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Instanciar o pipeline
stages_list = [
    imputer,
    numerical_assembler,
    numerical_scaler,
    string_indexer,
    one_hot_encoder,
    vector_assembler
]

pipeline = Pipeline(stages=stages_list)


## Ajustar o Pipeline

No contexto de aprendizado de máquina e MLflow, **`fitting`** corresponde ao processo de treinamento de um modelo de aprendizado de máquina em um conjunto de dados especificado.

Na etapa anterior, criamos um pipeline. Agora, ajustaremos um modelo com base no pipeline. Este pipeline indexará colunas de string, imputará colunas especificadas, dimensionará colunas numéricas, codificará especificamente colunas em um-hot, e finalmente criará um vetor de todas as colunas de entrada.

In [0]:
pipeline_model = pipeline.fit(train_df)


Em seguida, podemos usar este modelo para transformar, ou aplicar, em qualquer conjunto de dados que desejarmos.

In [0]:
# Transforme training_df e test_df
train_transformed_df = pipeline_model.transform(train_df)
test_transformed_df = pipeline_model.transform(test_df)

In [0]:
train_transformed_df.select("features").show(3, truncate=False)

## Salvar e Reutilizar o Pipeline

Preservar o pipeline de previsão de rotatividade de clientes de telecomunicações, que abrange o modelo, parâmetros e metadados, é fundamental para manter a reprodutibilidade, permitir o controle de versão e facilitar a colaboração entre os membros da equipe. Isso garante um registro detalhado do fluxo de trabalho de aprendizado de máquina. Nesta seção, seguiremos estas etapas;

1. **Salvar o Pipeline:** Salve o modelo de pipeline, incluindo todos os componentes relevantes, no armazenamento de artefatos designado. O pipeline salvo é organizado dentro da pasta **`spark_pipelines`** para maior clareza.

1. **Explorar Estágios do Pipeline Carregado:** Ao carregar o pipeline, inspecione os estágios para revelar as principais transformações e compreender a sequência de operações aplicadas durante a execução do pipeline.



### Salvar o Pipeline

In [0]:
pipeline_model.save(f"{DA.paths.working_dir}/spark_pipelines")

### Carregar e Usar Modelo Salvo

In [0]:
from pyspark.ml import PipelineModel


# Carregar o pipeline
loaded_pipeline = PipelineModel.load(f"{DA.paths.working_dir}/spark_pipelines")

# Mostrar etapas do pipeline
loaded_pipeline.stages

In [0]:
# Vamos usar o pipeline carregado para transformar o conjunto de dados de teste
test_transformed_df = loaded_pipeline.transform(test_df)
display(test_transformed_df)


## Limpar Sala de Aula

Execute a célula a seguir para remover os ativos específicos de aula criados durante esta lição.

In [0]:
DA.cleanup()


## Conclusão

Em resumo, o pipeline de engenharia de recursos apresentado nesta demonstração oferece uma abordagem sistemática e consistente para lidar com o carregamento de dados, imputação e transformação. Ao demonstrar sua aplicação em diferentes conjuntos e enfatizar a importância da preparação de dados, o pipeline se prova uma ferramenta valiosa para tarefas de aprendizado de máquina eficientes e reproduzíveis.

A etapa final de salvar o pipeline no disco garante usabilidade futura, aumentando a eficácia geral do processo de preparação de dados.

&copy; 2024 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>