# SpaPhish Message Corpus

Date: 2024.07.20

Authors: 
- Vitali Herrera Semenets
- Lazaro Bustio Martínez

## Instaling required packages and libraries

In [33]:
#%pip install tabulate
#%pip install pandas
#%pip install numpy
#%pip install matplotlib
#%pip install extract-msg
#%pip install openpyxl
#%pip install beautifulsoup4
#%pip install chardet
#%pip install lxml
#%pip install groq
#%pip install tenacity
#%pip install langdetect

## Importing required libraries

In [34]:
# Standard library imports
import os  # Interact with the operating system (e.g., file paths, directories)
import hashlib  # Secure hash algorithms (e.g., SHA-256) for computing file hashes
import time  # Time-related utilities (e.g., delays, timestamps)
import logging  # Logging module for tracking events and debugging
import email  # Email handling utilities
from email import policy  # Define email parsing and formatting policies
from email.parser import BytesParser  # Parse email messages from byte streams
from email.header import decode_header  # Decode encoded email headers
from email.utils import parseaddr  # Parse email addresses into local and domain parts
import re  # Regular expressions for string pattern matching
import string  # String manipulation and character set handling

# Third-party library imports
import tenacity  # Retry logic to handle transient failures (e.g., API rate limits)
from tabulate import tabulate  # Create well-formatted tables from lists or DataFrames
import pandas as pd  # Data manipulation and analysis (DataFrame support)
from bs4 import BeautifulSoup  # Parse and extract data from HTML and XML documents

# Specific imports for email handling
import extract_msg  # Parse and extract contents from Microsoft Outlook MSG files

# Utility imports
from pathlib import Path  # Object-oriented file system paths management
from collections import defaultdict

# Additional imports for concurrency and request tracking
from datetime import datetime  # Date and time operations for tracking API limits

import groq

# Data structures

In [35]:
class Node:
    def __init__(self, data):
        self.data = data
        self.next = None
        self.prev = None
    
    def info(self):
        prev_data = self.prev.data if self.prev else None
        next_data = self.next.data if self.next else None
        return f"Data: {self.data}, Prev: {prev_data}, Next: {next_data}"

In [36]:
class CircularDoublyLinkedList:
    def __init__(self):
        self.head = None
        self.size = 0

    def is_empty(self):
        return self.head is None

    def append(self, data):
        new_node = Node(data)
        if self.is_empty():
            self.head = new_node
            self.head.next = self.head
            self.head.prev = self.head
        else:
            tail = self.head.prev
            tail.next = new_node
            new_node.prev = tail
            new_node.next = self.head
            self.head.prev = new_node
        self.size += 1

    def insert(self, pos, data):
        if pos < 0 or pos > self.size:
            raise IndexError("Index out of range")
        new_node = Node(data)
        if pos == 0:  # Insertar en el inicio
            if self.is_empty():
                self.head = new_node
                self.head.next = self.head
                self.head.prev = self.head
            else:
                tail = self.head.prev
                new_node.next = self.head
                new_node.prev = tail
                tail.next = new_node
                self.head.prev = new_node
                self.head = new_node
        else:
            current = self.head
            for _ in range(pos - 1):
                current = current.next
            new_node.next = current.next
            new_node.prev = current
            current.next.prev = new_node
            current.next = new_node
        self.size += 1

    def pop(self):
        if self.is_empty():
            raise IndexError("Pop from empty list")
        tail = self.head.prev
        if self.head == tail:  # Solo hay un nodo
            self.head = None
        else:
            self.head.prev = tail.prev
            tail.prev.next = self.head
        self.size -= 1
        return tail.data

    def remove(self, pos):
        if self.is_empty():
            raise IndexError("Remove from empty list")
        if pos < 0 or pos >= self.size:
            raise IndexError("Index out of range")
        if pos == 0:  # Eliminar el primero
            if self.size == 1:
                self.head = None
            else:
                tail = self.head.prev
                self.head = self.head.next
                self.head.prev = tail
                tail.next = self.head
        else:
            current = self.head
            for _ in range(pos):
                current = current.next
            current.prev.next = current.next
            current.next.prev = current.prev
        self.size -= 1

    def next(self):
        if self.is_empty():
            raise IndexError("The list is empty")
        current_data = self.head.data
        self.head = self.head.next
        return current_data

    def previous(self):
        if self.is_empty():
            raise IndexError("The list is empty")
        current_data = self.head.data
        self.head = self.head.prev
        return current_data

    def first(self):
        if self.is_empty():
            raise IndexError("The list is empty")
        return self.head.data

    def last(self):
        if self.is_empty():
            raise IndexError("The list is empty")
        return self.head.prev.data

    def pos(self):
        if self.is_empty():
            raise IndexError("The list is empty")
        
        current = self.head
        position = 0
        while current != self.head.prev:  # Recorrer hasta llegar al nodo actual
            if current == self.head:  # Si el nodo actual es el nodo en la posición buscada
                return position
            current = current.next
            position += 1

        return position  # Devuelve la posición actual

    def display(self):
        if self.is_empty():
            print("List is empty")
            return
        print(f"Number of nodes: {self.size}")  # Imprime la cantidad de nodos al principio
        current = self.head
        for _ in range(self.size):
            print(current.info(), end=" <-> ")
            current = current.next
        print("Circular loop")
        
    def current(self):
        if self.is_empty():
            raise IndexError("The list is empty")
        position = self.pos()  # Obtener la posición actual
        return position, self.head.data  # Devolver la posición y el contenido del nodo actual

