In [1]:
import json
import os
import logging
import pandas as pd
from pymongo import MongoClient
import certifi
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3

from utils.analysis_utils import (
    format_ligne,
    get_elementary_subjects_for_part_of_feedback,
    rename_duplicates
)
from utils.database import update_feedbacks_in_mongo

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Constants
LANGUAGE = 'french'
BRAND_NAME = 'ditp_analysis'
MONGO_SECRET_ID = 'Prod/alloreview'
MONGO_REGION = 'eu-west-3'
MONGO_DATABASE = 'feedbacks_db'
MONGO_COLLECTION = 'feedbacks_Prod'
SAMPLE_SIZE = 30  # Adjust as needed
MODEL_NAME = 'gpt-4o-mini'  # Adjust as needed

def get_mongo_client():
    """
    Establishes a connection to the MongoDB client using credentials from AWS Secrets Manager or environment variables.
    """
    mongo_uri = os.getenv('MONGO_CONNECTION_STRING')
    if not mongo_uri:
        secrets_manager_client = boto3.client("secretsmanager", region_name=MONGO_REGION)
        secrets = json.loads(
            secrets_manager_client.get_secret_value(
                SecretId=MONGO_SECRET_ID
            )["SecretString"]
        )
        password = secrets["mongodb"]["password"]
        mongo_uri = f"mongodb+srv://alloreview:{password}@feedbacksdev.cuwx1.mongodb.net"

    return MongoClient(mongo_uri, tlsCAFile=certifi.where())

mongo_client = get_mongo_client()
collection = mongo_client[MONGO_DATABASE][MONGO_COLLECTION]

def get_feedbacks_to_process(collection, brand_name, sample_size):
    """
    Retrieves feedback documents from MongoDB that need processing.

    :param collection: MongoDB collection object.
    :param brand_name: Name of the brand to filter.
    :param sample_size: Number of documents to sample.
    :return: DataFrame containing feedbacks to process.
    """
    query = {
        '$and': [
            {'brand': brand_name},
            {'extractions': {'$exists': True, '$not': {'$size': 0}}},
            {'splitted_analysis_v2': {'$exists': True, '$not': {'$size': 0}}},
            {'extractions': {'$not': {'$elemMatch': {'elementary_subjects': {'$exists': True}}}}}
        ]
    }

    pipeline = [
        {'$match': query},
        {'$sample': {'size': sample_size}}
    ]

    feedbacks_cursor = collection.aggregate(pipeline)
    feedbacks = list(feedbacks_cursor)
    return pd.DataFrame(feedbacks)

df_feedbacks = get_feedbacks_to_process(collection, BRAND_NAME, SAMPLE_SIZE)
logger.info(f"Number of feedbacks retrieved: {df_feedbacks.shape[0]}")

# Select relevant columns
df_feedbacks = df_feedbacks[[
    '_id', 'ecrit_le', 'splitted_analysis_v2', 'extractions',
    'intitule_structure_1', 'intitule_structure_2', 'tags_metiers', 'pays', 'verbatims'
]]

# Generate brand_context column
df_feedbacks['brand_context'] = df_feedbacks.apply(format_ligne, axis=1)

# Prepare DataFrame for processing
df_to_process = df_feedbacks[['_id', 'extractions', 'brand_context']]
logger.info(f"Number of rows pending elementary_subjects processing: {df_to_process.shape[0]}")

EndpointConnectionError: Could not connect to the endpoint URL: "https://oidc.us-east-1.amazonaws.com/token"

In [25]:

# Explode 'extractions' to have one extraction per row
df_extractions = df_to_process.explode('extractions').reset_index(drop=True)

