# Conexão Spark & MongoDB Atlas

In [0]:
connectionString=''
database=""
collection="chats"

# URL e API do Chatbot
url = "" 
api_key = ""  

In [0]:
df = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection","chats").load()

In [0]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- last_message: string (nullable = true)
 |-- questions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ask_more_information: boolean (nullable = true)
 |    |    |-- call_status: boolean (nullable = true)
 |    |    |-- cancellation: boolean (nullable = true)
 |    |    |-- data_store_id: string (nullable = true)
 |    |    |-- doubt: boolean (nullable = true)
 |    |    |-- hack_fraud: boolean (nullable = true)
 |    |    |-- human_assistance: boolean (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- instability: boolean (nullable = true)
 |    |    |-- jailbreak: boolean (nullable = true)
 |    |    |-- metadata: struct (nullable = true)
 |    |    |    |-- ask_more_information: boolean (nullable = true)
 |    |    |    |-- data_store_id: string (nullable = true)
 |    |    |    |-- dict_intent: struct

# Consulta 1 - Identificar e analisar a relevância dos artigos que ajudam a responder questões em conversas relacionadas a problemas com notas fiscais.

In [0]:
from pyspark.sql.functions import explode, col, avg, row_number, expr
from pyspark.sql.window import Window


df_expanded_questions = df.withColumn("question", explode("questions"))

# Filtra as perguntas que contêm 'nota fiscal', 'nfe', ou 'nf-e'
df_filtered = df_expanded_questions.filter(
    (col("question.question").contains("nota fiscal")) | 
    (col("question.question").contains("nfe")) | 
    (col("question.question").contains("nf-e"))
)

# Expande o array de source_documents dentro de cada question
df_expanded = df_filtered.withColumn("document", explode("question.metadata.source_documents")) \
    .select(
        col("question.id").alias("question_id"),
        col("document.kwargs.metadata.relevance_score").alias("relevance_score"),
        col("document.kwargs.metadata.id").alias("document_id")
    )

# Adiciona uma coluna para índice de documentos
window_spec = Window.partitionBy("question_id").orderBy("document_id")
df_with_index = df_expanded.withColumn("doc_index", row_number().over(window_spec))

# Calcula a média dos scores de relevância para cada índice de documento
average_scores = df_with_index.groupBy("doc_index").agg(avg("relevance_score").alias("avg_relevance_score")) \
    .orderBy("doc_index")

average_scores.show()


+---------+-------------------+
|doc_index|avg_relevance_score|
+---------+-------------------+
|        1| 0.3930434769262438|
|        2| 0.3423913056435792|
|        3|0.35782608551823575|
|        4|0.36543478201264923|
|        5| 0.3376086932485518|
+---------+-------------------+



Na consulta acima, nós temos a média de relevância dos artigos recuperamos para responder perguntas que envolvem nota fiscal. Podemos ver que os artigos recuperados possuem uma relevância baixa no contexto geral.

# Consulta 2 - Avaliar a performance mensal do chatbot, calculando a porcentagem de casos em que o chatbot resolveu problemas sozinho em comparação com os casos que necessitaram de intervenção humana.

In [0]:
from pyspark.sql.functions import explode, col, month, year, sum as spark_sum, count, when, round

# Expande o array de questions
df_expanded = df.withColumn("question", explode("questions"))

# Extraindo o mês e ano da data da pergunta
df_expanded = df_expanded.withColumn("month", month(col("question.question_date"))) \
    .withColumn("year", year(col("question.question_date")))

# Criando colunas para identificar intervenção humana e resolução pelo chatbot
df_expanded = df_expanded.withColumn("human_intervention", when(col("question.metadata.dict_intent.human_assistance") == True, 1).otherwise(0)) \
    .withColumn("chatbot_resolved", when(col("question.metadata.dict_intent.human_assistance") == False, 1).otherwise(0))

# Removendo dados nulos
df_filtered = df_expanded.filter(col("month").isNotNull() & col("year").isNotNull() & col("question.metadata.dict_intent.human_assistance").isNotNull())

# Agrupando por mês e ano e calculando a performance
df_performance = df_filtered.groupBy("year", "month") \
    .agg(
        spark_sum("human_intervention").alias("total_human_intervention"),
        spark_sum("chatbot_resolved").alias("total_chatbot_resolved"),
        count("*").alias("total_cases")
    ) \
    .withColumn("chatbot_performance", round((col("total_chatbot_resolved") / col("total_cases")) * 100, 1)) \
    .orderBy("year", "month")

# Exibindo o resultado
df_performance.show()


+----+-----+------------------------+----------------------+-----------+-------------------+
|year|month|total_human_intervention|total_chatbot_resolved|total_cases|chatbot_performance|
+----+-----+------------------------+----------------------+-----------+-------------------+
|2024|    7|                     954|                  2371|       3325|               71.3|
|2024|    8|                       0|                    11|         11|              100.0|
|2024|    9|                       1|                     4|          5|               80.0|
+----+-----+------------------------+----------------------+-----------+-------------------+



Na consulta acima, analisamos a assistência humana necessária durante as interações com o chatbot, distinguindo entre casos onde a intervenção foi necessária e aqueles em que o chatbot conseguiu resolver o problema sozinho. A partir dessas informações, calculamos a performance do chatbot em termos de porcentagem de casos resolvidos sem intervenção humana.

Além disso, para facilitar a análise, extraímos e agrupamos os dados por mês e ano, permitindo uma visão detalhada da performance do chatbot ao longo do tempo.

Abaixo faremos a escrita dos dados retornados da consulta na collection `analises_mensais`:

In [0]:
# Define as opções para a nova coleção no MongoDB
connectionString=''
database_name = ''
collection_name = 'analises_mensais'

# Escreve o DataFrame df_performance na nova coleção no MongoDB
df_performance.write \
    .format("mongo") \
    .mode("overwrite") \
    .option("database", database_name) \
    .option("collection", collection_name) \
    .option("spark.mongodb.output.uri", connectionString) \
    .save()


# Escrita no MongoDB via Spark


O que iremos fazer é:

1. Definir uma pergunta
2. Criar uma thread-id
3. Enviar a pergunta para a API
4. Escrever a thread id e a resposta da API no MongoDB

In [0]:
import requests
import json
import uuid
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

question = "Quero falar com atendente"

spark = SparkSession.builder \
    .appName("Write Nested Data to MongoDB") \
    .getOrCreate()

def create_thread_id():
    return f"thread_{uuid.uuid4().hex[:8]}"

# Criação de thread_id aleatória
thread_id = create_thread_id()
print("thread id:", thread_id)

# Definição dos cabeçalhos e payload
headers = {
    "Content-Type": "application/json",
    "x-api-key": api_key
}

payload = {
    "contactIdentity": "00_teste",
    "camiThreadId": thread_id,
    "question": question
}

# Envio da requisição POST
response = requests.post(url, headers=headers, json=payload)

if response.status_code == 200:
    print("Requisição bem-sucedida!")
    data = response.json()

    # Dados adicionais com a resposta
    formatted_data = {
        "id": thread_id,
        "user": {
            "user_type_blip": "contador",
            "name": "",
            "user_id": "351928129240@wa.gw.msging.net",
            "tenant_id": "1729226"
        },
        "questions": [{
            "question": data.get("question", ""),
            "question_date": data.get("time", ""),
            "response": data.get("response", ""),
            "response_date": data.get("time", ""),
            "response_time": data.get("time", ""),
            "metadata": {
                "type_doubt": data.get("type_doubt", ""),
                "dict_intent": {
                    "call_status": data.get("call_status", False),
                    "cancellation": data.get("cancellation", False),
                    "doubt": data.get("doubt", False),
                    "hack_fraud": data.get("hack_fraud", False),
                    "human_assistance": data.get("human_assistance", False),
                    "instability": data.get("instability", False),
                    "jailbreak": data.get("jailbreak", False),
                    "negotiation": data.get("negotiation", False),
                    "toxicity": data.get("toxicity", False)
                },
                "source_documents": [
                    {
                        "type": doc.get("type", "constructor"),
                        "id": doc.get("id", []),
                        "lc": doc.get("lc", 1),
                        "kwargs": {
                            "metadata": {
                                "id": doc.get("kwargs", {}).get("metadata", {}).get("id", ""),
                                "relevance_score": doc.get("kwargs", {}).get("metadata", {}).get("relevance_score", 0.0),
                                "source": doc.get("kwargs", {}).get("metadata", {}).get("source", "")
                            },
                            "page_content": doc.get("kwargs", {}).get("page_content", "")
                        }
                    } for doc in data.get("source_documents", [])
                ],
                "data_store_id": data.get("data_store_id", "")
            }
        }]
    }

    # Convertendo para JSON e depois para DataFrame
    json_str = json.dumps([formatted_data])
    rdd = spark.sparkContext.parallelize([json_str])
    df = spark.read.json(rdd)

    # Adicionando timestamp
    df = df.withColumn("last_message", current_timestamp())

    # Mostrar o DataFrame para verificação
    df.show(truncate=False)

    # Escrever no MongoDB (ajustar configurações)
    df.write.format("mongo") \
        .option("spark.mongodb.output.uri", connectionString) \
        .option("database", database) \
        .option("collection", collection) \
        .mode("append") \
        .save()

    print("Dados inseridos no MongoDB com sucesso.")
else:
    print("Erro na requisição:", response.status_code)
    print("Mensagem de erro:", response.text)

    # Capturar o timestamp como string
    error_timestamp = datetime.now().isoformat()

    # Formatar dados de erro
    error_log = {
        "id": thread_id,
        "error_message": response.text,
        "status_code": response.status_code,
        "timestamp": error_timestamp
    }

    # Convertendo para JSON e depois para DataFrame
    error_json_str = json.dumps([error_log])
    error_rdd = spark.sparkContext.parallelize([error_json_str])
    error_df = spark.read.json(error_rdd)

    # Escrever no MongoDB (coleção de logs)
    error_df.write.format("mongo") \
        .option("spark.mongodb.output.uri", connectionString) \
        .option("database", database) \
        .option("collection", "error_logs") \
        .mode("append") \
        .save()

    print("Log de erro inserido no MongoDB com sucesso.")

thread id: thread_704baca3
Requisição bem-sucedida!
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------+-----------------------+
|id             |questions                                                                                                                                                                            |user                                                |last_message           |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------+-----------------------+
|thread_704baca3|[{{supercami_search_app_v1, {false, false, false, false, true, false, false, false, false}, [], dono

# Leitura no MongoDB

Por fim, iremos recuperar a resposta da IA a partir do thread id em que foi feita a pergunta.

In [0]:
df = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection",collection).load()

# Filtrar os dados pelo thread_id específico
filtered_df = df.filter(df["id"] == thread_id)

response_df = filtered_df.withColumn("response", explode(col("questions")))

# Selecionar e mostrar somente a resposta
final_df = response_df.select("response.response")

final_df.show(truncate=False)

+--------+
|response|
+--------+
|        |
+--------+