# Configure logging

In [37]:
logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(message)s', 
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

## Defining constant values

In [38]:
# Constant values
DATA_DIR = "../data/raw" # Data folder.
RESULTS_DIR = "../data/processed" # Data folder.
MSG_EXT = (".eml", ".msg", ".txt") # Allowed extensions for search messages.

# API KEY para lbustio
GROQ_API_KEY_LBUSTIO = {"lbustio": "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"}

# API KEY para xcolar
GROQ_API_KEY_XCOLAR = {"xcolar": "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"}

# API KEY para Vitali
GROQ_API_KEY_VITALI = {"vitali": "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"}

# API KEY para Mike
GROQ_API_KEY_MIKE = {"mike": "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"}


MODEL1="llama-3.1-70b-versatile"
MODEL2="llama3-70b-8192"

MESSAGE_SCRAPE = "You are a language model specialized in extracting readable text from HTML code. " + \
"Your task is to process the provided HTML code and return only the visible text, removing all HTML tags " + \
"and any other non-textual content. Ensure that the structure and order of the text are maintained as they " + \
"appear in the original HTML. Return ONLY the text without any further explanation"

LANGUAGE_DETECTOR = "From the given text, detect all the languages used and identify the majority language. " + \
"For example, for the text 'You are tremendamente linda, lindisima!!!, ma chérie', the languages are: " + \
"English, Spanish, French. Your response should be only the majority language followed by a list of all " + \
"detected languages in the format: [major language, ['language 1, language 2, ...]]. Do not provide any additional explanation."

SYSTEM_PROMPT = ""
user_prompt = ""

# Constants for API limits
REQUESTS_PER_MINUTE = 6
DAILY_LIMIT = 14400
RATE_LIMIT_DELAY = 60 / REQUESTS_PER_MINUTE  # Delay in seconds to maintain the rate limit

In [39]:
# Uso de la lista circular doblemente enlazada
groq_api_keys = CircularDoublyLinkedList()

# Insertar elementos
groq_api_keys.append(GROQ_API_KEY_LBUSTIO)
groq_api_keys.append(GROQ_API_KEY_MIKE)
groq_api_keys.append(GROQ_API_KEY_VITALI)
groq_api_keys.append(GROQ_API_KEY_XCOLAR)

groq_api_keys.display()  # Muestra la lista circular

Number of nodes: 4
Data: {'lbustio': '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'}, Prev: {'xcolar': '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'}, Next: {'mike': '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'} <-> Data: {'mike': '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'}, Prev: {'lbustio': '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'}, Next: {'vitali': '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'} <-> Data: {'vitali': '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'}, Prev: {'mike': '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'}, Next: {'xcolar': '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'} <-> Data: {'xcolar': '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'}, Prev: {'vitali': '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'}, Next: {'lbustio': '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'} <-> Circular loop


In [40]:
# Uso de la lista circular doblemente enlazada
models = CircularDoublyLinkedList()

# Insertar elementos
models.append(MODEL1)
models.append(MODEL2)

models.display()  # Muestra la lista circular

Number of nodes: 2
Data: llama-3.1-70b-versatile, Prev: llama3-70b-8192, Next: llama3-70b-8192 <-> Data: llama3-70b-8192, Prev: llama-3.1-70b-versatile, Next: llama-3.1-70b-versatile <-> Circular loop


## Defining utils functions

In [41]:
# Obtener el primer nodo y su contenido
first_dict = groq_api_keys.first()  # Obtener el primer nodo

# Desestructurar el diccionario
api_key, api_value = next(iter(first_dict.items()))  # Obtener la primera clave y su valor

logger.info(f"API Key: {api_key}, API Value: {api_value}")

client = groq.Client(
    #api_key=os.environ.get("GROQ_API_KEY"), # Si se quiere usar la API Key exportada al SO.
    api_key = api_value,
)

2024-09-19 20:18:34 - INFO - API Key: lbustio, API Value: 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


### Requesting GROQ

In [42]:
# Función de chat completion sincrónica con reintentos
@tenacity.retry(wait=tenacity.wait_exponential(min=1, max=10), stop=tenacity.stop_after_attempt(3))
def chat_completion_orig(prompts, model):
    try:
        # Simulación de la llamada a una API (reemplaza esto con la llamada real)
        response = client.chat.completions.create(
            model=model,
            messages=prompts,
            temperature=1,
            max_tokens=3300,
            top_p=1,
            stream=True
        )
        
        # Introduce un retraso para respetar los límites de velocidad
        time.sleep(RATE_LIMIT_DELAY)
        return response

    except groq.RateLimitError:
        logger.warning("Rate limit exceeded. Please try again later.")
    except groq.AuthenticationError:
        logger.error("Authentication failed. Please check your API key.")
    except groq.GroqError as e:
        logger.error(f"Error: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")

