In [1]:
# Install the required packages for reading .msg files
!pip install extract-msg
!pip install python-msg
!pip install olefile

Collecting extract-msg
  Downloading extract_msg-0.55.0-py3-none-any.whl.metadata (15 kB)
Collecting tzlocal<6,>=4.2 (from extract-msg)
  Downloading tzlocal-5.3.1-py3-none-any.whl.metadata (7.6 kB)
Collecting compressed-rtf<2,>=1.0.6 (from extract-msg)
  Downloading compressed_rtf-1.0.7-py3-none-any.whl.metadata (2.4 kB)
Collecting ebcdic<2,>=1.1.1 (from extract-msg)
  Downloading ebcdic-1.1.1-py2.py3-none-any.whl.metadata (8.9 kB)
Collecting RTFDE<0.2,>=0.1.1 (from extract-msg)
  Downloading rtfde-0.1.2.1-py3-none-any.whl.metadata (4.1 kB)
Collecting red-black-tree-mod<=1.23,>=1.20 (from extract-msg)
  Downloading red-black-tree-mod-1.22.tar.gz (34 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting lark~=1.1.8 (from RTFDE<0.2,>=0.1.1->extract-msg)
  Downloading lark-1.1.9-py3-none-any.whl.metadata (1.9 kB)
Collecting oletools>=0.56 (from RTFDE<0.2,>=0.1.1->extract-msg)
  Downloading oletools-0.60.2-py2.py3-none-any.whl

  DEPRECATION: Building 'red-black-tree-mod' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'red-black-tree-mod'. Discussion can be found at https://github.com/pypa/pip/issues/6334
  DEPRECATION: Building 'win-unicode-console' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'win-unicode-console'. Discussion can be found at https://github.com/pypa/pip/issues/6334
ERROR: Could not find a version that s



In [10]:
# Import required libraries
import os
import sys
import extract_msg
from pathlib import Path
import json
from datetime import datetime
import base64

In [11]:
class MSGFileReader:
    """
    Read an Outlook .msg file and expose its content and attachments.

    Typical use: construct with a path, call ``read_msg_file()`` to populate
    ``email_data``, then optionally call ``save_attachments()`` and/or
    ``save_email_data()``.

    Attributes:
        msg_file_path (Path): Location of the .msg file.
        msg: The ``extract_msg.Message`` opened by the last ``read_msg_file()``
            call (closed again before that call returns), or None.
        email_data (dict): Extracted fields; empty until a read succeeds.
    """

    # Windows-reserved punctuation plus ASCII control characters; each one is
    # replaced by '_' when building file names.
    _INVALID_FILENAME_CHARS = '<>:"/\\|?*' + ''.join(map(chr, range(32)))
    _FILENAME_TABLE = str.maketrans(
        _INVALID_FILENAME_CHARS, '_' * len(_INVALID_FILENAME_CHARS)
    )
    # Upper bound on generated file names, to stay clear of path length limits.
    _MAX_FILENAME_LENGTH = 100

    def __init__(self, msg_file_path):
        """
        Initialize the MSGFileReader with a .msg file path.

        Args:
            msg_file_path (str | Path): Path to the .msg file
        """
        self.msg_file_path = Path(msg_file_path)
        self.msg = None
        self.email_data = {}

    def read_msg_file(self):
        """
        Read the .msg file and extract all email data.

        Everything needed later (fields, headers, attachment payloads) is
        copied into ``email_data``, after which the underlying file is closed
        so the handle is not leaked.

        Returns:
            dict: Dictionary containing all email information, or None if the
            file could not be read (the error is printed).
        """
        try:
            # Open the .msg file
            self.msg = extract_msg.Message(self.msg_file_path)

            # Extract basic email information
            self.email_data = {
                'file_path': str(self.msg_file_path),
                'subject': self.msg.subject or '',
                'sender': self.msg.sender or '',
                'to': self.msg.to or '',
                'cc': self.msg.cc or '',
                'bcc': self.msg.bcc or '',
                'date': self.msg.date or '',
                'body': self.msg.body or '',
                'html_body': self.msg.htmlBody or '',
                'attachments': [],
                'headers': dict(self.msg.header) if hasattr(self.msg, 'header') else {}
            }

            # Extract attachments
            self._extract_attachments()

            return self.email_data

        except Exception as e:
            print(f"Error reading .msg file: {e}")
            return None

        finally:
            # Always release the file handle, whether or not the read succeeded.
            self._close_msg()

    def _close_msg(self):
        """Release the file handle held by ``self.msg``, if any (best effort)."""
        close = getattr(self.msg, 'close', None)
        if callable(close):
            try:
                close()
            except Exception as e:
                print(f"Error closing .msg file: {e}")

    def _extract_attachments(self):
        """
        Extract all attachments from the email into ``email_data['attachments']``.

        Each entry holds ``filename``, ``size``, ``content_type`` and ``data``
        (base64 text, or None when there is no byte payload). A failure on one
        attachment is printed and does not prevent the others from being
        extracted.
        """
        try:
            attachments = getattr(self.msg, 'attachments', None) or []
        except Exception as e:
            print(f"Error extracting attachments: {e}")
            return

        for attachment in attachments:
            try:
                # Read the payload once; it is needed for both size and encoding.
                payload = attachment.data
                # NOTE(review): extract_msg appears to return a message object
                # rather than bytes for embedded .msg attachments - confirm.
                # Such entries are listed with no payload instead of aborting.
                if not isinstance(payload, (bytes, bytearray)):
                    payload = None

                self.email_data['attachments'].append({
                    'filename': attachment.longFilename or attachment.shortFilename or 'unknown',
                    'size': len(payload) if payload else 0,
                    'content_type': attachment.mimetype or 'application/octet-stream',
                    'data': base64.b64encode(payload).decode('utf-8') if payload else None
                })
            except Exception as e:
                print(f"Error extracting attachments: {e}")

    def save_attachments(self, output_dir=None):
        """
        Save all attachments to a specified directory.

        Missing parent directories are created. Attachments that share a file
        name within one email get a numeric suffix (``name_1.ext``) instead of
        overwriting each other. A failure on one attachment is printed and the
        remaining ones are still written.

        Args:
            output_dir (str | Path): Directory to save attachments. If None,
                saves to ``extracted_attachments`` under the current directory.

        Returns:
            list: Paths (as strings) of the files actually written

        Raises:
            OSError: If the output directory cannot be created.
        """
        if not self.email_data or not self.email_data['attachments']:
            print("No attachments found or email not read yet.")
            return []

        if output_dir is None:
            output_dir = Path.cwd() / "extracted_attachments"
        else:
            output_dir = Path(output_dir)

        # Deliberately outside the per-file error handling: callers rely on a
        # directory-creation failure being raised so they can pick a fallback.
        output_dir.mkdir(parents=True, exist_ok=True)

        saved_files = []
        used_names = set()

        for i, attachment in enumerate(self.email_data['attachments']):
            if not attachment['data']:
                continue

            try:
                # Decode the base64 data
                file_data = base64.b64decode(attachment['data'])

                # Create a safe filename that is unique within this email
                safe_filename = self._sanitize_filename(attachment['filename'])
                if not safe_filename:
                    safe_filename = f"attachment_{i+1}"
                safe_filename = self._unique_name(safe_filename, used_names)

                file_path = output_dir / safe_filename

                # Write the file
                with open(file_path, 'wb') as f:
                    f.write(file_data)
            except Exception as e:
                print(f"Error saving attachments: {e}")
                continue

            saved_files.append(str(file_path))
            print(f"Saved attachment: {file_path}")

        return saved_files

    @staticmethod
    def _unique_name(filename, used_names):
        """
        Return ``filename``, or a numbered variant not yet in ``used_names``.

        Comparison is case-insensitive because the target filesystem may be.
        The chosen name is recorded in ``used_names``.
        """
        candidate = filename
        parts = Path(filename)
        counter = 1
        while candidate.lower() in used_names:
            candidate = f"{parts.stem}_{counter}{parts.suffix}"
            counter += 1
        used_names.add(candidate.lower())
        return candidate

    def _sanitize_filename(self, filename):
        """
        Sanitize filename to remove invalid characters.

        Reserved punctuation and control characters become '_', surrounding
        dots and whitespace are removed, and the result is capped at 100
        characters.

        Args:
            filename (str): Original filename

        Returns:
            str: Sanitized filename (possibly empty), or None if ``filename``
            is empty or None.
        """
        if not filename:
            return None

        cleaned = filename.translate(self._FILENAME_TABLE).strip().strip('. ')

        # Truncation can expose a new trailing dot or space, so trim again.
        cleaned = cleaned[:self._MAX_FILENAME_LENGTH].strip().rstrip('. ')

        return cleaned

    def get_email_summary(self):
        """
        Get a summary of the email content.

        Returns:
            dict: Subject, sender, date, a body preview of at most 200
            characters (followed by '...' when truncated), the attachment
            count and the attachment names; None if no email has been read.
        """
        if not self.email_data:
            return None

        body = self.email_data['body']
        attachments = self.email_data['attachments']

        return {
            'subject': self.email_data['subject'],
            'sender': self.email_data['sender'],
            'date': self.email_data['date'],
            'body_preview': body[:200] + '...' if len(body) > 200 else body,
            'attachment_count': len(attachments),
            'attachment_names': [att['filename'] for att in attachments]
        }

    def save_email_data(self, output_file=None):
        """
        Save email data to a JSON file.

        Attachment payloads are left out of the JSON; ``email_data`` itself is
        not modified, so attachments can still be saved afterwards.

        Args:
            output_file (str): Path to save JSON file. If None, the sanitized
                email subject is used (``email_data_<subject>.json``, or
                ``email_data_untitled.json`` when the subject is empty).

        Returns:
            str: Path to saved file, or None if nothing was saved
        """
        if not self.email_data:
            print("No email data to save.")
            return None

        if output_file is None:
            safe_subject = self._sanitize_filename(self.email_data['subject']) or "untitled"
            output_file = f"email_data_{safe_subject}.json"

        try:
            # Build new attachment dicts without the payload. A shallow copy
            # would share the dicts and strip 'data' from email_data as well.
            json_data = dict(self.email_data)
            json_data['attachments'] = [
                {key: value for key, value in attachment.items() if key != 'data'}
                for attachment in self.email_data['attachments']
            ]

            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2, ensure_ascii=False, default=str)

            print(f"Email data saved to: {output_file}")
            return output_file

        except Exception as e:
            print(f"Error saving email data: {e}")
            return None

