# 🧪 Análise de Logs com Elasticsearch + Databricks

In [None]:
# Extraction: read the raw API log documents from Elasticsearch and stage them
# on DBFS as JSON for the enrichment step.
from pyspark.sql import SparkSession

# Connector settings (elasticsearch-hadoop) are placed on the session itself, so
# later reads/writes through this session that do not pass their own options
# (e.g. the write-back cell) use the same cluster.
spark = (
    SparkSession.builder
    .appName("ExtractFromElasticsearch")
    .config("spark.es.nodes", "localhost")
    .config("spark.es.port", "9200")
    # Route all traffic through the declared node(s) only (no node discovery).
    .config("spark.es.nodes.wan.only", "true")
    .getOrCreate()
)

es_reader = spark.read.format("org.elasticsearch.spark.sql")
df = es_reader.load("logs-api/_doc")

# Preview without cutting long log messages.
df.show(truncate=False)

# Replace any previously staged snapshot with this run's documents.
raw_writer = df.write.mode("overwrite")
raw_writer.json("dbfs:/tmp/logs_raw")

In [None]:
# Enrichment: tag every staged log document with a coarse error class derived
# from its "message" field, then stage the result on DBFS as CSV.
from pyspark.sql.functions import col, lower, to_json, when
from pyspark.sql.types import ArrayType, MapType, StructType

df = spark.read.json("dbfs:/tmp/logs_raw")

# Compare against a lower-cased copy of the message: a case-sensitive check
# misses e.g. "Timeout", "Connection refused" or "java.lang.NullPointerException".
msg_lower = lower(col("message"))

df_enriched = df.withColumn(
    "classificacao",
    when(msg_lower.contains("timeout") | msg_lower.contains("timed out"), "TimeoutError")
    .when(msg_lower.contains("null pointer") | msg_lower.contains("nullpointer"), "NullPointer")
    .when(msg_lower.contains("refused"), "ConnectionRefused")
    # A null message yields a null condition, which counts as "no match",
    # so such rows also end up here.
    .otherwise("Outros")
)

df_enriched.show(truncate=False)


def _quoted(name):
    """Return *name* as a backtick-quoted column reference (dots stay literal)."""
    return col("`" + name.replace("`", "``") + "`")


# The CSV writer cannot store struct/array/map values, which Elasticsearch
# documents frequently contain; serialize those columns to JSON strings so the
# write below does not fail. Flat columns are passed through unchanged.
_nested_types = (StructType, ArrayType, MapType)
df_csv = df_enriched.select(
    [
        to_json(_quoted(field.name)).alias(field.name)
        if isinstance(field.dataType, _nested_types)
        else _quoted(field.name)
        for field in df_enriched.schema.fields
    ]
)
df_csv.write.mode("overwrite").csv("dbfs:/tmp/logs_enriched", header=True)

In [None]:
# Write-back: load the enriched CSV snapshot and index it into Elasticsearch.
# multiLine=True lets a quoted field span several lines. Without it, a log
# message containing line breaks (e.g. a stack trace) is split into several
# malformed rows.
df_final = spark.read.csv("dbfs:/tmp/logs_enriched", header=True, multiLine=True)

# No schema inference is requested, so every column is read as a string.
# Connection settings are not given here; they come from the session
# configuration (spark.es.nodes / spark.es.port / spark.es.nodes.wan.only).
df_final.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "logs-enriched/_doc") \
    .mode("overwrite") \
    .save()