In [43]:
# Función de chat completion sincrónica con reintentos
#@tenacity.retry(wait=tenacity.wait_exponential(min=1, max=10),
#                stop=tenacity.stop_after_attempt(3),
#                before_sleep=tenacity.before_sleep_log(logger, logging.INFO))
# Función de chat completion sincrónica con reintentos
@tenacity.retry(wait=tenacity.wait_exponential(min=1, max=10), stop=tenacity.stop_after_attempt(3))
def chat_completion(prompts, model):
    
    try:
        response = client.chat.completions.create(
            model=model,
            messages=prompts,
            temperature=1,
            max_tokens=3300,
            top_p=1,
            stream=True
        )
        
        # Introduce un retraso para respetar los límites de velocidad
        time.sleep(RATE_LIMIT_DELAY)
        return response

    except groq.RateLimitError as e:
        # Extraer las cabeceras para más información
        retry_after = e.response.headers.get("retry-after", "not specified")
        remaining_requests = e.response.headers.get("x-ratelimit-remaining-requests", "not specified")
        reset_time = e.response.headers.get("x-ratelimit-reset-requests", "not specified")

        logger.warning(f"Rate limit exceeded. Please try again later.\n"
                       f"The system said: {e}\n"
                       f"Retry after: {retry_after} seconds.\n"
                       f"Remaining requests: {remaining_requests}.\n"
                       f"Requests reset in: {reset_time}.")
        
        if (groq_api_keys.current() == groq_api_keys.last()):
            groq_api_keys.next()
            models.next()
            
            # Desestructurar el diccionario
            current_node = groq_api_keys.current()
            api_key, api_value = next(iter(current_node.items()))  # Obtener la primera clave y su valor
            
            client.api_key = api_value
        
        logger.info(f"Using model '{models.current()[1]}' with KEY '{api_value}'.")
        
        time.sleep(int(retry_after))  # Espera el tiempo indicado antes de reintentar

    except groq.AuthenticationError as e:
        logger.error(f"Authentication failed. Please check your API key.\nThe system said: {e}")

    except groq.GroqError as e:
        logger.error(f"Groq error: {e}")

    except Exception as e:
        logger.exception(f"An unexpected error occurred: {e}")

In [44]:
def get_extensions(directory_path):
    """
    Lists unique file extensions found in the specified directory.

    Args:
        directory_path (str): The path to the directory to search.

    Returns:
        list: A list containing the unique file extensions.
    """

    if not os.path.isdir(directory_path):  # Check if the provided path is a directory
        raise ValueError(f"The path {directory_path} is not a valid directory.")

    extensions = set()  # Create an empty set to store unique extensions efficiently

    try:
        # Loop through all files in the directory
        for filename in os.listdir(directory_path):
            file_path = os.path.join(directory_path, filename)  # Construct the full file path
            if os.path.isfile(file_path):  # Ensure that we are processing files, not directories
                _, extension = os.path.splitext(filename)  # Split the filename into base name and extension
                if extension:  # Check if the file actually has an extension
                    extensions.add(extension.lower())  # Add the extension to the set (convert to lowercase for consistency)
    except OSError as e:  # Handle any OS-related errors (e.g., permission issues)
        raise RuntimeError(f"Error accessing directory {directory_path}: {e}")

    return list(extensions)  # Convert the set of unique extensions into a list for returning

In [45]:
def get_messages(directory_path, allowed_extensions):
    """
    Creates a dictionary of file paths and their SHA-256 hashes in the specified directory,
    filtering by allowed extensions.

    Args:
        directory_path (str): The path to the directory to search.
        allowed_extensions (list): A list of allowed file extensions (e.g., [".eml", ".txt"]).

    Returns:
        dict: A dictionary where keys are filenames and values are tuples (absolute_path, sha256_hash).
    """
    
    msg_dict = {}  # Initialize an empty dictionary to store the file information
    allowed_extensions = set(ext.lower() for ext in allowed_extensions)  # Convert list to set for faster lookups
    directory = Path(directory_path)  # Create a Path object for the directory

    # Check if the directory exists and is a directory
    if not directory.is_dir():
        raise ValueError(f"The directory {directory_path} does not exist or is not a directory.")
    
    # Iterate through each file in the directory
    for file_path in directory.iterdir():
        # Proceed if the path is a file
        if file_path.is_file():
            extension = file_path.suffix.lower()  # Get the file extension and convert it to lowercase
            # Check if the file's extension is in the list of allowed extensions
            if extension in allowed_extensions:
                sha256_hash = hashlib.sha256()  # Create a new SHA-256 hash object
                try:
                    # Open the file in binary mode for hashing
                    with file_path.open("rb") as f:
                        # Read the file in chunks of 8192 bytes and update the hash object
                        while chunk := f.read(8192):
                            sha256_hash.update(chunk)
                except IOError as e:
                    # Print a message and skip the file if it cannot be read
                    print(f"Could not read file {file_path}: {e}")
                    continue
                
                # Store the filename as key and a tuple (absolute_path, sha256_hash) as value in the dictionary
                msg_dict[file_path.name] = (str(file_path.resolve()), sha256_hash.hexdigest())
    
    return msg_dict  # Return the dictionary containing file paths and their SHA-256 hashes