In [12]:
# Utility functions for batch processing
def sanitize_directory_name(name, max_length=100):
    """
    Sanitize a name to be safe for use as a directory name on Windows.

    Reserved characters (< > : " / \\ | ? *) are replaced with '_', leading and
    trailing dots and spaces are removed, and the result is capped at
    ``max_length`` characters. The result is trimmed again after truncation,
    because a cut can expose a trailing dot or space, which Windows does not
    accept at the end of a directory name.

    Args:
        name (str): Original name
        max_length (int): Maximum length of the returned name (default 100)

    Returns:
        str: Sanitized directory name; "unnamed" if nothing usable remains
    """
    if not name:
        return "unnamed"

    # Replace invalid characters for Windows directory names
    invalid_chars = '<>:"/\\|?*'
    cleaned = name.translate(str.maketrans(invalid_chars, '_' * len(invalid_chars)))

    # Remove leading/trailing dots and spaces
    cleaned = cleaned.strip('. ')

    # Limit length, then drop any dot/space the cut left at the end
    cleaned = cleaned[:max_length].rstrip('. ')

    # Ensure it's not empty
    return cleaned or "unnamed"

def process_all_msg_files(data_directory="Data"):
    """
    Process all .msg files in a directory.

    For every message this prints a summary, saves its attachments under
    ``extracted_attachments/<sanitized file stem>`` and writes the email data
    to a JSON file. Files are handled in sorted order so repeated runs behave
    the same, and the ``.msg`` extension is matched case-insensitively.

    Args:
        data_directory (str): Directory containing .msg files

    Returns:
        dict: Maps each file name to a result dict. Successful entries hold
        ``success``, ``summary``, ``attachments_saved`` and ``json_file``;
        failed entries hold ``success`` and ``error``. Empty when the directory
        is missing or contains no .msg files.
    """
    data_path = Path(data_directory)
    results = {}

    if not data_path.exists():
        print(f"Directory {data_directory} does not exist.")
        return results

    # Find all .msg files (any letter case), in a deterministic order
    msg_files = sorted(
        candidate for candidate in data_path.glob("*")
        if candidate.suffix.lower() == ".msg"
    )

    if not msg_files:
        print(f"No .msg files found in {data_directory}")
        return results

    print(f"Found {len(msg_files)} .msg files to process:")
    for msg_file in msg_files:
        print(f"  - {msg_file.name}")

    # Process each file
    for msg_file in msg_files:
        print(f"\n{'='*50}")
        print(f"Processing: {msg_file.name}")
        print(f"{'='*50}")

        # Reset per file so a count can never leak over from a previous email.
        saved_files = []

        try:
            # Create reader instance
            reader = MSGFileReader(msg_file)

            # Read the email
            email_data = reader.read_msg_file()

            if email_data:
                # Get summary
                summary = reader.get_email_summary()
                print(f"Subject: {summary['subject']}")
                print(f"Sender: {summary['sender']}")
                print(f"Date: {summary['date']}")
                print(f"Attachments: {summary['attachment_count']}")
                print(f"Attachment names: {summary['attachment_names']}")
                print(f"Body preview: {summary['body_preview']}")

                # Save attachments
                if summary['attachment_count'] > 0:
                    # Sanitize the directory name to avoid Windows path issues
                    safe_dir_name = sanitize_directory_name(msg_file.stem)
                    print(f"Original filename: {msg_file.stem}")
                    print(f"Sanitized directory name: {safe_dir_name}")

                    attachment_dir = Path("extracted_attachments") / safe_dir_name
                    print(f"Full attachment directory path: {attachment_dir}")

                    # Ensure the parent directory exists
                    Path("extracted_attachments").mkdir(exist_ok=True)

                    try:
                        saved_files = reader.save_attachments(attachment_dir)
                        print(f"Saved {len(saved_files)} attachments to {attachment_dir}")
                    except Exception as e:
                        print(f"Error saving attachments to {attachment_dir}: {e}")
                        # Try with a simpler directory name
                        simple_dir_name = f"email_{msg_file.stem[:20]}"  # Use first 20 chars
                        simple_dir_name = sanitize_directory_name(simple_dir_name)
                        fallback_dir = Path("extracted_attachments") / simple_dir_name
                        print(f"Trying fallback directory: {fallback_dir}")
                        # A failure here propagates to the outer handler and
                        # marks this file as failed.
                        saved_files = reader.save_attachments(fallback_dir)
                        print(f"Saved {len(saved_files)} attachments to {fallback_dir}")

                # Save email data as JSON
                json_file = reader.save_email_data()

                results[msg_file.name] = {
                    'success': True,
                    'summary': summary,
                    'attachments_saved': len(saved_files),
                    'json_file': json_file
                }
            else:
                results[msg_file.name] = {
                    'success': False,
                    'error': 'Failed to read email data'
                }

        except Exception as e:
            print(f"Error processing {msg_file.name}: {e}")
            results[msg_file.name] = {
                'success': False,
                'error': str(e)
            }

    return results

