##### Method 1: Using Sentiment and Length Features
Imagine a platform that processes a continuous stream of conversations from a generative agent (StoryBot). To maintain a safe and engaging environment, you need to monitor these conversations in real time for anomalies such as sudden shifts in sentiment, unusual topic spikes, or atypical user behavior that could indicate emerging issues or opportunities.
**Task**
- Develop a streaming machine learning pipeline in Python that ingests data (message and metadata) and performs feature extraction and preprocessing in near real time.
- Apply an anomaly detection model (your own or from a third party) and return alerts when anomalies are detected.
- For this task, feel free to pick an anomaly you think would be interesting to track (e.g., changes in mood, change in emoji use, shifts in language, use of the phrase “I don’t know”,  prompt injection attacks, etc.).
- Provide a brief README that outlines the architecture, setup instructions, and how to run tests.   
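
The stages named in the task (ingest, feature extraction, detection, alerting) can be sketched as a chain of small functions. This is only a minimal illustration under stated assumptions: `stream_messages`, `extract_features`, `phrase_detector`, and `run_pipeline` are hypothetical names, the in-memory list stands in for a real message queue, and the detector is the simple "I don't know" phrase check mentioned above, not the method developed in this notebook.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Optional


def stream_messages(conversations: Iterable[dict]) -> Iterator[dict]:
    """Yield messages one at a time (stand-in for a real message queue)."""
    for conversation in conversations:
        for message in conversation.get("messages_list", []):
            yield message


def extract_features(message: dict) -> Dict[str, object]:
    """Compute lightweight per-message features."""
    text = str(message.get("message", ""))
    return {
        "conversation_id": message.get("ref_conversation_id"),
        # Normalize case and curly apostrophes so phrase matching is robust
        "text": text.lower().replace("\u2019", "'"),
        "length": len(text),
    }


def phrase_detector(features: Dict[str, object]) -> Optional[str]:
    """Hypothetical detector: return an alert reason, or None if nothing is found."""
    if "i don't know" in str(features["text"]):
        return "phrase 'I don't know' detected"
    return None


def run_pipeline(conversations: Iterable[dict],
                 detector: Callable[[Dict[str, object]], Optional[str]]) -> List[dict]:
    """Process messages one by one and collect alerts."""
    alerts = []
    for message in stream_messages(conversations):
        features = extract_features(message)
        reason = detector(features)
        if reason is not None:
            alerts.append({"conversation_id": features["conversation_id"],
                           "reason": reason})
    return alerts


sample = [{"messages_list": [
    {"message": "Tell me a story.", "ref_conversation_id": 1, "ref_user_id": 822},
    {"message": "I don\u2019t know what to ask.", "ref_conversation_id": 1, "ref_user_id": 822},
]}]
alerts = run_pipeline(sample, phrase_detector)
print(alerts)
```

Because each stage only sees one message at a time, the same functions could be driven by a queue consumer instead of a list without changing the detector.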
The datasets include:

**Conversations** - One-on-one conversations between users and an AI agent (StoryBot)
### conversations.json

Contains conversations between individual users and StoryBot. Each conversation includes:

- `messages_list`: List of messages in the conversation
- `ref_conversation_id`: Unique identifier for the conversation
- `ref_user_id`: ID of the user participating in the conversation

Example structure:

```json
{
  "messages_list": [
    {"message": "Good morning! How are you feeling today?",
    "ref_conversation_id": 42615,
    "ref_user_id": 1,
    "transaction_datetime_utc":  "2023-10-01T08:00:00Z",
    "screen_name": "StoryBot"
    },
    {"message": "I'm doing well, thanks for asking! Just trying to get through the day.",
    "ref_conversation_id": 42615,
    "ref_user_id": 822,
    "transaction_datetime_utc":  "2023-10-01T08:01:00Z",
    "screen_name": "User822"
    },
    {"message": "That's great to hear! Is there anything specific on your mind?",
    "ref_conversation_id": 42615,
    "ref_user_id": 1,
    "transaction_datetime_utc":  "2023-10-01T08:02:00Z",
    "screen_name": "StoryBot"
    }
  ],
  "ref_conversation_id": 42615,
  "ref_user_id": 822
}
```

##### Import Required Libraries

In [1]:
# Import libraries
import json
import numpy as np
import pandas as pd
from datetime import datetime
import os
import matplotlib.pyplot as plt

##### Read JSON File to Load Conversations

- Define the folder and file paths for the dataset.
- Prepare to load the `conversations.json` file from the `data` directory within the current working directory.


In [2]:
# Build the folder and file paths
curr_folder = os.getcwd()
conver_json = "conversations.json"
json_file = os.path.join(curr_folder,"data",conver_json)

##### Load Conversations from JSON

- Define a function `read_json()` to load conversation data from the specified JSON file.
    - Handles file not found and JSON decoding errors gracefully.
- Read the raw conversation data into memory.
- Count and print the total number of conversations loaded.


In [None]:
def read_json(file_path):
    """
    Loads conversations from the conversations.json file.
    Args:
        file_path (str): The path to the JSON file.
    Returns:
        list: A list of conversation objects, or None if an error occurs.
    """
    try:
        # Open the path passed in, not the global json_file
        with open(file_path, 'r') as file:
            data = json.load(file)
            return data
    except FileNotFoundError:
        print(f"Error: File not found: {file_path}")
        return None
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from the file {file_path}.")
        return None
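# Read the raw JSON data
raw_conversations = read_json(json_file)
# read_json returns None on failure, so fall back to an empty list
if raw_conversations is None:
    raw_conversations = []
total_conv = len(raw_conversations)
print(f"Number of conversations: {total_conv}")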


##### Split Data into Training and Test Sets

- Use `train_test_split` from scikit-learn to divide the loaded conversations into training and test sets.
- Set a random seed for reproducibility.
- Define the percentage split between training and testing (default: 80% train, 20% test).
- If the dataset is very small, warn the user and suggest using a K-fold strategy.
- Print the number of conversations in each split.  

**NOTE**: Alternatively, the entire dataset could serve as the training set, with testing done on a separate set of conversations.
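
For the small-dataset case, the K-fold idea can be sketched as follows. This is a dependency-free stand-in written for illustration, not scikit-learn's `KFold`; the helper name `k_fold_splits` and the toy conversation IDs are assumptions.

```python
import random


def k_fold_splits(items, n_splits=3, seed=42):
    """Yield (train, test) lists so that every item is held out exactly once."""
    indices = list(range(len(items)))
    random.Random(seed).shuffle(indices)  # Seeded for reproducible folds
    # Deal the shuffled indices round-robin into n_splits folds
    folds = [indices[i::n_splits] for i in range(n_splits)]
    for held_out in folds:
        held = set(held_out)
        train = [items[i] for i in indices if i not in held]
        test = [items[i] for i in held_out]
        yield train, test


toy_conversations = [{"ref_conversation_id": i} for i in range(6)]
splits = list(k_fold_splits(toy_conversations, n_splits=3))
for fold_number, (train, test) in enumerate(splits, start=1):
    print(f"Fold {fold_number}: {len(train)} train, {len(test)} test")
```

Each fold trains on the remaining conversations and evaluates on the held-out ones, so even a handful of conversations all contribute to testing.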

In [None]:
from sklearn.model_selection import train_test_split # For splitting data
import random

# Assuming StoryBot's ID is 1, based on given json file 
STORYBOT_USER_ID = 1 

RANDOM_SEED = 42 # For reproducible splits
train_per = 80
test_per = 100 - train_per
min_conv_to_train = 8
# Initialize as empty so the prints and later cells work even when no split is made
train_conversations, test_conversations = [], []
if total_conv > 0:
    if total_conv < min_conv_to_train:
        # Too little data for a reliable hold-out split
        print("Warning: Very few conversations. Use K-fold strategy to train and test")
    else:
        train_conversations, test_conversations = train_test_split(
            raw_conversations,
            test_size=test_per / 100,
            random_state=RANDOM_SEED  # For reproducibility
        )

    print(f"\nNumber of conversations for training: {len(train_conversations)}")
    print(f"Number of conversations for testing: {len(test_conversations)}")
else:
    print("Cannot proceed with splitting as no conversations were loaded.")

##### Define Function to Extract User Message Details

- Define `extract_user_message_details()` to:
    - Iterate through each conversation and its messages.
    - Extract details for each message not sent by StoryBot, including:
        - Conversation ID
        - User ID
        - Screen name
        - Timestamp (parsed as datetime)
        - Message text
    - Handle malformed message data gracefully.
- Returns a list of dictionaries, each representing a user message with relevant metadata.


In [5]:
def extract_user_message_details(list_of_conversations, is_storybot_user_id):
    """
    Extracts details (text, conversation ID, timestamp) for every message
    that was NOT sent by StoryBot (is_storybot_user_id).

    Args:
        list_of_conversations (list): A list of conversation objects.
        is_storybot_user_id (int): The user ID of StoryBot.

    Returns:
        list: A list of dictionaries, where each dict contains 
              'conversation_id', 'original_user_id', 'screen_name', 
              'timestamp', and 'message_text' for a user message.
    """
    user_messages_details = []
    # For each conversation with a user
    for conversation in list_of_conversations:
        # Extract the conversation ID
        conv_id = conversation.get('ref_conversation_id', 'unknown_conv_id')
        # Extract the list of messages
        messages_list = conversation.get('messages_list', [])
        # Collect each user message, parsing its timestamp into a datetime
        for message_data in messages_list:
            if not isinstance(message_data, dict): # Basic check for message structure
                # print(f"Skipping malformed message in conv {conv_id}")
                continue

            message_user_id = message_data.get('ref_user_id')
            
            if message_user_id is not None and message_user_id != is_storybot_user_id:
                try:
                    timestamp_str = message_data.get('transaction_datetime_utc')
                    parsed_timestamp = None
                    if timestamp_str:
                        parsed_timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                    
                    user_messages_details.append({
                        'conversation_id': conv_id,
                        'original_user_id': message_user_id,
                        'screen_name': message_data.get('screen_name', 'UnknownUser'),
                        'timestamp': parsed_timestamp,
                        'message_text': message_data.get('message', '') # Ensure text is a string
                    })
                except Exception as e:
                    print(f"Error processing a user message in conv {conv_id}: {e}. Message data: {message_data}")

    return user_messages_details
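
The timestamp handling inside the function can be looked at in isolation. Before Python 3.11, `datetime.fromisoformat` does not accept a trailing `Z`, which is why the suffix is replaced with an explicit `+00:00` offset first. The helper name `parse_utc_timestamp` below is hypothetical and only wraps that one step.

```python
from datetime import datetime, timezone


def parse_utc_timestamp(timestamp_str):
    """Parse an ISO 8601 string ending in 'Z' into a timezone-aware datetime."""
    if not timestamp_str:
        return None  # Missing timestamps stay None, as in the function above
    return datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))


parsed = parse_utc_timestamp("2023-10-01T08:01:00Z")
print(parsed, parsed.tzinfo)
```

The result is timezone-aware, so later sorting and comparisons never mix naive and aware datetimes.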


##### Extract Message Data for Training and Test

- Use the previously defined function to extract user messages from both training and test sets.
- Print the number of user messages extracted for each set.
- Display an example message for sanity checking.
- These extracted messages will be used for feature engineering and downstream anomaly detection.



In [None]:

# Extract user messages for training
train_user_message_details = []
if train_conversations:
    # Build the list of training user messages, with timestamps parsed to datetime
    train_user_message_details = extract_user_message_details(train_conversations, STORYBOT_USER_ID)
    print(f"\nExtracted {len(train_user_message_details)} user messages for training.")
    # Print the first user message as a sanity check
    if train_user_message_details:
        print(f"Example training user message detail: {train_user_message_details[0]}")

# Build the list of test user messages, with timestamps parsed to datetime
test_user_message_details = []
if test_conversations:
    test_user_message_details = extract_user_message_details(test_conversations, STORYBOT_USER_ID)
    print(f"Extracted {len(test_user_message_details)} user messages for testing.")
    if test_user_message_details:
        print(f"Example testing user message detail: {test_user_message_details[0]}")

# Optionally extract 'message_text' from these lists (only needed for embedding-based methods).

# train_user_texts = [detail['message_text'] for detail in train_user_message_details]
# test_user_texts = [detail['message_text'] for detail in test_user_message_details]



Unlike the clustering approach, which needed the message text to create embeddings, this method works on engineered features:
- For feature-based anomaly detection, we use the extracted user message details rather than message embeddings.
- Key features required:
    - `message_text` for sentiment analysis and length calculation.
    - `conversation_id` and `timestamp` to calculate features like sentiment shift within the same conversation.

**We could also compare each message against the history of sentiments.**
- **Possible issue**: Among the 281 messages, suppose the first user sent only positive messages (5) and the second user starts with only negative messages (4). Moving from index 4 to index 5 would then be flagged as a sentiment shift, even though it is not an anomaly.
- **Solution**: `groupby('conversation_id')` ensures that the `.diff()` operation is applied only to messages within the same conversation, so no shift is calculated between the last message of conversation A and the first message of conversation B.
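
The boundary issue is easy to see on a toy example. The sketch below uses made-up sentiment scores for two conversations and compares a plain `.diff()` with a grouped one; the column names `naive_shift` and `grouped_shift` are illustrative only.

```python
import pandas as pd

# Conversation A is positive throughout, conversation B is negative throughout
df = pd.DataFrame({
    "conversation_id": ["A", "A", "A", "B", "B"],
    "sentiment_compound": [0.6, 0.7, 0.5, -0.6, -0.4],
})

# Plain diff crosses the A -> B boundary and reports a large spurious shift
df["naive_shift"] = df["sentiment_compound"].diff()

# Grouped diff restarts in each conversation, so the first row of B is NaN
df["grouped_shift"] = df.groupby("conversation_id")["sentiment_compound"].diff()

print(df)
```

At the first row of conversation B, the plain diff shows a drop of about 1.1, while the grouped diff is `NaN` and is later filled with 0.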

##### Feature Engineering: Sentiment and Length

- Use VADER sentiment analyzer to compute sentiment scores for each user message.
- Calculate the length of each message.
- Compute the sentiment shift for each user within a conversation by comparing the sentiment score to the previous message from the same user in the same conversation.
- Fill missing sentiment shift values (first message in a conversation) with zero.
- Return the enhanced message data as a list of dictionaries with new features.

In [None]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd 
analyzer = SentimentIntensityAnalyzer()

def engineer_features_for_messages(message_details_list):
    """
    Adds sentiment score and message length to each message detail.
    Focuses on user messages for sentiment shift.
    """
    if not message_details_list:
        return []

    # Create a DataFrame for easier processing if not already
    df = pd.DataFrame(message_details_list)    

    # 1. Sentiment score for all messages
    # Ensure 'message_text' is string, handle potential NaN if any step before failed
    df['sentiment_compound'] = df['message_text'].astype(str).apply(lambda text: analyzer.polarity_scores(text)['compound'])

    # 2. Message Length for all messages
    df['message_length'] = df['message_text'].astype(str).apply(len)
    
    # 3. User Sentiment Shift (within each conversation)
    # We only care about this for user messages.
    # Initialize
    df['user_sentiment_shift'] = np.nan 
    
    # Add original_user_id to the df if it's not already there, ensuring it's numeric
    if 'original_user_id' not in df.columns and 'user_id' in df.columns: 
        df['original_user_id'] = df['user_id']
    
    # Ensure original_user_id is numeric, coercing errors
    df['original_user_id'] = pd.to_numeric(df['original_user_id'], errors='coerce')

    
    for conv_id, group in df.groupby('conversation_id'):
        #  Sort these user messages chronologically by their 'timestamp'.
        user_messages_group = group[group['original_user_id'] != STORYBOT_USER_ID].sort_values(by='timestamp')
        if not user_messages_group.empty:
            # Calculate shift from previous user message in the same conversation
            shifts = user_messages_group['sentiment_compound'].diff()
            # Assign these calculated 'shifts' back to the correct rows (user messages)
            #    in the main DataFrame 'df' using their original index.
            df.loc[user_messages_group.index, 'user_sentiment_shift'] = shifts
            
    # Fill NaN shifts (e.g., for the first user message in a convo) with 0 
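    # Assign the result back; inplace fillna on a selected column is unreliable in recent pandas
    df['user_sentiment_shift'] = df['user_sentiment_shift'].fillna(0)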

    return df.to_dict('records') 

# --- Apply feature engineering ---
# Assuming train_user_message_details and test_user_message_details are lists of dicts

# Add features to training user messages
print("\n--- Engineering features for training user messages ---")
train_messages_with_features = engineer_features_for_messages(train_user_message_details)
if train_messages_with_features:
    print(f"Added features to {len(train_messages_with_features)} training user messages.")
    # print(pd.DataFrame(train_messages_with_features).head()) # Optional: view
else:
    print("No training user messages to engineer features for.")

print("\n--- Engineering features for testing user messages ---")
test_messages_with_features = engineer_features_for_messages(test_user_message_details)
if test_messages_with_features:
    print(f"Added features to {len(test_messages_with_features)} testing user messages.")
    # print(pd.DataFrame(test_messages_with_features).head()) # Optional: view
else:
    print("No testing user messages to engineer features for.")

##### Flag Anomalies in Test User Messages

- For each test user message, add anomaly flags:
    - `isAnomalousTooShort`: Set to 1 if the message is shorter than the calculated threshold.
    - `isAnomalousSentimentFlip`: Set to 1 if the user's sentiment flips from positive to negative or vice versa compared to their previous message in the same conversation.
- Track the last sentiment category for each user within each conversation to enable accurate flip detection.
- Store all processed messages and their anomaly flags in a list for further analysis.
- Print the number of processed messages and show an example of the flagged data.
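
The flip logic described above can be sketched as a small stateful class that handles one message at a time, which also fits a streaming setting. The class name `SentimentFlipDetector` is hypothetical; the thresholds follow the ±0.05 compound-score cutoffs used in this notebook, and state is keyed by conversation ID on the assumption that each conversation has a single user.

```python
class SentimentFlipDetector:
    """Flags positive <-> negative flips between consecutive user messages."""

    def __init__(self, pos_thr=0.05, neg_thr=-0.05):
        self.pos_thr = pos_thr
        self.neg_thr = neg_thr
        self._last_category = {}  # conversation_id -> last sentiment category

    def categorize(self, score):
        if score >= self.pos_thr:
            return "positive"
        if score <= self.neg_thr:
            return "negative"
        return "neutral"

    def update(self, conversation_id, score):
        """Record the new score and return True if it flips the previous category."""
        current = self.categorize(score)
        previous = self._last_category.get(conversation_id)
        self._last_category[conversation_id] = current
        # A flip needs exactly one positive and one negative category
        return {previous, current} == {"positive", "negative"}


detector = SentimentFlipDetector()
flags = [
    detector.update(1, 0.6),   # First message in conversation 1: no previous state
    detector.update(1, -0.7),  # Positive -> negative
    detector.update(2, -0.5),  # First message in conversation 2
    detector.update(1, 0.0),   # Negative -> neutral
    detector.update(1, 0.8),   # Neutral -> positive
]
print(flags)
```

Note that a neutral message resets the comparison: with this logic, positive, neutral, negative in sequence produces no flag, which matches the behavior of the cell below.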

In [None]:

len_thr_shrt = None
# VADER score >= 0.05 is considered positive
sentim_pos_thr = 0.05  
# VADER score <= -0.05 is considered negative
sentim_neg_thr = -0.05 
# Derive len_thr_shrt from training data
if train_messages_with_features:
    df_train_feats = pd.DataFrame(train_messages_with_features)
    # Use the 2nd percentile of training message lengths as the "too short" cutoff
    if 'message_length' in df_train_feats.columns and not df_train_feats['message_length'].empty:
        len_thr_shrt = np.percentile(df_train_feats['message_length'], 2) 
        print(f"Calculated Short Message Length Threshold (len_thr_shrt): {len_thr_shrt:.2f} characters")
    else:
        print("'message_length' not found or empty in training features. Using default for len_thr_shrt.")
        len_thr_shrt = 10 # Default
else:
    print("Training features not available to determine length threshold. Using default for len_thr_shrt.")
    len_thr_shrt = 10 # Default

if len_thr_shrt is None: # Fallback if percentile calculation failed for some reason
    len_thr_shrt = 10 
    print(f"Using default Short Message Length Threshold (len_thr_shrt): {len_thr_shrt}")


print(f"\n--- Preparing to Process Test User Messages for Anomalies (Feature-Based) ---")
print(f"Threshold for 'isAnomalousTooShort': Message length < {len_thr_shrt:.2f}")
print(f"Thresholds for sentiment categories: Positive >= {sentim_pos_thr}, Negative <= {sentim_neg_thr}")
print(f"Criteria for 'isAnomalousSentimentFlip': User sentiment category changes (positive <-> negative) from previous user message.")
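
To make the percentile threshold concrete, here is a minimal sketch on made-up message lengths. It assumes NumPy's default linear interpolation for `np.percentile`; the variable names are illustrative.

```python
import numpy as np

# Hypothetical training message lengths, in characters
train_lengths = [10, 20, 30, 40, 50]

# 2nd percentile: a value that about 2% of training lengths fall below
threshold = np.percentile(train_lengths, 2)
print(f"Threshold: {threshold:.2f}")  # About 10.80 for this toy data

# A message is flagged only if it is strictly shorter than the threshold
candidate_lengths = [5, 11, 35]
too_short_flags = [length < threshold for length in candidate_lengths]
print(too_short_flags)
```

A lower percentile flags fewer messages; on a small training set the threshold sits very close to the shortest observed message, so it is worth checking the printed value before trusting the flags.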


In [None]:

# This list will store dictionaries, each representing a test user message with added anomaly flags
test_message_data_anomaly = [] 
# This dictionary will track the last sentiment category of a user within each conversation
conversation_last_user_sentiment_category = {}

if  test_messages_with_features:
    # Convert to DataFrame for easier sorting and iteration
    df_test_feats = pd.DataFrame(test_messages_with_features)    
    # Ensure correct data types and sort for sequential processing of sentiment flips
    df_test_feats['timestamp'] = pd.to_datetime(df_test_feats['timestamp'])
    df_test_feats = df_test_feats.sort_values(by=['conversation_id', 'timestamp'])

    for index, msg_detail in df_test_feats.iterrows():
        # Basic message properties
        current_sentiment_score = msg_detail.get('sentiment_compound', 0)
        message_length = msg_detail.get('message_length', float('inf'))
        conv_id = msg_detail.get('conversation_id')
        
        # Anomaly Flag 1: Message is Too Short 
        isAnomalousTooShort = message_length < len_thr_shrt

        # Anomaly Flag 2: Sentiment Flip (Positive <-> Negative) 
        isAnomalousSentimentFlip = False
        
        # Determine current message's sentiment category
        current_sentiment_category = 'neutral'
        if current_sentiment_score >= sentim_pos_thr:
            current_sentiment_category = 'positive'
        elif current_sentiment_score <= sentim_neg_thr:
            current_sentiment_category = 'negative'

        # Get the last known sentiment category for this user in this conversation
        last_known_category_for_conv = conversation_last_user_sentiment_category.get(conv_id)
        # If there was a previous user message in this conversation
        if last_known_category_for_conv:
            # and the category has changed
            if (last_known_category_for_conv == 'positive' and current_sentiment_category == 'negative') or \
                (last_known_category_for_conv == 'negative' and current_sentiment_category == 'positive'):
                isAnomalousSentimentFlip = True
        
        # Update the last sentiment category for this user in this conversation
        # This assumes msg_detail is a user message (which test_messages_with_features should contain)
        conversation_last_user_sentiment_category[conv_id] = current_sentiment_category
        
        # Append to the list that will form the DataFrame
        test_message_data_anomaly.append({
            'conversation_id': conv_id,
            'original_user_id': msg_detail.get('original_user_id'),
            'message_text': msg_detail.get('message_text'),
            'sentiment_compound': current_sentiment_score,
            'message_length': message_length,
            'user_sentiment_shift': msg_detail.get('user_sentiment_shift', 0.0), # Include this from previous feature engineering
            'current_sentiment_category': current_sentiment_category,
            'previous_user_sentiment_category_in_conv': last_known_category_for_conv if last_known_category_for_conv else 'N/A',
            'isAnomalousTooShort': 1 if isAnomalousTooShort else 0,
            'isAnomalousSentimentFlip': 1 if isAnomalousSentimentFlip else 0
        })
    
    print(f"\nProcessed {len(test_message_data_anomaly)} test user messages and added anomaly flags.")
    if test_message_data_anomaly:
        print("First processed message data structure (example):")
        print(test_message_data_anomaly[0])
else:
    print("'test_messages_with_features' is empty. No test user messages to process.")



##### Save Anomaly Analysis Results to CSV

- Convert the processed test message data (with anomaly flags) to a DataFrame.
- Create an overall anomaly flag (`isOverallAnomaly`) for each message (set if either anomaly condition is met).
- Export the DataFrame to a CSV file for further analysis or reporting.
- Identify and count test conversations that contain at least one anomalous message.
- Print the number of conversations flagged and confirm the CSV export.
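
The combination and export steps can be sketched without pandas. The records below are made up, and the standard-library `csv` module is swapped in for `DataFrame.to_csv` purely for illustration, writing to an in-memory buffer instead of a file.

```python
import csv
import io

# Hypothetical per-message flags for three conversations
records = [
    {"conversation_id": 1, "isAnomalousTooShort": 0, "isAnomalousSentimentFlip": 0},
    {"conversation_id": 1, "isAnomalousTooShort": 1, "isAnomalousSentimentFlip": 0},
    {"conversation_id": 2, "isAnomalousTooShort": 0, "isAnomalousSentimentFlip": 0},
    {"conversation_id": 3, "isAnomalousTooShort": 0, "isAnomalousSentimentFlip": 1},
]

# OR logic: a message is anomalous if either flag is set
for record in records:
    record["isOverallAnomaly"] = (
        record["isAnomalousTooShort"] | record["isAnomalousSentimentFlip"]
    )

# A conversation is flagged if it contains at least one anomalous message
flagged_ids = {r["conversation_id"] for r in records if r["isOverallAnomaly"] == 1}
print(f"Flagged conversations: {sorted(flagged_ids)}")

# Write the rows as CSV (in-memory here; a file handle works the same way)
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(records[0].keys()))
writer.writeheader()
writer.writerows(records)
print(buffer.getvalue())
```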


In [None]:
flagged_anomalous_conversations = []
if test_message_data_anomaly and isinstance(test_message_data_anomaly, list):
    # Convert processed message data to DataFrame
    df_test_message_analysis = pd.DataFrame(test_message_data_anomaly)

    if not df_test_message_analysis.empty:
        # Create 'isOverallAnomaly' column (OR logic: message is too short OR has a sentiment flip)        
        df_test_message_analysis['isOverallAnomaly'] = (
            df_test_message_analysis['isAnomalousTooShort'] | df_test_message_analysis['isAnomalousSentimentFlip']
        ).astype(int)

        print(f"Created DataFrame 'df_test_message_analysis' with shape: {df_test_message_analysis.shape}")
        print(f"Number of messages flagged as 'isOverallAnomaly': {df_test_message_analysis['isOverallAnomaly'].sum()}")

        # Export DataFrame to CSV
        csv_file_name = "test_message_anomaly_analysis.csv"
        try:
            df_test_message_analysis.to_csv(csv_file_name, index=False)
            print(f"DataFrame 'df_test_message_analysis' exported to '{csv_file_name}'")
        except Exception as e:
            print(f"Error exporting DataFrame to CSV: {e}")

        # Identify Test Conversations Containing any 'isOverallAnomaly' Messages
        if test_conversations and isinstance(test_conversations, list):
            conv_ids_with_overall_anomaly = set(
                df_test_message_analysis[df_test_message_analysis['isOverallAnomaly'] == 1]['conversation_id'].unique()
            )

            if conv_ids_with_overall_anomaly:
                for conv_obj in test_conversations:
                    if isinstance(conv_obj, dict) and conv_obj.get('ref_conversation_id') in conv_ids_with_overall_anomaly:
                        flagged_anomalous_conversations.append(conv_obj)
                
                print(f"Identified {len(flagged_anomalous_conversations)} test conversations containing overall anomalies.")              
            else:
                print("No conversations in the test set were flagged based on the 'isOverallAnomaly' criterion.")
        else:
            print("Original 'test_conversations' list not found or empty. Cannot identify anomalous conversations.")            
    else:
        print("DataFrame 'df_test_message_analysis' created from 'test_message_data_anomaly' is empty.")
else:
    print("'test_message_data_anomaly' not found, empty, or not a list. Cannot create analysis DataFrame.")