In [46]:
def dict_as_table(msg_dict, title):
    """
    Prints the message dictionary as a formatted table.

    Args:
        msg_dict (dict): The dictionary of messages, where keys are filenames and values are tuples containing the absolute path and SHA-256 hash.
        title (str): The title of the table.
    """
    
    # Count the number of elements in the dictionary
    num_elements = len(msg_dict)
    print(f"\nNumber of elements in the dictionary: {num_elements}\n")
    
    # Define the table headers
    headers = ["Filename", "Absolute Path", "SHA-256 Hash"]
    
    # Create a list of tuples containing the filename, path, and SHA-256 hash for each entry in the dictionary
    table = [(filename, path, sha256_hash) for filename, (path, sha256_hash) in msg_dict.items()]
    
    # Print the table title
    print(f"\n{title}\n")
    
    # Print the formatted table using the tabulate library
    # The 'tablefmt="grid"' format provides a grid layout for the table
    print(tabulate(table, headers=headers, tablefmt="grid"))

In [47]:
def get_duplicated(msg_dict):
    """
    Finds unique and duplicate files based on their SHA-256 hashes.

    Args:
        msg_dict (dict): The dictionary of messages, where keys are filenames and values are tuples containing the absolute path and SHA-256 hash.

    Returns:
        tuple: A tuple containing two dictionaries:
            - unique_files: Files that have a unique SHA-256 hash.
            - duplicate_files: Files that have duplicate SHA-256 hashes.
    """
    
    # Create a defaultdict where each key (file hash) maps to a list of tuples (filename, path)
    hash_dict = defaultdict(list)

    # Group files by their SHA-256 hash
    for filename, (path, file_hash) in msg_dict.items():
        hash_dict[file_hash].append((filename, path))  # Append file information to the list associated with this hash

    unique_files = {}  # Initialize an empty dictionary to store files with unique hashes
    duplicate_files = {}  # Initialize an empty dictionary to store files with duplicate hashes

    # Separate unique and duplicate files based on hash counts
    for file_hash, files in hash_dict.items():
        if len(files) == 1:  # Check if there is only one file with this hash (unique)
            filename, path = files[0]  # Extract the filename and path of the unique file
            unique_files[filename] = (path, file_hash)  # Add to the unique_files dictionary
        else:  # If multiple files have the same hash, they are duplicates
            for filename, path in files:  # Iterate through all files with this hash
                duplicate_files[filename] = (path, file_hash)  # Add to the duplicate_files dictionary

    return unique_files, duplicate_files  # Return a tuple containing the unique_files and duplicate_files

In [48]:
def decode_mime(header_value):
    """
    Decodes MIME-encoded headers.
    
    Args:
        header_value (str): The MIME-encoded header value.
        
    Returns:
        str: The decoded header value.
    """
    decoded_parts = decode_header(header_value)
    decoded_string = ""
    for part, encoding in decoded_parts:
        if isinstance(part, bytes):
            try:
                part = part.decode(encoding or 'utf-8')
            except (TypeError, UnicodeDecodeError):
                part = part.decode('utf-8', errors='replace')  # Fallback
        decoded_string += part
    return decoded_string

