In [1]:
import re
import pandas as pd
import codecs
import chardet
from typing import Dict, List, Optional

from pysentimiento import create_analyzer
from pysentimiento.preprocessing import preprocess_tweet

import unicodedata

import warnings

In [2]:
class TextFileProcessor:
    """
    Reads an exported chat text file and turns its lines into a DataFrame of messages.

    Attributes:
        file_path (str): The path to the text file.

    Methods:
        decode_unicode_escape(s: str) -> str: Decodes backslash escape sequences in a string.
        read_file() -> Optional[str]: Reads the text file and returns its content as a string.
        process_messages(txt_file: str) -> pd.DataFrame: Processes the text file content and returns a DataFrame with messages.
        filter_users_by_message_count(df: pd.DataFrame, min_message_count: int) -> pd.DataFrame: Filters users based on the minimum message count.
    """

    # Supported line layouts, tried in this order; the first layout that matches
    # anything is used for the whole text. Each entry is
    # (regex, whether the regex captures an AM/PM marker, strptime format for a two-digit year).
    _LAYOUTS = (
        (r"(\d{1,2}/\d{1,2}/\d{2,4}), (\d{1,2}:\d{2}) (AM|PM) - ([^:]+): (.*)", True, "%m/%d/%y %I:%M %p"),
        (r"\[(\d{2}\.\d{2}\.\d{2}), (\d{2}:\d{2}:\d{2})\] ([^:]+): (.*)", False, "%d.%m.%y %H:%M:%S"),
        (r"\[(\d{1,2}/\d{1,2}/\d{2,4}), (\d{1,2}:\d{2}:\d{2}) (AM|PM)\] ([^:]+): (.*)", True, "%m/%d/%y %I:%M:%S %p"),
        (r"(\d{2}/\d{2}/\d{2}), (\d{1,2}:\d{2}) ([ap]\. m\.) - ([^:]+): (.*)", True, "%d/%m/%y %I:%M %p"),
        (r'\[(\d{1,2}/\d{1,2}/\d{2,4}),\s(\d{1,2}:\d{2}:\d{2})\s([ap]\.?\s[m]\.?)\]\s([^\:]+):\s(.+)', True, "%m/%d/%y %I:%M:%S %p"),
    )

    def __init__(self, file_path: str) -> None:
        """
        Initializes a new instance of the TextFileProcessor class.

        Args:
            file_path (str): The path to the text file.
        """
        self.file_path = file_path

    @staticmethod
    def decode_unicode_escape(s: str) -> str:
        """
        Decodes backslash escape sequences (e.g. ``\\n``, ``\\u00e9``) in a string.

        NOTE(review): the ``unicode_escape`` codec is documented as decoding from
        Latin-1, so non-ASCII characters already present in ``s`` presumably come
        back garbled - confirm before using this on non-ASCII text.

        Args:
            s (str): The string with Unicode escape sequences.

        Returns:
            str: The decoded string.
        """
        return codecs.decode(s, 'unicode_escape')

    def read_file(self) -> Optional[str]:
        """
        Reads the text file and returns its content as a string.

        The file is read as UTF-8, backslash escape sequences found in the text
        are resolved, and the result is NFKC-normalized.

        Returns:
            Optional[str]: The processed content, or None if the file is missing,
            cannot be read, or cannot be decoded. The reason is printed; no
            exception is propagated for those cases.
        """
        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                content = file.read()

            # The escape decoding below emits a DeprecationWarning for backslash
            # sequences it does not recognise. Silence it only for this step so
            # the process-wide warning filters are left as they were.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=DeprecationWarning)
                # unicode_escape treats each non-escape byte as a Latin-1 char, so
                # re-encoding as Latin-1 restores the UTF-8 bytes, which are then
                # decoded properly.
                content = content.encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')

            return unicodedata.normalize('NFKC', content)
        except FileNotFoundError as e:
            print(f"Error: File not found. {e}")
            return None
        except IOError as e:
            print(f"Error reading file. {e}")
            return None
        except UnicodeError as e:
            # Raised for non-UTF-8 files and for escape sequences that do not
            # survive the round-trip above (e.g. a truncated "\x" or "\U").
            print(f"Error decoding file. {e}")
            return None

    @staticmethod
    def _parse_stamps(stamps: pd.Series, dates: pd.Series, fmt: str) -> pd.Series:
        """Parses timestamp strings with `fmt`, switching %y to %Y for rows whose date ends in a four-digit year."""
        long_year = dates.str.contains(r"\d{4}$")
        parsed = []
        for mask, year_fmt in ((~long_year, fmt), (long_year, fmt.replace("%y", "%Y"))):
            if mask.any():
                parsed.append(pd.to_datetime(stamps[mask], format=year_fmt))
        # The two groups are disjoint; sorting by index restores the original row order.
        return pd.concat(parsed).sort_index()

    @staticmethod
    def process_messages(txt_file: str) -> pd.DataFrame:
        """
        Processes the text file content and returns a DataFrame with messages.

        The supported layouts are tried in order and the first one that matches
        at least one line is used. If none matches, "No matches found." is
        printed and an empty DataFrame is returned.

        Args:
            txt_file (str): The content of the text file as a string.

        Returns:
            pd.DataFrame: DataFrame with columns "DateHour", "User", and "Message".

        Raises:
            ValueError: If a matched date/time cannot be parsed with the layout's format.
        """
        for pattern, has_meridiem, fmt in TextFileProcessor._LAYOUTS:
            rows = re.findall(pattern, txt_file)
            if not rows:
                continue

            if has_meridiem:
                columns = ["Date", "Hour", "AM/PM", "User", "Message"]
            else:
                columns = ["Date", "Hour", "User", "Message"]
            messages = pd.DataFrame(rows, columns=columns)

            stamps = messages["Date"] + ' ' + messages["Hour"]
            if has_meridiem:
                # Every accepted marker ("AM", "p. m.", "a. m", ...) starts with
                # a/p, so the first letter is enough to build the token %p expects.
                stamps = stamps + ' ' + messages["AM/PM"].str[0].str.upper() + "M"

            messages["DateHour"] = TextFileProcessor._parse_stamps(stamps, messages["Date"], fmt)
            return messages[["DateHour", "User", "Message"]]

        print("No matches found.")
        return pd.DataFrame(columns=["DateHour", "User", "Message"])

    @staticmethod
    def filter_users_by_message_count(df: pd.DataFrame, min_message_count: int) -> pd.DataFrame:
        """
        Filters users based on the minimum message count.

        The input DataFrame is not modified.

        Args:
            df (pd.DataFrame): DataFrame with columns 'DateHour', 'User', and 'Message'.
            min_message_count (int): Minimum number of messages a user must have to be included in the resulting DataFrame.

        Returns:
            pd.DataFrame: Filtered DataFrame (index reset) with only users that have at least min_message_count messages.
        """
        message_counts = df['User'].value_counts()
        users_to_keep = message_counts[message_counts >= min_message_count].index
        return df[df['User'].isin(users_to_keep)].reset_index(drop=True)