def get_email_body_text(msg_file_path):
    """
    Return only the plain-text body of a .msg file.

    Args:
        msg_file_path (str): Path to the .msg file

    Returns:
        str: The email body, or None when the message could not be read
    """
    try:
        parsed = MSGFileReader(msg_file_path).read_msg_file()
        # read_msg_file() yields a falsy value when reading failed
        return parsed['body'] if parsed else None
    except Exception as e:
        print(f"Error reading email body: {e}")
        return None

def get_attachments_info(msg_file_path):
    """
    List the attachment records of a .msg file.

    Args:
        msg_file_path (str): Path to the .msg file

    Returns:
        list: Attachment information dictionaries; an empty list when the
        message has no attachments or could not be read
    """
    try:
        parsed = MSGFileReader(msg_file_path).read_msg_file()
        found = parsed['attachments'] if parsed else None
        return found if found else []
    except Exception as e:
        print(f"Error reading attachments: {e}")
        return []

In [13]:
# Run the batch processor over the existing .msg files and report per file
print("Testing MSG File Reader with existing files...", "=" * 60, sep="\n")

# Every .msg file in the Data directory is processed
results = process_all_msg_files("Data")

print("", "=" * 60, "PROCESSING SUMMARY", "=" * 60, sep="\n")

for filename, result in results.items():
    print(f"\nFile: {filename}")

    # Failed files only carry an error message
    if not result['success']:
        print(f"  ✗ Failed: {result['error']}")
        continue

    print("  ✓ Successfully processed")
    print(f"  Subject: {result['summary']['subject']}")
    print(f"  Attachments: {result['summary']['attachment_count']}")
    print(f"  Attachments saved: {result['attachments_saved']}")
    if result['json_file']:
        print(f"  JSON data saved: {result['json_file']}")

