# Indexando dados no Elasticsearch via Spark

Este notebook usa PySpark para:
1. Ler um arquivo CSV do HDFS
2. Verificar o esquema dos dados
3. Indexar os dados no Elasticsearch usando o conector `elasticsearch-hadoop`

In [1]:
# !pip install pandas

In [2]:
from pyspark.sql import SparkSession

## 1. Criando a sessão do Spark com as configs do Elasticsearch
Aqui configuramos o Spark para saber onde está o Elasticsearch e como se conectar a ele.
Além disso, informamos o endereço do master do cluster Spark.

In [7]:
spark = SparkSession.builder \
    .appName("IndexDatabaseB") \
    .master("spark://barravento:7077") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.1.3") \
    .config("spark.es.nodes", "barravento") \
    .config("spark.es.port", "9200") \
    .config("spark.es.nodes.wan.only", "false") \
    .config("spark.es.resource", "dbb2") \
    .getOrCreate()

## 2. Lendo o arquivo CSV do HDFS
O CSV está na pasta `/sandbox` no HDFS. Usamos `header=True` para ler os nomes das colunas.

In [8]:
df = spark.read.option("header", True).csv("hdfs://barravento:9000/sandbox/02-databaseB.csv")
df.limit(3).toPandas()

Unnamed: 0,id,nome,sexo,mae,dn,cidade,end
0,1,JAMILLE YASMIN DA SILVA SILVA,2,ALAICE DE SENA SILVA,2009-04-22,CARACARAI,COMUNIDADE JABUTI
1,2,GENNILSON DE SOUZA DA SILVA,1,ARLENE DA COSTA BATISTA,2006-11-06,ITABUNA,AV DIQUE 828
2,3,POLLYANNA BARBOSA DE SOUZA,2,MARIAH GRACIRLENE DO NASCIMENTO,2008-02-19,SAO CRISTOVAO,R M


## 3. Visualizando o esquema inferido pelo Spark
Isso ajuda a confirmar que os dados foram lidos corretamente.

In [9]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- mae: string (nullable = true)
 |-- dn: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- end: string (nullable = true)



## 4. Enviando os dados para o Elasticsearch
Aqui salvamos os dados no índice `dbb`. O modo `overwrite` substitui qualquer dado anterior no índice.

In [10]:
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "dbb2") \
    .mode("overwrite") \
    .save()

## 5. Encerrando a sessão Spark
Importante sempre encerrar para liberar recursos!

In [11]:
spark.stop()