In [7]:
# NOTE(review): absolute, machine-specific path - consider deriving it from a
# configurable data directory so the notebook runs elsewhere.
processor = TextFileProcessor(r'C:\Users\Admin\Documents\whatsapp-analyser\data\Chats\WhatsApp Chat - Netflix  - Iphone (Spa)\_chat.txt')

# Users with fewer messages than this are dropped from the analysis.
MIN_MESSAGE_COUNT = 10

file_content = processor.read_file()

# read_file() returns None after printing the reason when the file cannot be
# loaded. Stop here instead of silently skipping: later cells use
# `filtered_df`, which would otherwise be undefined or left over from an
# earlier run.
if not file_content:
    raise RuntimeError("Chat file is missing, unreadable or empty; see the message printed above.")

df_messages = processor.process_messages(file_content)
filtered_df = processor.filter_users_by_message_count(df_messages, MIN_MESSAGE_COUNT)

In [10]:
# Print a summary of the filtered messages: row count, columns, non-null counts and dtypes.
filtered_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 52468 entries, 0 to 52467
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   DateHour  52468 non-null  datetime64[ns]
 1   User      52468 non-null  object        
 2   Message   52468 non-null  object        
dtypes: datetime64[ns](1), object(2)
memory usage: 1.2+ MB


In [12]:
class TextAnalyzer:
    """
    Class for analyzing text using different analysis tasks.

    Attributes:
        analyzer: Instance of the analyzer created with the specified task and language.
        task (str): The specific task of the analysis (e.g., 'sentiment', 'emotion', etc.).
    """
    def __init__(self, task: str, lang: str = "es") -> None:
        """
        Initializes the TextAnalyzer with a specific task and language.

        Args:
            task (str): The specific task of the analysis.
            lang (str): The language of the analysis (default is "es" for Spanish).
        """
        self.analyzer = create_analyzer(task=task, lang=lang)
        self.task = task

    def predict(self, text: str) -> Dict[str, object]:
        """
        Performs the prediction on the provided text and returns the results.

        Args:
            text (str): The text to analyze.

        Returns:
            Dict[str, object]: The analyzer's per-label probabilities plus one
            extra entry, "MAX_<TASK>" (task name upper-cased), holding the
            analyzer's `output` value rather than a probability.
        """
        result = self.analyzer.predict(text)
        # Copy so the extra key is not written into the analyzer result's own dict.
        result_dict = dict(result.probas)
        result_dict[f"MAX_{self.task.upper()}"] = result.output
        return result_dict