In [49]:
def get_email_info(msg):
    """
    Extracts relevant information from an email message.
    
    Args:
        msg (Message): The email message object.
        
    Returns:
        dict: A dictionary containing extracted email information.
    """
    info = {}
    
    try:
        # Decode MIME headers
        from_header = decode_mime(msg.get('From', ''))
        info['from'] = re.sub(r'<.*>', '', from_header).strip()
        info['from_email'] = re.search(r'<(.+?)>', from_header).group(1) if '<' in from_header else ''
        
        to_header = decode_mime(msg.get('To', ''))
        info['to'] = re.sub(r'<.*>', '', to_header).strip()
        info['to_email'] = re.search(r'<(.+?)>', to_header).group(1) if '<' in to_header else ''
        
        info['date'] = decode_mime(msg.get('Date', ''))
        info['subject'] = re.sub(r'\s+', ' ', decode_mime(msg.get('Subject', '')))
        
        # Initialize additional fields
        info['body'] = ''
        info['format'] = 'Unknown'
        info['has_attachments'] = False
        info['attachment_count'] = 0
        info['line_breaks'] = 0
        
        body = ""

        # Handle email body and attachments
        if msg.is_multipart():
            # If the message is multipart, iterate over parts
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get('Content-Disposition', ''))
                charset = part.get_content_charset()
                
                if content_type == 'text/plain':
                    try:
                        body += part.get_payload(decode=True).decode(charset or 'utf-8', errors='replace')
                        info['body'] = re.sub(r'\s+', ' ', body.strip()) 
                        info['format'] = 'Text'
                    except (TypeError, UnicodeDecodeError):
                        body += part.get_payload(decode=True).decode('utf-8', errors='replace')
                        info['body'] = re.sub(r'\s+', ' ', body.strip())
                elif content_type == 'text/html':
                    try:
                        body += part.get_payload(decode=True).decode(charset or 'utf-8', errors='replace')
                        info['body'] = re.sub(r'\s+', ' ', body.strip())
                        info['format'] = 'HTML'
                    except (TypeError, UnicodeDecodeError):
                        body += part.get_payload(decode=True).decode('utf-8', errors='replace')
                        info['body'] = re.sub(r'\s+', ' ', body.strip())
                elif 'attachment' in content_disposition:
                    info['has_attachments'] = True
                    info['attachment_count'] += 1
        else:
            # Handle non-multipart messages
            content_type = msg.get_content_type()
            charset = msg.get_content_charset()
            try:
                if content_type == 'text/plain':
                    body = msg.get_payload(decode=True).decode(charset or 'utf-8', errors='replace')
                    info['body'] = re.sub(r'\s+', ' ', body.strip())
                    info['format'] = 'Text'
                elif content_type == 'text/html':
                    body = msg.get_payload(decode=True).decode(charset or 'utf-8', errors='replace')
                    info['body'] = re.sub(r'\s+', ' ', body.strip())
                    info['format'] = 'HTML'
            except (TypeError, UnicodeDecodeError):
                body = msg.get_payload(decode=True).decode('utf-8', errors='replace')
                info['body'] = re.sub(r'\s+', ' ', body.strip())
        
        # Count line breaks
        info['line_breaks'] = info['body'].count('\n') if info.get('body') else 0

    except AttributeError as e:
        print(f"Error processing email: {e}")
    
    return info

In [50]:
def read_emails(unique_msg, verbose=False):
    """
    Reads email files and converts them into a DataFrame.

    Args:
        unique_msg (dict): Dictionary where keys are filenames and values are tuples containing the absolute path and SHA-256 hash.
        verbose (bool): If True, prints status messages. If False, suppresses output.

    Returns:
        pd.DataFrame: DataFrame containing the extracted email information.
    """
    email_infos = []  # List to store email information dictionaries
    errors = []  # List to store files that caused errors

    for filename, (path, _) in unique_msg.items():
        file_extension = Path(filename).suffix.lower()
        if verbose:
            print(f"Processing file: {filename} (Extension: {file_extension})")

        try:
            with open(path, 'rb') as file:
                if verbose:
                    print(f"Opened file: {path}")

                if file_extension == '.eml':
                    if verbose:
                        print(f"Reading .eml file: {path}")
                    msg = BytesParser(policy=policy.default).parse(file)
                    email_infos.append(get_email_info(msg))
                    if verbose:
                        print(f"Read .eml file successfully: {path}")

                elif file_extension == '.msg':
                    if verbose:
                        print(f"Reading .msg file: {path}")
                    msg = extract_msg.Message(path)
                    msg_dict = get_email_info(msg)
                    email_infos.append(msg_dict)
                    if verbose:
                        print(f"Read .msg file successfully: {path}")

                elif file_extension == '.txt':
                    if verbose:
                        print(f"Reading .txt file: {path}")
                    msg = email.message_from_file(file)
                    email_infos.append(get_email_info(msg))
                    if verbose:
                        print(f"Read .txt file successfully: {path}")

        except Exception as e:
            if verbose:
                print(f"Error processing file {path}: {e}")
            errors.append((filename, str(e)))  # Append the filename and error message to the errors list
            continue  # Continue with the next file in case of an error

    # Convert the list of email information dictionaries to a DataFrame
    df = pd.DataFrame(email_infos)

    return df, errors

In [51]:
def filter_non_printable(text):
    """
    Filters out non-printable characters from the given text.
    
    Parameters:
    text (str): The input text that needs to be filtered.
    
    Returns:
    str: The text with non-printable characters removed. If the input is NaN, it returns the input as is.
    """
    # Check if the text is NaN (Not a Number), which indicates a missing value in pandas.
    if pd.isna(text):
        return text
    
    # Create a set of printable characters using the `string.printable` from the string module.
    printable = set(string.printable)
    
    # Use the `filter` function to keep only the characters that are in the `printable` set.
    # The `lambda` function is used to check if each character is in the set of printable characters.
    return ''.join(filter(lambda x: x in printable, text))