Testing MSG File Reader with existing files...
Found 3 .msg files to process:
  - Fac Support - Patel Engineering Teesta Project V- Package 6 - CAR.msg
  - FAC. Offer - GLACIER REFRIGERATION SERVICES CORPORATION OR GLACIER MEGAFRIDGE (FIRE FACULTATIVE OFFER).msg
  - Facultative Agriculture Risk Placement  Cairo 3A (Egypt)  Request for Line Support.msg

Processing: Fac Support - Patel Engineering Teesta Project V- Package 6 - CAR.msg
Subject: Fac Support - Patel Engineering Teesta Project V- Package 6 - CAR
Sender: Elcy Mascarenhas <elcy@uibindia.com>
Date: 2025-09-12 12:27:58+03:00
Attachments: 5
Attachment names: ['image001.png', 'image002.png', 'Patel Engg Teesta  UIB RI Slip 1009.docx', '01 LOA-Teesta-V Package 6 with BOQ (18-07-25).pdf', '2021INS100835-Inspection Report (Teesta-V).pdf']
Body preview: This Message originated from outside Kenya Re, be cautious with links and attachments.

________________________________


Dear Steve,

 

Hope you’re doing well.

 

We are pleased to

In [None]:
# Walk through reading one specific message, then print usage notes
print("", "=" * 60, "INDIVIDUAL FILE PROCESSING EXAMPLES", "=" * 60, sep="\n")