class CombinedAnalyzer:
    """
    Class to combine sentiment and emotion analyses.

    Attributes:
        sentiment_analyzer: Instance of TextAnalyzer for sentiment analysis.
        emotion_analyzer: Instance of TextAnalyzer for emotion analysis.
        return_mode (str): Specifies which analysis results to return ('both', 'sentiment', 'emotion').
        lang (str): Language passed to the analyzers and to the text preprocessing.
    """
    _VALID_MODES = ('both', 'sentiment', 'emotion')

    def __init__(self, return_mode: str = 'both', lang: str = "es") -> None:
        """
        Initializes the analyzers for sentiment and emotion.

        Args:
            return_mode (str): Specifies which analysis results to return ('both', 'sentiment', 'emotion').
                               Default is 'both'.
            lang (str): Language used for both analyzers and for preprocessing
                        (default is "es" for Spanish).
        """
        self.sentiment_analyzer = TextAnalyzer(task="sentiment", lang=lang)
        self.emotion_analyzer = TextAnalyzer(task="emotion", lang=lang)
        self.return_mode = return_mode
        self.lang = lang

    def combine_sentiments(self, text: str) -> Dict[str, object]:
        """
        Combines the results of sentiment and emotion analyses based on the return_mode.

        Only the analyzers whose results are requested are run.

        Args:
            text (str): The text to analyze.

        Returns:
            Dict[str, object]: A combined dictionary with the results of the analyses
            (sentiment entries first, then emotion entries).

        Raises:
            ValueError: If return_mode is not 'both', 'sentiment', or 'emotion'.
        """
        mode = self.return_mode
        # Validate before running any model so a bad mode fails immediately.
        if mode not in self._VALID_MODES:
            raise ValueError("Invalid return_mode. Choose 'both', 'sentiment', or 'emotion'.")

        combined_dict: Dict[str, object] = {}
        if mode in ('both', 'sentiment'):
            combined_dict.update(self.sentiment_analyzer.predict(text))
        if mode in ('both', 'emotion'):
            combined_dict.update(self.emotion_analyzer.predict(text))
        return combined_dict

    def analyze_dataframe(self, df: pd.DataFrame, text_column: str) -> pd.DataFrame:
        """
        Analyzes a DataFrame of texts, applying sentiment and/or emotion analysis to each text.

        The input DataFrame is left untouched; the returned frame carries the
        preprocessed text in `text_column` together with the analysis columns.

        Args:
            df (pd.DataFrame): The DataFrame containing the texts to analyze.
            text_column (str): The name of the column containing the texts.

        Returns:
            pd.DataFrame: A copy of the input with preprocessed text and added columns for analysis results.
        """
        analyzed = df.copy()
        analyzed[text_column] = analyzed[text_column].apply(preprocess_tweet, lang=self.lang)

        records = [self.combine_sentiments(text) for text in analyzed[text_column]]
        results_df = pd.json_normalize(records)
        # json_normalize numbers rows 0..n-1; reuse the input index so the
        # column-wise concat pairs each result with its own message.
        results_df.index = analyzed.index
        return pd.concat([analyzed, results_df], axis=1)

In [13]:
# Sentiment-only pass over the filtered messages.
analyzer = CombinedAnalyzer(return_mode="sentiment")
sample_results_sentiments = analyzer.analyze_dataframe(df=filtered_df, text_column="Message")



In [None]:
class TextAnalyzer:
    """
    Thin wrapper around an analyzer built by `create_analyzer` for one task.

    NOTE(review): this redefines a class of the same name declared in an
    earlier cell; whichever cell runs last wins.

    Attributes:
        analyzer: Analyzer instance created for the given task and language.
        task (str): Name of the analysis task (e.g. 'sentiment', 'emotion').
    """
    def __init__(self, task: str, lang: str = "es") -> None:
        """
        Builds the underlying analyzer.

        Args:
            task (str): The analysis task.
            lang (str): The language of the analysis (default "es").
        """
        self.analyzer = create_analyzer(task=task, lang=lang)
        self.task = task

    def predict(self, text: str) -> Dict[str, object]:
        """
        Runs the analyzer on one text.

        Args:
            text (str): The text to analyze.

        Returns:
            Dict[str, object]: The analyzer's per-label probabilities plus one
            extra entry, "MAX_<TASK>" (task name upper-cased), holding the
            analyzer's `output` value rather than a probability.
        """
        result = self.analyzer.predict(text)
        # Copy so the extra key is not written into the analyzer result's own dict.
        result_dict = dict(result.probas)
        result_dict[f"MAX_{self.task.upper()}"] = result.output
        return result_dict