In [52]:
# Function to parse the content of the 'body' field
def parse_body(row):
    """
    Parses the content of the 'body' field from the given DataFrame row, depending on the format type.
    
    Parameters:
    row (pandas.Series): A row from the DataFrame containing 'body' and 'format' fields.
    
    Returns:
    str: The parsed and filtered text from the 'body' field.
    """
    body = row['body']          # Extract the 'body' content from the row
    format_type = row['format'] # Extract the 'format' type from the row
    
    # Check if the 'body' content is NaN (Not a Number)
    if pd.isna(body):
        return body  # If 'body' is NaN, return it as is
    
    # Parse the 'body' content based on the format type
    if format_type == 'HTML':
        # If the format is HTML, parse the HTML content using BeautifulSoup
        soup = BeautifulSoup(body, 'html.parser')
        text = soup.get_text(separator='\n')  # Extract text with line breaks
    elif format_type == 'Text':
        # If the format is plain text, use the 'body' content as is
        text = body
    else:
        # If the format type is unknown, try to parse it as HTML anyway
        try:
            soup = BeautifulSoup(body, 'html.parser')
            text = soup.get_text(separator='\n')  # Extract text with line breaks
        except Exception as e:
            # If parsing fails, use the 'body' content as is
            text = body
    
    # Filter out non-printable characters from the text
    text = text.strip()
    text = re.sub(r'\s+', ' ', text)
    text = filter_non_printable(text)
    
    return text


# Body parser

In [53]:
# Function to parse the content of the 'body' field
def parse_body_llm(row):
    """
    Parses the content of the 'body' field from the given DataFrame row, depending on the format type.
    
    Parameters:
    row (pandas.Series): A row from the DataFrame containing 'body' and 'format' fields.
    
    Returns:
    str: The parsed and filtered text from the 'body' field.
    """
    body = row['body']          # Extract the 'body' content from the row
    format_type = row['format'] # Extract the 'format' type from the row
    text = ""
    
    # Check if the 'body' content is NaN (Not a Number)
    if pd.isna(body):
        return body  # If 'body' is NaN, return it as is
    
    # Parse the 'body' content based on the format type
    if format_type == 'HTML':
        SYSTEM_PROMPT = MESSAGE_SCRAPE
        # If the format is HTML, parse the HTML content using LLM
        logger.info("Starting message scrapping.")
        try:
            messages = [
                {
                    'role': 'system',
                    'content': SYSTEM_PROMPT
                },
                {
                    'role': 'user',
                    'content': body
                }
            ]

            logger.debug("Sending messages to chat_completion: %s", messages)
            
            # Call the rate-limited chat completion function
            mdl = models.current()
            print(mdl[1])
            response = chat_completion(messages, mdl)
            
            # Process the response
            text = ''.join([chunk.choices[0].delta.content or "" for chunk in response]).strip()
            
            logger.info("Done!")
        except Exception as e:
            logger.error("An unexpected error occurred: %s", e)
            text = body
    elif format_type == 'Text':
        # If the format is plain text, use the 'body' content as is
        text = body
    else:
        # If the format type is unknown, try to parse it as HTML anyway
        SYSTEM_PROMPT = MESSAGE_SCRAPE
        # If the format is HTML, parse the HTML content using LLM
        logger.info("Starting message scrapping.")
        try:
            messages = [
                {
                    'role': 'system',
                    'content': SYSTEM_PROMPT
                },
                {
                    'role': 'user',
                    'content': body
                }
            ]

            logger.debug("Sending messages to chat_completion: %s", messages)
            
            # Obtener el nodo actual y su contenido
            model_node = models.current()  # Obtener el primer nodo

            # Desestructurar el diccionario
            model = next(iter(model_node.items()))
            print(model[1])
            
            response = chat_completion(messages, model)
            
            # Process the response
            text = ''.join([chunk.choices[0].delta.content or "" for chunk in response]).strip()
            
            logger.info("Done!")
        except Exception as e:
            logger.error("An unexpected error occurred: %s", e)
            text = body
    
    # Filter out non-printable characters from the text
    text = text.strip()
    text = re.sub(r'\s+', ' ', text)
    text = filter_non_printable(text)
    
    return text

In [54]:
def detect_language(text):    
    if pd.isna(text):
        logger.info("Input text is NaN or empty. Returning 'unknown'.")
        return 'unknown'
    
    logger.info("Starting language detection.")
    try:
        messages = [
            {
                'role': 'system',
                'content': SYSTEM_PROMPT
            },
            {
                'role': 'user',
                'content': text
            }
        ]

        logger.debug("Sending messages to chat_completion: %s", messages)
        
        # Call the rate-limited chat completion function
        response = chat_completion(messages, model)
        
        # Process the response
        detected_language = ''.join([chunk.choices[0].delta.content or "" for chunk in response]).strip()
        
        logger.info("Language detected: %s", detected_language)
        return detected_language
        
    except LangDetectException as e:
        logger.error("Language detection failed: %s", e)
        return "not-detected"
    except Exception as e:
        logger.error("An unexpected error occurred: %s", e)
        return "not-detected"


In [55]:
# Crear una función para detectar y reemplazar direcciones de correo electrónico
def replace_emails(text):
    if isinstance(text, str):  # Verificar si el texto es una cadena
        # Definir la expresión regular para detectar direcciones de correo electrónico
        email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        # Reemplazar las direcciones de correo electrónico por 'email@address.com'
        return re.sub(email_pattern, 'email@address.com', text)
    else:
        return text  # Retornar el texto sin cambios si no es una cadena