# Example 1: Read a specific .msg file
example_file = "Data/FAC. Offer - GLACIER REFRIGERATION SERVICES CORPORATION OR GLACIER MEGAFRIDGE (FIRE FACULTATIVE OFFER).msg"

if not Path(example_file).exists():
    print(f"File {example_file} not found")
else:
    print(f"\nExample 1: Processing {Path(example_file).name}")
    print("-" * 50)

    # Build the reader and load the message
    reader = MSGFileReader(example_file)
    email_data = reader.read_msg_file()

    if not email_data:
        print("Failed to read email data")
    else:
        # Headline fields of the message
        summary = reader.get_email_summary()
        print(f"Subject: {summary['subject']}")
        print(f"Sender: {summary['sender']}")
        print(f"Date: {summary['date']}")
        print(f"Body: {summary['body_preview']}")
        print(f"Attachments: {summary['attachment_count']}")

        # Per-attachment details, only when there is something to show
        if summary['attachment_count'] > 0:
            print(f"Attachment names: {summary['attachment_names']}")

            attachments = get_attachments_info(example_file)
            for i, att in enumerate(attachments, 1):
                print(f"  {i}. {att['filename']} ({att['size']} bytes, {att['content_type']})")

print("", "=" * 60, "USAGE INSTRUCTIONS", "=" * 60, sep="\n")
print("""
To use this script:

1. For processing all .msg files in a directory:
   results = process_all_msg_files("Data")

2. For processing a single file:
   reader = MSGFileReader("path/to/file.msg")
   email_data = reader.read_msg_file()
   
3. To get just the email body:
   body_text = get_email_body_text("path/to/file.msg")
   
4. To get attachment information:
   attachments = get_attachments_info("path/to/file.msg")
   
5. To save attachments:
   reader = MSGFileReader("path/to/file.msg")
   reader.read_msg_file()
   saved_files = reader.save_attachments("output_directory")
   
6. To save email data as JSON:
   reader = MSGFileReader("path/to/file.msg")
   reader.read_msg_file()
   json_file = reader.save_email_data("output.json")
""")