class CombinedAnalyzer:
    """
    Runs sentiment and/or emotion analysis and merges the results.

    NOTE(review): this redefines a class of the same name declared in an
    earlier cell; whichever cell runs last wins.

    Attributes:
        sentiment_analyzer: TextAnalyzer for the 'sentiment' task.
        emotion_analyzer: TextAnalyzer for the 'emotion' task.
        return_mode (str): Which results to return: 'both', 'sentiment' or 'emotion'.
        lang (str): Language passed to the analyzers and to the text preprocessing.
    """
    _VALID_MODES = ('both', 'sentiment', 'emotion')

    def __init__(self, return_mode: str = 'both', lang: str = "es") -> None:
        """
        Creates both analyzers.

        Args:
            return_mode (str): 'both' (default), 'sentiment' or 'emotion'.
            lang (str): Language used for both analyzers and for preprocessing (default "es").
        """
        self.sentiment_analyzer = TextAnalyzer(task="sentiment", lang=lang)
        self.emotion_analyzer = TextAnalyzer(task="emotion", lang=lang)
        self.return_mode = return_mode
        self.lang = lang

    def combine_sentiments(self, text: str) -> Dict[str, object]:
        """
        Analyzes one text according to return_mode, running only the analyzers needed.

        Args:
            text (str): The text to analyze.

        Returns:
            Dict[str, object]: Sentiment entries first, then emotion entries,
            restricted to what return_mode asks for.

        Raises:
            ValueError: If return_mode is not 'both', 'sentiment', or 'emotion'.
        """
        mode = self.return_mode
        # Validate before running any model so a bad mode fails immediately.
        if mode not in self._VALID_MODES:
            raise ValueError("Invalid return_mode. Choose 'both', 'sentiment', or 'emotion'.")

        combined_dict: Dict[str, object] = {}
        if mode in ('both', 'sentiment'):
            combined_dict.update(self.sentiment_analyzer.predict(text))
        if mode in ('both', 'emotion'):
            combined_dict.update(self.emotion_analyzer.predict(text))
        return combined_dict

    def analyze_dataframe(self, df: pd.DataFrame, text_column: str) -> pd.DataFrame:
        """
        Analyzes every text in a DataFrame column.

        The texts are preprocessed before analysis, but the returned frame keeps
        the original text in `text_column`; the input DataFrame is not modified.

        Args:
            df (pd.DataFrame): The DataFrame containing the texts to analyze.
            text_column (str): The name of the column containing the texts.

        Returns:
            pd.DataFrame: The input columns followed by one column per analysis result.
        """
        preprocessed_texts = [preprocess_tweet(message, lang=self.lang) for message in df[text_column]]
        analysis_results = [self.combine_sentiments(text) for text in preprocessed_texts]
        results_df = pd.json_normalize(analysis_results)
        # json_normalize numbers rows 0..n-1; reuse the input index so the
        # column-wise concat pairs each result with its own message.
        results_df.index = df.index
        return pd.concat([df, results_df], axis=1)


In [None]:
# Sentiment + emotion pass over the filtered messages.
# NOTE(review): the output recorded for analyzed_df below lists 100 rows, while
# filtered_df.info() earlier reports 52468 - presumably this was last run on a
# sample; confirm which input is intended.
analyzer = CombinedAnalyzer(return_mode='both')
analyzed_df = analyzer.analyze_dataframe(df=filtered_df, text_column='Message')



In [None]:
# Print a summary of the analyzed frame: row count, columns, non-null counts and dtypes.
analyzed_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 15 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   DateHour       100 non-null    datetime64[ns]
 1   User           100 non-null    object        
 2   Message        100 non-null    object        
 3   NEG            100 non-null    float64       
 4   NEU            100 non-null    float64       
 5   POS            100 non-null    float64       
 6   MAX_SENTIMENT  100 non-null    object        
 7   others         100 non-null    float64       
 8   joy            100 non-null    float64       
 9   sadness        100 non-null    float64       
 10  anger          100 non-null    float64       
 11  surprise       100 non-null    float64       
 12  disgust        100 non-null    float64       
 13  fear           100 non-null    float64       
 14  MAX_EMOTION    100 non-null    object        
dtypes: datetime64[ns](1), fl