In [56]:
def clean_string(s):
    # Reemplaza ",", "[" y "]" por espacio
    s = re.sub(r'[,\[\]]', ' ', s)
    
    # Reemplaza múltiples espacios por un solo espacio
    s = re.sub(r'\s+', ' ', s)
    
    # Reemplaza los caracteres ' por una cadena vacía
    s = s.replace("'", "")
    
    # Elimina espacios al principio y al final de la cadena
    s = s.strip()
    
    return s

In [57]:
def create_lang_list(lang):
    # Limpia la cadena
    temp = clean_string(lang)
    
    # Crea la lista de elementos separados por espacio
    lang_list = temp.split(' ')
    
    return lang_list

In [58]:
def extract_major_lang(lang):
    if isinstance(lang, str):
        # Si lang es una cadena, convierte en lista
        lang_list = lang_list = create_lang_list(lang)
    elif isinstance(lang, list):
        # Si lang ya es una lista, asegúrate de que no haya listas anidadas
        flat_lang_list = []
        for item in lang:
            if isinstance(item, str):
                flat_lang_list.append(item)
            elif isinstance(item, list):
                flat_lang_list.extend(item)
        lang_list = flat_lang_list
    else:
        return None  # O maneja el caso de valores inesperados

    # Determina el idioma mayoritario (el primer idioma si no hay frecuencia)
    if lang_list:
        return lang_list[0]
    return None

In [59]:
def filter_non_printable(text):
    """
    Filters out non-printable characters from the given text.
    
    Parameters:
    text (str): The input text that needs to be filtered.
    
    Returns:
    str: The text with non-printable characters removed. If the input is NaN, it returns the input as is.
    """
    # Check if the text is NaN (Not a Number), which indicates a missing value in pandas.
    if pd.isna(text):
        return text
    
    # Create a set of printable characters using the `string.printable` from the string module.
    printable = set(string.printable)
    
    # Use the `filter` function to keep only the characters that are in the `printable` set.
    # The `lambda` function is used to check if each character is in the set of printable characters.
    return ''.join(filter(lambda x: x in printable, text))

## Detecting email files to process

Traversing the data folder to identify and list the file extensions. This step is crucial to distinguish email files from other file types, allowing us to filter out and discard non-email files.

In [60]:
# Retrieve a list of unique file extensions from the specified directory
unique_extensions = get_extensions(DATA_DIR)

# Print a message indicating that unique file extensions are being displayed
print("Unique file extensions found:")

# Check if any extensions were found
if unique_extensions:
    # Print each extension with a bullet point for better readability
    for ext in unique_extensions:
        print(f" - {ext}")  # Print each extension as a bulleted item
else:
    # Inform the user if no extensions were found
    print("No file extensions found.")


Unique file extensions found:
 - .msg
 - .eml
 - .ini


After identifying the email files, the next step is to gather all email files with the extensions ".msg" and ".eml". Additionally, if there are any emails stored in text format, the ".txt" extension will also be included.

In [61]:
# Obtain all the emails files in a dictionary
msg_dict = get_messages(DATA_DIR, MSG_EXT)

# Print the dictionary of loaded files
dict_as_table(msg_dict, "Data Messagges")


Number of elements in the dictionary: 853


Data Messagges

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
| Filename                                                                                                                                                                                                                                  | Absolute Path                                                                                                                        

Once the email files were loaded, a SHA-256 hash was calculated for each one. This step was taken to identify and discard duplicate email files, ensuring that only unique emails are processed.

In [62]:
# Detect unique and duplicated email files.
unique_msg, duplicated_msg = get_duplicated(msg_dict)

# Count the elements in each dictionary
num_unique_msg = len(unique_msg)
num_duplicate_msg = len(duplicated_msg)

print(f"\nNumber of unique files: {num_unique_msg}")
print(f"Number of duplicate files: {num_duplicate_msg}")

# Print unique and duplicated email files.
dict_as_table(unique_msg, "Unique messages")
dict_as_table(duplicated_msg, "Duplicate messages")


Number of unique files: 514
Number of duplicate files: 339

Number of elements in the dictionary: 514


Unique messages

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
| Filename                                                                                                                                                                                                                                  | Absolute Path                                                           

## Loading unique email files

The unique email files must be loaded for further processing. This step is crucial as the emails need to be parsed to extract essential details such as the subject, body, number of attachments, sender's name, sender's address, and the date and time.

In [63]:
# Example usage
email_df, errors = read_emails(unique_msg, verbose=False)

# Print summary of results
print("\nSummary of file processing:")
print(f"Number of successfully loaded files: {len(email_df)}")
if errors:
    print(f"Number of errors encountered: {len(errors)}")
    for filename, error in errors:
        print(f" - {filename}: {error}")
else:
    print("No errors encountered.")

Error processing email: 'Message' object has no attribute 'get'
Error processing email: 'Message' object has no attribute 'get'
Error processing email: 'Message' object has no attribute 'get'
Error processing email: 'Message' object has no attribute 'get'
Error processing email: 'Message' object has no attribute 'get'