In [None]:
# Re-run the batch processor after the fixes and report per file
print("Testing FIXED MSG File Reader with existing files...", "=" * 60, sep="\n")

# Every .msg file in the Data directory is processed
results = process_all_msg_files("Data")

print("", "=" * 60, "PROCESSING SUMMARY", "=" * 60, sep="\n")

for filename, result in results.items():
    print(f"\nFile: {filename}")

    # Failed files only carry an error message
    if not result['success']:
        print(f"  ✗ Failed: {result['error']}")
        continue

    print("  ✓ Successfully processed")
    print(f"  Subject: {result['summary']['subject']}")
    print(f"  Attachments: {result['summary']['attachment_count']}")
    print(f"  Attachments saved: {result['attachments_saved']}")
    if result['json_file']:
        print(f"  JSON data saved: {result['json_file']}")


In [None]:
# Debug aid: check name sanitization and directory creation in isolation
print("DEBUG: Testing directory creation...", "=" * 50, sep="\n")

# Stems of the real message files, used as sanitization inputs
test_names = [
    "Fac Support - Patel Engineering Teesta Project V- Package 6 - CAR",
    "FAC. Offer - GLACIER REFRIGERATION SERVICES CORPORATION OR GLACIER MEGAFRIDGE (FIRE FACULTATIVE OFFER)",
    "Facultative Agriculture Risk Placement  Cairo 3A (Egypt)  Request for Line Support"
]

for name in test_names:
    sanitized = sanitize_directory_name(name)
    print(
        f"Original: {name}",
        f"Sanitized: {sanitized}",
        f"Length: {len(sanitized)}",
        "-" * 30,
        sep="\n",
    )

# Create the base directory and a throwaway subdirectory, then remove the latter
print("\nTesting directory creation...")
try:
    base_dir = Path("extracted_attachments")
    base_dir.mkdir(exist_ok=True)
    print(f"✓ Created base directory: {base_dir}")

    test_dir = base_dir / "test_email"
    test_dir.mkdir(exist_ok=True)
    print(f"✓ Created test directory: {test_dir}")

    # Only the temporary subdirectory is removed; the base directory stays
    test_dir.rmdir()
    print("✓ Cleaned up test directory")
except Exception as e:
    print(f"✗ Error: {e}")

print("\n" + "=" * 50)