def process_extraction_row(row: pd.Series) -> pd.Series:
    """
    Processes a single extraction row to add elementary_subjects.

    :param row: Pandas Series representing a row with '_id', 'extractions', and 'brand_context'.
    :return: Updated row with 'extractions' and 'elementary_subjects' fields.
    """
    try:
        extraction = row['extractions']
        extraction, elementary_subjects = get_elementary_subjects_for_part_of_feedback(
            extractions=extraction,
            language=LANGUAGE,
            brand_name=BRAND_NAME,
            brand_context=row['brand_context'],
            model=MODEL_NAME,
            should_update_mongo=False
        )
        row['extractions'] = extraction
        row['elementary_subjects'] = elementary_subjects
    except Exception as e:
        logger.error(f"Error processing extraction for _id {row['_id']}: {e}")
    return row

def process_extractions_in_parallel(df: pd.DataFrame, func, max_workers=10) -> pd.DataFrame:
    """
    Processes the extractions DataFrame in parallel using threads.

    :param df: DataFrame containing extractions to process.
    :param func: Function to apply to each row.
    :param max_workers: Maximum number of worker threads.
    :return: DataFrame with processed extractions.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(func, row): idx for idx, row in df.iterrows()}
        for future in as_completed(futures):
            idx = futures[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                logger.error(f"Error processing row at index {idx}: {e}")
    processed_df = pd.DataFrame(results)
    return processed_df

In [26]:
# Process extractions in parallel
df_processed_extractions = process_extractions_in_parallel(df_extractions, process_extraction_row)

# Group by '_id' and aggregate 'extractions' into lists
df_grouped = df_processed_extractions.groupby('_id').agg({
    'extractions': list
}).reset_index()

INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HT

KeyboardInterrupt: 

In [28]:
df_grouped['extractions']

0    [{'sentiment': 'NEGATIVE', 'extraction': 'Serv...
1    [{'sentiment': 'NEGATIVE', 'extraction': 'Vict...
2    [{'sentiment': 'NEGATIVE', 'extraction': 'Avis...
3    [{'sentiment': 'SUGGESTION', 'extraction': 'Re...
4    [{'sentiment': 'NEGATIVE', 'extraction': 'Sent...
Name: extractions, dtype: object

In [None]:
# Apply rename_duplicates to each row's 'extractions' list
def apply_rename_duplicates(extractions):
    try:
        return rename_duplicates(extractions)
    except Exception as e:
        logger.error(f"Error in rename_duplicates: {e}")
        return extractions

df_grouped['extractions'] = df_grouped['extractions'].apply(apply_rename_duplicates)

INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"


{"État Civil : Problèmes d'Organisation et de Suivi": ['Service : État Civil : Mauvaise Organisation et Suivi', "État Civil : Problèmes d'Organisation et de Suivi", "État Civil : Blocage des Services et Manque d'Organisation"], 'État Civil : Difficulté de Prise de Rendez-vous': ['État Civil : Difficulté de Prise de Rendez-vous']}


INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"


{'Achat de Véhicule : Problème de Vol': ['Achat de Véhicule : Problème de Vol'], 'Unknown Topic': ['Unknown Topic'], 'Service : Personnel : Appréciation du Soutien aux Victimes': ['Service : Personnel : Appréciation du Soutien aux Victimes'], 'Reconnaissance : Appréciation du Service de Gendarmerie': ['Reconnaissance : Appréciation du Service de Gendarmerie']}


INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"


{'Service : Personnel : Satisfaction Client': ['Service : Personnel : Satisfaction Client'], 'Unknown Topic': ['Unknown Topic']}


INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"


{"Application TousAntiCovid : Problème d'Entrée de Certificat de Vaccination": ["Application TousAntiCovid : Problème d'Entrée de Certificat de Vaccination"]}


INFO:httpx:HTTP Request: POST https://llm-api.allobrain.com/chat/completions "HTTP/1.1 200 OK"


{'Communication : Absence de Réponse': ['Communication : Absence de Réponse de la Sous-Préfecture', 'Communication : Absence de Réponse aux Mails et Appels'], 'Accessibilité : Manque de Réponse et Gestion Inappropriée des Demandes': ['Accessibilité : Manque de Réponse et Gestion Inappropriée des Demandes'], 'Fonctionnement Interne : Manque de Réponse et Responsabilités Inappropriées': ['Fonctionnement Interne : Manque de Réponse et Responsabilités Inappropriées'], 'Unknown Topic': ['Unknown Topic']}


In [21]:
df_grouped

Unnamed: 0,_id,extractions
0,ditp_analysis/1149420,"[{'sentiment': 'NEGATIVE', 'extraction': 'Serv..."
1,ditp_analysis/3269839,"[{'sentiment': 'NEGATIVE', 'extraction': 'Vict..."
2,ditp_analysis/4058894,"[{'sentiment': 'NEGATIVE', 'extraction': 'Avis..."
3,ditp_analysis/4120625,"[{'sentiment': 'SUGGESTION', 'extraction': 'Re..."
4,ditp_analysis/4759856,"[{'sentiment': 'NEGATIVE', 'extraction': 'Sent..."


In [None]:
from utils.database import (get_elementary_subjects)

all_elementary_subjects = get_elementary_subjects('ditp_analysis', 'negative')
# Append dès que y a des nouveaux

# Refaire un call avec 


# For every feedbacks, we retrieve all the new subjects by sentiment. And we rename duplicate compared to the mongo database.
# get_elementary_subjects(brand, type) - function to get all the elementary subjects in the mongo database (by sentiment type)

In [23]:
from collections import defaultdict

def group_subjects_by_sentiment(df):
    """
    Groups elementary subjects by their sentiment from a dataframe of extractions.
    
    Parameters:
    df (DataFrame): A DataFrame containing extractions with sentiments and subjects.
    
    Returns:
    dict: A dictionary where the keys are sentiments, and the values are lists of associated subjects.
    """
    # Initialize a defaultdict to hold lists of subjects for each sentiment
    subjects_by_sentiment = defaultdict(set)

    # Iterate over the extractions in your DataFrame
    for extractions in df['extractions']:
        for item in extractions:
            if item['is_new_topic']:
                sentiment = item['sentiment']
                subjects = item['elementary_subjects']
                # Add subjects to the set corresponding to the sentiment
                subjects_by_sentiment[sentiment].update(subjects)

    # Convert sets to lists
    subjects_by_sentiment = {sentiment: list(subjects) for sentiment, subjects in subjects_by_sentiment.items()}

    return subjects_by_sentiment

# Example usage
result = group_subjects_by_sentiment(df_grouped)
print(result)


{'NEGATIVE': ['Accessibilité : Manque de Réponse et Gestion Inappropriée des Demandes', "Application TousAntiCovid : Problème d'Entrée de Certificat de Vaccination", 'Achat de Véhicule : Problème de Vol', "État Civil : Problèmes d'Organisation et de Suivi", 'Fonctionnement Interne : Manque de Réponse et Responsabilités Inappropriées', 'État Civil : Difficulté de Prise de Rendez-vous', 'Communication : Absence de Réponse'], 'POSITIVE': ['Service : Personnel : Appréciation du Soutien aux Victimes', 'Service : Personnel : Satisfaction Client', 'Reconnaissance : Appréciation du Service de Gendarmerie'], 'SUGGESTION': ["Application TousAntiCovid : Problème d'Entrée de Certificat de Vaccination"]}


In [None]:
def bulk_update_elementary_subjects_from_dataframe(df, update_function):
    pass

In [None]:
# Update feedbacks in MongoDB
def bulk_update_feedbacks_from_dataframe(df, update_function):
    """
    Prepares and performs bulk update of feedback documents in MongoDB.

    :param df: DataFrame containing '_id' and 'extractions' columns.
    :param update_function: Function to perform the update in MongoDB.
    """
    feedbacks_to_update = [
        {
            "id": row['_id'],
            "updates": {
                "extractions": row['extractions'],
                # Include other fields to update if necessary
            }
        }
        for _, row in df.iterrows()
    ]
    # Call the update function with the list of feedback updates
    update_function(feedbacks_to_update)

bulk_update_feedbacks_from_dataframe(df_grouped, update_feedbacks_in_mongo)