Summary of file processing:
Number of successfully loaded files: 509
Number of errors encountered: 5
 - 202308280707#factura ind.mass fa.eml: unknown encoding: uft-8
 - 202308280713#factura ind.mass fa.eml: unknown encoding: uft-8
 - 202308291118#Ha recibido una nueva factura.eml: unknown encoding: uft-8
 - =%3fBRF%3fQ%3fA=F0=9D=85=BAc=F0=9D=85=BAc=F0=9D=85=B4o=F0=9D=85=B8u=F0=9D=85=B4n=F0=9D=85=B8t=F0=9D=85=B8 =F0=9D=...%3f=a̷=%3fBRF%3fQ%3fl=F0=9D=85=B8%3f= %3c0fdw5qbkxsoviq4dkam5-mur3cgiqag@emdeals.michaels.com%3e - 2021-08-12 1054.eml: unknown encoding: brf
 - A𝅺c𝅺c𝅴o𝅸u𝅴n𝅸t𝅸 𝅷R𝅴e𝅸s𝅷t𝅸r𝅸i𝅺c⁠t𝅺e𝅸d⁠.eml: unknown encoding: brf


# Scrapping the email's body

In [64]:
try:
    # Scappe the message's body
    total_rows = email_df.shape[0]  # Obtener el número total de filas
    logger.info("Starting scrapping for %d rows.", total_rows)

    for index, row in email_df.iterrows():
        
        body_text = parse_body_llm(row)
        
        # Registrar el progreso de la ejecución con información sobre la fila actual y la hora
        logger.info("Processed row %d of %d at %s", 
                    index + 1, total_rows, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    email_df.loc[index, 'parsed_body'] = body_text  # Agregar la lista de lenguajes detectados al DataFrame
    
    logger.info("Scrape body completed successfully for all rows.")
except Exception as e:
    logger.error("An error occurred during scrapping message: %s", e)

# Mostrar el DataFrame actualizado
logger.info("Displaying the updated DataFrame with body scrapped.")
email_df

2024-09-19 20:18:49 - INFO - Starting scrapping for 509 rows.
2024-09-19 20:19:07 - INFO - Starting message scrapping.


llama-3.1-70b-versatile


2024-09-19 20:19:23 - INFO - HTTP Request: POST https://api.groq.com/openai/v1/chat/completions "HTTP/1.1 400 Bad Request"
2024-09-19 20:19:27 - ERROR - Groq error: Error code: 400 - {'error': {'message': "'model' : doesn't match any schema from 'anyOf'", 'type': 'invalid_request_error'}}
2024-09-19 20:19:31 - ERROR - An unexpected error occurred: 'NoneType' object is not iterable


KeyboardInterrupt: 

In [None]:
email_df.info()

In [None]:
# Aplicar la función parse_body a cada fila del DataFrame
#email_df['parsed_body'] = email_df.apply(parse_body, axis=1)

# Anonimizar los emails en el campo parsed_body
#email_df['parsed_body'] = email_df['parsed_body'].apply(replace_emails)

# Eliminar los emails cuyo parsed_body está repetido
email_df.drop_duplicates(subset=['parsed_body'], inplace=True)

# Mostrar el resultado
email_df.shape

Detecting language

In [None]:
try:
    # Detectar el idioma fila por fila sin usar apply
    langs = []
    total_rows = len(email_df)  # Obtener el número total de filas
    logger.info("Starting language detection for %d rows.", total_rows)

    for index, row in email_df.iterrows():
        text = row['parsed_body']
        detected_lang = detect_language(text)
        langs.append(detected_lang)

        # Registrar el progreso de la ejecución con información sobre la fila actual y la hora
        logger.info("Processed row %d of %d at %s, detected language: %s", 
                    index + 1, total_rows, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), detected_lang)

        #El delay está en chat_completation

    email_df['lang'] = langs  # Agregar la lista de lenguajes detectados al DataFrame
    
    logger.info("Language detection completed successfully for all rows.")
except Exception as e:
    logger.error("An error occurred during language detection: %s", e)

# Mostrar el DataFrame actualizado
logger.info("Displaying the updated DataFrame with detected languages.")
print(email_df)

In [None]:
email_df.info()

Si los datos ya se procesaron, entonces se cargan.

In [None]:
# Verifica si mail_df es None y carga el DataFrame desde un archivo CSV si es necesario
if email_df is None:
    file_path = os.path.join(RESULTS_DIR, "email_df.csv")
    email_df = pd.read_csv(file_path)
    print(email_df.shape)  # Imprime la forma del DataFrame cargado para verificar que se cargó correctamente
else:
    print(email_df.shape)

In [None]:
# Aplicar la función para crear la columna 'major_lang'
email_df['major_lang'] = email_df['lang'].apply(extract_major_lang)

email_df[['parsed_body', 'lang', 'major_lang']]

In [None]:
# Definir la ruta del archivo Excel donde se guardará el DataFrame
file_path = os.path.join(RESULTS_DIR, 'email_df.csv')

# Guardar el DataFrame en un archivo CSV, sin incluir el índice
email_df.to_csv(file_path, index=False)

print(f"DataFrame saved to {file_path}")