In [None]:
# Alternative approach - use simple numbered directories
def process_all_msg_files_simple(data_directory="Data"):
    """
    Process all .msg files in a directory using simple numbered directories.

    Attachments of the i-th message are saved under
    ``extracted_attachments/email_<i>`` (two-digit, 1-based). Files are handled
    in sorted order so that the numbering is stable between runs, and the
    ``.msg`` extension is matched case-insensitively.

    Args:
        data_directory (str): Directory containing .msg files

    Returns:
        dict: Maps each file name to a result dict. Successful entries hold
        ``success``, ``summary``, ``attachments_saved``, ``json_file`` and
        ``attachment_dir`` (None when the email has no attachments); failed
        entries hold ``success`` and ``error``. Empty when the directory is
        missing or contains no .msg files.
    """
    data_path = Path(data_directory)
    results = {}

    if not data_path.exists():
        print(f"Directory {data_directory} does not exist.")
        return results

    # Find all .msg files (any letter case), in a deterministic order
    msg_files = sorted(
        candidate for candidate in data_path.glob("*")
        if candidate.suffix.lower() == ".msg"
    )

    if not msg_files:
        print(f"No .msg files found in {data_directory}")
        return results

    print(f"Found {len(msg_files)} .msg files to process:")
    for i, msg_file in enumerate(msg_files, 1):
        print(f"  {i}. {msg_file.name}")

    # Create base directory
    base_dir = Path("extracted_attachments")
    base_dir.mkdir(exist_ok=True)
    print(f"Created base directory: {base_dir}")

    # Process each file
    for i, msg_file in enumerate(msg_files, 1):
        print(f"\n{'='*50}")
        print(f"Processing {i}/{len(msg_files)}: {msg_file.name}")
        print(f"{'='*50}")

        # Reset per file so nothing leaks over from a previous email.
        attachment_dir = None
        saved_files = []

        try:
            # Create reader instance
            reader = MSGFileReader(msg_file)

            # Read the email
            email_data = reader.read_msg_file()

            if email_data:
                # Get summary
                summary = reader.get_email_summary()
                print(f"Subject: {summary['subject']}")
                print(f"Sender: {summary['sender']}")
                print(f"Date: {summary['date']}")
                print(f"Attachments: {summary['attachment_count']}")
                print(f"Attachment names: {summary['attachment_names']}")
                print(f"Body preview: {summary['body_preview']}")

                # Save attachments using simple numbered directory
                if summary['attachment_count'] > 0:
                    # Use simple numbered directory name
                    attachment_dir = base_dir / f"email_{i:02d}"
                    print(f"Using attachment directory: {attachment_dir}")

                    try:
                        saved_files = reader.save_attachments(attachment_dir)
                        print(f"✓ Saved {len(saved_files)} attachments to {attachment_dir}")
                    except Exception as e:
                        # Best effort: the email itself is still recorded.
                        print(f"✗ Error saving attachments: {e}")
                        saved_files = []

                # Save email data as JSON
                json_file = reader.save_email_data()

                results[msg_file.name] = {
                    'success': True,
                    'summary': summary,
                    'attachments_saved': len(saved_files),
                    'json_file': json_file,
                    'attachment_dir': str(attachment_dir) if attachment_dir is not None else None
                }
            else:
                results[msg_file.name] = {
                    'success': False,
                    'error': 'Failed to read email data'
                }

        except Exception as e:
            print(f"Error processing {msg_file.name}: {e}")
            results[msg_file.name] = {
                'success': False,
                'error': str(e)
            }

    return results

# Run the numbered-directory variant and report per file
print("Testing SIMPLE approach with numbered directories...", "=" * 60, sep="\n")

results = process_all_msg_files_simple("Data")

print("", "=" * 60, "PROCESSING SUMMARY", "=" * 60, sep="\n")

for filename, result in results.items():
    print(f"\nFile: {filename}")

    # Failed files only carry an error message
    if not result['success']:
        print(f"  ✗ Failed: {result['error']}")
        continue

    print("  ✓ Successfully processed")
    print(f"  Subject: {result['summary']['subject']}")
    print(f"  Attachments: {result['summary']['attachment_count']}")
    print(f"  Attachments saved: {result['attachments_saved']}")
    if result['attachment_dir']:
        print(f"  Attachment directory: {result['attachment_dir']}")
    if result['json_file']:
        print(f"  JSON data saved: {result['json_file']}")
