# Universal Serial Number and Event Extractor

This notebook scans CSV, Excel, PDF, PST, and image files in the `data` directory, extracts serial numbers, events, names, addresses, and outputs a CSV report.

In [1]:
# Cell 1: Imports and setup
import os
import re
import subprocess
import shutil

import pandas as pd

from typing import Dict, List, Any, Optional
from collections import defaultdict
import datetime
from datetime import datetime, date

import pytesseract
from PIL import Image
import spacy
import dateparser
import pdfplumber
import glob

import warnings

# nlp = spacy.load('en_core_web_trf')  # Transformer-based English NER
nlp = spacy.load("en_core_web_sm")

In [2]:
# Cell 2: Dataset Template Classes for Standardized Event Processing
# This cell defines the standardized structure for all extracted serial number events


class EventRecord:
    """
    Standardized event record structure for serial number events.
    """

    def __init__(self):
        self.serial_number: str = ""
        self.event_type: str = (
            ""  # Any event type (e.g., "Acquisition", "Disposition", "Transfer", "Return", etc.)
        )
        self.event_date: str = ""  # Date in YYYY-MM-DD format or original format
        self.associated_name: str = ""
        self.associated_address: str = ""
        self.source_file: str = ""
        self.file_created: str = ""  # File creation date in YYYY-MM-DD format
        self.file_modified: str = ""  # File modification date in YYYY-MM-DD format

    def to_dict(self) -> Dict[str, str]:
        """Convert record to dictionary format."""
        return {
            "serial_number": self.serial_number,
            "event_type": self.event_type,
            "event_date": self.event_date,
            "associated_name": self.associated_name,
            "associated_address": self.associated_address,
            "source_file": self.source_file,
            "file_created": self.file_created,
            "file_modified": self.file_modified,
        }


class DatasetTemplate:
    """
    Template class for managing the standardized dataset structure.
    """

    # Column definitions with expected data types
    COLUMN_SCHEMA = {
        "serial_number": "string",
        "event_type": "string",
        "event_date": "string",
        "associated_name": "string",
        "associated_address": "string",
        "source_file": "string",
        "file_created": "string",
        "file_modified": "string",
    }

    # Required columns (all are required for consistency)
    REQUIRED_COLUMNS = list(COLUMN_SCHEMA.keys())

    @staticmethod
    def create_empty_dataframe() -> pd.DataFrame:
        """Create an empty DataFrame with the correct column structure."""
        return pd.DataFrame(columns=DatasetTemplate.REQUIRED_COLUMNS)

    @staticmethod
    def validate_record(record: Dict[str, Any]) -> bool:
        """Validate that a record contains all required fields."""
        # Check all required columns are present
        for col in DatasetTemplate.REQUIRED_COLUMNS:
            if col not in record:
                return False

        # Validate that event_type is not empty (but accept any non-empty string)
        if not record.get("event_type") or str(record.get("event_type")).strip() == "":
            return False

        return True

    @staticmethod
    def create_record(
        serial_number: str,
        event_type: str,
        event_date: str = "",
        associated_name: str = "",
        associated_address: str = "",
        source_file: str = "",
        file_created: str = "",
        file_modified: str = "",
    ) -> EventRecord:
        """Create a standardized event record."""
        record = EventRecord()
        record.serial_number = str(serial_number).strip()
        record.event_type = str(event_type).strip()  # Accept any event type
        record.event_date = str(event_date).strip()
        record.associated_name = str(associated_name).strip()
        record.associated_address = str(associated_address).strip()
        record.source_file = str(source_file).strip()
        record.file_created = str(file_created).strip()
        record.file_modified = str(file_modified).strip()

        return record

    @staticmethod
    def get_file_metadata(filepath: str) -> tuple:
        """Extract file creation and modification dates."""
        try:
            stat = os.stat(filepath)
            created = datetime.fromtimestamp(stat.st_ctime)
            modified = datetime.fromtimestamp(stat.st_mtime)

            # Ensure created date is not later than modified date
            if modified < created:
                created, modified = modified, created

            return created.date().strftime("%Y-%m-%d"), modified.date().strftime(
                "%Y-%m-%d"
            )
        except Exception:
            return "", ""

    @staticmethod
    def combine_address_fields(*address_parts) -> str:
        """Combine multiple address components into a single address string."""
        # Filter out empty/null values and strip whitespace
        clean_parts = []
        for part in address_parts:
            if part and str(part).strip() and str(part).strip().lower() != "nan":
                clean_parts.append(str(part).strip())

        # Join with commas and clean up formatting
        address = ", ".join(clean_parts)
        return address.replace(" ,", "").strip(", ")

    @staticmethod
    def standardize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Standardize a DataFrame to match the template structure."""
        # Ensure all required columns exist
        for col in DatasetTemplate.REQUIRED_COLUMNS:
            if col not in df.columns:
                df[col] = ""

        # Select only the required columns in the correct order
        df = df[DatasetTemplate.REQUIRED_COLUMNS].copy()

        # Convert all columns to string type and strip whitespace
        for col in df.columns:
            df[col] = df[col].astype(str).str.strip()

        # Remove exact duplicates
        df = df.drop_duplicates()

        return df

    @staticmethod
    def save_to_csv(df: pd.DataFrame, filepath: str) -> None:
        """Save DataFrame to CSV with standardized formatting."""
        # Ensure directory exists
        os.makedirs(os.path.dirname(filepath), exist_ok=True)

        # Standardize before saving
        standardized_df = DatasetTemplate.standardize_dataframe(df)

        # Save to CSV
        standardized_df.to_csv(filepath, index=False)
        print(f"Saved {len(standardized_df)} records to {filepath}")

    @staticmethod
    def get_unique_event_types(df: pd.DataFrame) -> List[str]:
        """Get all unique event types found in the dataset."""
        if "event_type" in df.columns:
            return sorted(df["event_type"].dropna().unique().tolist())
        return []


print("Dataset template classes loaded successfully!")
print(f"Required columns: {DatasetTemplate.REQUIRED_COLUMNS}")
print("Event types: Dynamic - any non-empty string accepted")

Dataset template classes loaded successfully!
Required columns: ['serial_number', 'event_type', 'event_date', 'associated_name', 'associated_address', 'source_file', 'file_created', 'file_modified']
Event types: Dynamic - any non-empty string accepted


In [3]:
# Cell 3: Create reports directory and Serial Number Extractor

if not os.path.exists("reports"):
    os.makedirs("reports")
    print("Created 'reports' directory.")


# Serial Number Extractor
class SerialNumberExtractor:
    def __init__(self):
        self.patterns = [
            r"\b[A-Z0-9]{6,12}US\b",
            r"\b[A-Z]{4}[0-9]{3}\b",
            r"\b[A-Z]{4}[0-9]{3,4}\b",
            r"\b[A-Z]{3}[0-9]{3,4}\b",
            r"\b[0-9][A-Z]{3}[0-9]{3}\b",
            r"\b[A-Z]{2,4}[0-9]{4,8}\b",
            r"\b[0-9]{4,12}[A-Z]{1,4}\b",
            r"\b[A-Z][0-9]{4,8}[A-Z]\b",
            r"\b[0-9]{6,12}\b",
            r"\b[A-Z0-9]{2,4}[-_][A-Z0-9]{2,8}\b",
            r"\b[A-Z0-9]{3,8}[-_][0-9]{2,6}\b",
        ]
        self.compiled_patterns = [re.compile(pattern) for pattern in self.patterns]
        self.exclude_patterns = [
            r"\b[0-9]{4}[/-][0-9]{1,2}[/-][0-9]{1,2}\b",
            r"\b[0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4}\b",
            r"\bZIP\b",
            r"\bFFL\b",
            r"\bATF\b",
            r"\bPDF\b",
            r"\b(PAGE|FORM|SECTION|ITEM|LINE)\b",
        ]
        self.exclude_compiled = [
            re.compile(pattern, re.IGNORECASE) for pattern in self.exclude_patterns
        ]

    def extract(self, text):
        serials = set()
        for pattern in self.compiled_patterns:
            for match in pattern.findall(text):
                if not any(excl.search(match) for excl in self.exclude_compiled):
                    serials.add(match)
        return list(serials)

In [4]:
# Cell 4: File Discovery and Analysis
# This cell scans the data directory to identify all available source files


def discover_source_files(data_dir="data"):
    """
    Discover and analyze all source files in the data directory.
    """
    if not os.path.exists(data_dir):
        print(f"Data directory '{data_dir}' not found!")
        return

    print(f"Scanning directory: {data_dir}")
    print("=" * 60)

    # File extensions to look for
    target_extensions = {
        "csv": "CSV Files",
        "xlsx": "Excel Files (xlsx)",
        "xls": "Excel Files (xls)",
        "pdf": "PDF Files",
        "pst": "PST Files",
        "jpg": "Image Files (jpg)",
        "jpeg": "Image Files (jpeg)",
        "png": "Image Files (png)",
        "tiff": "Image Files (tiff)",
        "tif": "Image Files (tif)",
    }

    # Discover files
    file_stats = defaultdict(list)
    total_files = 0

    for ext, description in target_extensions.items():
        pattern = os.path.join(data_dir, "**", f"*.{ext}")
        files = glob.glob(pattern, recursive=True)

        if files:
            file_stats[description] = files
            total_files += len(files)

    # Display results
    print(f"Total files found: {total_files}")
    print()

    if total_files == 0:
        print("No supported files found in the data directory!")
        print(
            "Supported formats: CSV, Excel (xlsx/xls), PDF, PST, Images (jpg/jpeg/png/tiff/tif)"
        )
        return

    # Show breakdown by file type
    for file_type, files in file_stats.items():
        print(f"{file_type}: {len(files)} files")
        for file_path in sorted(files):
            # Get file size
            try:
                size = os.path.getsize(file_path)
                size_str = format_file_size(size)
                rel_path = os.path.relpath(file_path, data_dir)
                print(f"  {rel_path} ({size_str})")
            except Exception as e:
                rel_path = os.path.relpath(file_path, data_dir)
                print(f"  {rel_path} (size unknown)")
        print()

    # Summary statistics
    print("SUMMARY:")
    print("-" * 30)
    for file_type, files in sorted(file_stats.items()):
        print(f"{file_type}: {len(files)}")

    return file_stats


def format_file_size(size_bytes):
    """Convert bytes to human readable format."""
    if size_bytes == 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB"]
    i = 0
    while size_bytes >= 1024 and i < len(size_names) - 1:
        size_bytes /= 1024.0
        i += 1

    return f"{size_bytes:.1f} {size_names[i]}"


# Run the discovery
discovered_files = discover_source_files()

# Store results for later use
if discovered_files:
    print("\nFile discovery complete! Results stored in 'discovered_files' variable.")
    print("Ready to proceed with processing these files.")
else:
    print("\nNo files found. Please check your data directory.")

Scanning directory: data
Total files found: 52

CSV Files: 1 files
  AUS Weapons List.csv (3.2 MB)

Excel Files (xlsx): 20 files
  2021 Central Inventory Form.xlsx (251.0 KB)
  2022 Weapon Inventory Form.xlsx (9.0 MB)
  4-29-2021 IN-SAFE 4144 Weapons Inventory.xlsx (24.2 KB)
  ABSS FFL A&D Log 06_19_2019 with location.xlsx (556.4 KB)
  AUS FFL Workbook 06_02_22 JCG.xlsx (1.2 MB)
  AUS-USSA Firearm Inventory.xlsx (508.6 KB)
  AUS-USSA Firearm Workbook.xlsx (466.7 KB)
  CE Region Inv Report7_19_23.xlsx (56.9 KB)
  Central AUS Inventory.xlsx (144.8 KB)
  Copy of 20250320 - Firearm audit worksheet.xlsx (614.7 KB)
  Dallas Branch 600+.xlsx (22.5 KB)
  Dallas Workbook.xlsx (645.9 KB)
  Expected FA.xlsx (161.8 KB)
  FFL Workbook 6_3_20 to ND.xlsx (826.9 KB)
  Master Certification List MAC45.xlsx (70.5 KB)
  Murray Weapons (2).xlsx (82.4 KB)
  New AUS FFL A&D Log 07_03_2019 with location.xlsx (266.0 KB)
  Unassigned Orchid Fireams as of 1300CST 03172025.xlsx (131.6 KB)
  Weapons Database - Are

In [5]:
# Cell 5: PST Attachment Discovery and EML Processing
# This cell discovers all PST attachments and processes EML files to extract content and embedded attachments

import email
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import mimetypes


def discover_and_process_pst_attachments(discovered_files, temp_dir="temp_attachments"):
    """
    Discover all PST attachments and process EML files for content and embedded attachments.
    """
    if "PST Files" not in discovered_files:
        print("No PST files found.")
        return discovered_files, []

    print("DISCOVERING AND PROCESSING PST ATTACHMENTS")
    print("=" * 60)

    # Use the readpst command we know works
    readpst_command = "/opt/homebrew/bin/readpst"

    all_attachments = []
    email_content = []
    extracted_attachments = []

    for pst_path in discovered_files["PST Files"]:
        print(f"\nAnalyzing PST: {os.path.basename(pst_path)}")

        # Create temporary extraction directory
        readpst_temp = os.path.join(temp_dir, "discovery_temp")
        if os.path.exists(readpst_temp):
            shutil.rmtree(readpst_temp)
        os.makedirs(readpst_temp)

        try:
            # Extract everything from PST
            cmd = [readpst_command, "-e", "-o", readpst_temp, pst_path]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)

            if result.returncode != 0:
                print(f"Error extracting PST: {result.stderr}")
                continue

            # Scan all extracted files
            pst_attachments = []
            eml_files = []

            for root, dirs, files in os.walk(readpst_temp):
                for file in files:
                    file_path = os.path.join(root, file)
                    file_size = os.path.getsize(file_path)

                    # Get file extension
                    if "." in file:
                        ext = file.split(".")[-1].lower()
                    else:
                        ext = "no_extension"

                    attachment_info = {
                        "filename": file,
                        "extension": ext,
                        "size_bytes": file_size,
                        "size_readable": format_file_size(file_size),
                        "path": file_path,
                        "pst_source": os.path.basename(pst_path),
                    }
                    pst_attachments.append(attachment_info)
                    all_attachments.append(attachment_info)

                    # Collect EML files for processing
                    if ext == "eml":
                        eml_files.append(file_path)

            # Display attachments from this PST
            if pst_attachments:
                print(f"Found {len(pst_attachments)} files:")

                # Group by extension
                ext_groups = {}
                for att in pst_attachments:
                    ext = att["extension"]
                    if ext not in ext_groups:
                        ext_groups[ext] = []
                    ext_groups[ext].append(att)

                for ext, files in sorted(ext_groups.items()):
                    print(f"  .{ext} files: {len(files)}")

            # Process EML files from this PST
            if eml_files:
                print(
                    f"\nProcessing {len(eml_files)} EML files for embedded attachments..."
                )

                for eml_path in eml_files:
                    try:
                        # Parse EML file
                        with open(eml_path, "rb") as f:
                            msg = email.message_from_bytes(f.read())

                        # Extract email metadata
                        subject = msg.get("Subject", "No Subject")
                        sender = msg.get("From", "Unknown Sender")
                        recipient = msg.get("To", "Unknown Recipient")
                        date = msg.get("Date", "Unknown Date")

                        # Extract email body text
                        body_text = ""
                        if msg.is_multipart():
                            for part in msg.walk():
                                if part.get_content_type() == "text/plain":
                                    try:
                                        body_text += part.get_payload(
                                            decode=True
                                        ).decode("utf-8", errors="ignore")
                                    except:
                                        pass
                        else:
                            if msg.get_content_type() == "text/plain":
                                try:
                                    body_text = msg.get_payload(decode=True).decode(
                                        "utf-8", errors="ignore"
                                    )
                                except:
                                    pass

                        # Store email content for serial number extraction
                        if body_text.strip():
                            email_content.append(
                                {
                                    "filename": os.path.basename(eml_path),
                                    "subject": subject,
                                    "sender": sender,
                                    "recipient": recipient,
                                    "date": date,
                                    "body": body_text,
                                    "source_file": eml_path,
                                    "pst_source": os.path.basename(pst_path),
                                }
                            )

                        # Extract attachments from email
                        if msg.is_multipart():
                            for part in msg.walk():
                                # Skip text parts
                                if part.get_content_maintype() == "multipart":
                                    continue
                                if part.get("Content-Disposition") is None:
                                    continue

                                # Get attachment filename
                                filename = part.get_filename()
                                if filename:
                                    # Decode filename if needed
                                    if filename.startswith("=?"):
                                        try:
                                            filename = email.header.decode_header(
                                                filename
                                            )[0][0]
                                            if isinstance(filename, bytes):
                                                filename = filename.decode(
                                                    "utf-8", errors="ignore"
                                                )
                                        except:
                                            pass

                                    # Save attachment
                                    timestamp = datetime.now().strftime(
                                        "%Y%m%d_%H%M%S_%f"
                                    )
                                    safe_filename = f"{timestamp}_{filename}"
                                    attachment_path = os.path.join(
                                        temp_dir, safe_filename
                                    )

                                    try:
                                        with open(attachment_path, "wb") as f:
                                            f.write(part.get_payload(decode=True))

                                        extracted_attachments.append(
                                            {
                                                "original_name": filename,
                                                "saved_path": attachment_path,
                                                "extension": (
                                                    filename.split(".")[-1].lower()
                                                    if "." in filename
                                                    else "no_ext"
                                                ),
                                                "source_email": os.path.basename(
                                                    eml_path
                                                ),
                                                "pst_source": os.path.basename(
                                                    pst_path
                                                ),
                                            }
                                        )
                                    except Exception as e:
                                        print(
                                            f"    Error saving attachment {filename}: {str(e)}"
                                        )

                    except Exception as e:
                        print(
                            f"  Error processing EML {os.path.basename(eml_path)}: {str(e)}"
                        )
                        continue

            # Clean up temp directory
            shutil.rmtree(readpst_temp)

        except Exception as e:
            print(f"Error processing {pst_path}: {str(e)}")
            continue

    # Summary of all discoveries
    print(f"\n" + "=" * 60)
    print(f"PST PROCESSING SUMMARY")
    print("=" * 60)

    if all_attachments:
        # Group all attachments by extension
        ext_summary = {}
        for att in all_attachments:
            ext = att["extension"]
            if ext not in ext_summary:
                ext_summary[ext] = {"count": 0, "total_size": 0}
            ext_summary[ext]["count"] += 1
            ext_summary[ext]["total_size"] += att["size_bytes"]

        print(f"Found {len(all_attachments)} total files in PST:")
        for ext, info in sorted(ext_summary.items()):
            total_size_readable = format_file_size(info["total_size"])
            print(f"  .{ext}: {info['count']} files ({total_size_readable})")

    if email_content:
        print(f"\nProcessed {len(email_content)} emails with text content")

    if extracted_attachments:
        print(
            f"Extracted {len(extracted_attachments)} embedded attachments from emails"
        )

        # Group extracted attachments by type
        att_by_ext = {}
        for att in extracted_attachments:
            ext = att["extension"]
            if ext not in att_by_ext:
                att_by_ext[ext] = 0
            att_by_ext[ext] += 1

        print("\nExtracted attachment types:")
        for ext, count in sorted(att_by_ext.items()):
            print(f"  .{ext}: {count} files")

        # Add extracted attachments to discovered_files
        attachment_mapping = {
            "csv": "CSV Files",
            "xlsx": "Excel Files (xlsx)",
            "xls": "Excel Files (xls)",
            "pdf": "PDF Files",
            "jpg": "Image Files (jpg)",
            "jpeg": "Image Files (jpeg)",
            "png": "Image Files (png)",
            "tiff": "Image Files (tiff)",
            "tif": "Image Files (tif)",
            "txt": "Text Files",
        }

        added_files = 0
        for att in extracted_attachments:
            ext = att["extension"]
            if ext in attachment_mapping:
                file_type = attachment_mapping[ext]
                if file_type not in discovered_files:
                    discovered_files[file_type] = []
                discovered_files[file_type].append(att["saved_path"])
                added_files += 1

        if added_files > 0:
            print(
                f"\nAdded {added_files} extracted attachments to file inventory for processing"
            )

    return discovered_files, email_content


def format_file_size(size_bytes):
    """Convert bytes to human readable format."""
    if size_bytes == 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB"]
    i = 0
    while size_bytes >= 1024 and i < len(size_names) - 1:
        size_bytes /= 1024.0
        i += 1

    return f"{size_bytes:.1f} {size_names[i]}"


# Run PST discovery and processing
if "discovered_files" in globals() and discovered_files:
    discovered_files, email_texts = discover_and_process_pst_attachments(
        discovered_files
    )

    # Display updated file counts
    print("\nUpdated file inventory after PST processing:")
    print("-" * 50)
    total_files = 0
    for file_type, files in sorted(discovered_files.items()):
        print(f"{file_type}: {len(files)}")
        total_files += len(files)
    print(f"Total files: {total_files}")

    if email_texts:
        print(
            f"\nEmail content available for text-based serial number extraction: {len(email_texts)} emails"
        )
else:
    print("No discovered_files found. Run file discovery first.")

DISCOVERING AND PROCESSING PST ATTACHMENTS

Analyzing PST: Export_20250626141845886_Part_1.pst
Found 343 files:
  .eml files: 343

Processing 343 EML files for embedded attachments...
    Error saving attachment details.txt: a bytes-like object is required, not 'NoneType'

PST PROCESSING SUMMARY
Found 343 total files in PST:
  .eml: 343 files (1008.0 MB)

Processed 340 emails with text content
Extracted 1315 embedded attachments from emails

Extracted attachment types:
  .csv: 12 files
  .doc: 5 files
  .docx: 13 files
  .htm: 7 files
  .jpeg: 2 files
  .jpg: 97 files
  .pdf: 193 files
  .png: 630 files
  .rtf: 1 files
  .xls: 28 files
  .xlsx: 325 files
  .zip: 2 files

Added 1287 extracted attachments to file inventory for processing

Updated file inventory after PST processing:
--------------------------------------------------
CSV Files: 13
Excel Files (xls): 33
Excel Files (xlsx): 345
Image Files (jpeg): 3
Image Files (jpg): 101
Image Files (png): 630
PDF Files: 213
PST Files: 1
T

In [6]:
# Cell 6: CSV File Processing with DatasetTemplate
# This cell processes all CSV files using the standardized dataset template

import tempfile
import os


def preprocess_csv_for_alignment(csv_path):
    """
    Preprocess CSV file to fix misaligned rows caused by names with Jr/Sr/etc.
    """
    # Read the file as text lines first
    with open(csv_path, "r", encoding="utf-8", errors="ignore") as file:
        lines = file.readlines()

    if not lines:
        return csv_path

    # Get the expected number of columns from the header
    header_cols = lines[0].count(",") + 1

    # Process each line to fix misalignment
    fixed_lines = [lines[0]]  # Keep header as is

    for i, line in enumerate(lines[1:], 1):
        # Count columns in this line
        line_cols = line.count(",") + 1

        if line_cols == header_cols:
            # Line is properly aligned, keep as is
            fixed_lines.append(line)
        elif line_cols > header_cols:
            # Line has extra columns, likely due to split names
            # Try to merge the extra columns back into the appropriate fields
            parts = line.strip().split(",")

            # Look for common name suffixes that might be split
            for j in range(len(parts) - 1, 0, -1):
                if parts[j].strip().lower() in ["jr", "sr", "ii", "iii", "iv"]:
                    # Merge this with the previous part
                    parts[j - 1] = parts[j - 1].strip() + " " + parts[j].strip()
                    parts.pop(j)
                    break

            # If we still have too many columns, join the extras to the last meaningful field
            while len(parts) > header_cols and len(parts) > 1:
                # Join extra columns to the last field that might contain names
                parts[-2] = parts[-2].strip() + " " + parts[-1].strip()
                parts.pop()

            # Reconstruct the line
            fixed_line = ",".join(parts) + "\n"
            fixed_lines.append(fixed_line)
        else:
            # Line has fewer columns, pad with empty fields
            parts = line.strip().split(",")
            while len(parts) < header_cols:
                parts.append("")
            fixed_line = ",".join(parts) + "\n"
            fixed_lines.append(fixed_line)

    # Write fixed content to a temporary file
    temp_file = tempfile.NamedTemporaryFile(
        mode="w", delete=False, suffix=".csv", encoding="utf-8"
    )
    temp_file.writelines(fixed_lines)
    temp_file.close()

    return temp_file.name


def process_csv_files(discovered_files):
    """
    Process all CSV files using the standardized DatasetTemplate.
    """
    if "CSV Files" not in discovered_files:
        print("No CSV files found for processing.")
        return []

    print("PROCESSING CSV FILES")
    print("=" * 50)

    csv_files = discovered_files["CSV Files"]
    print(f"Found {len(csv_files)} CSV files to process")

    all_csv_records = []
    processed_files = 0
    skipped_files = 0

    for csv_path in csv_files:
        print(f"\nProcessing: {os.path.basename(csv_path)}")

        try:
            # Preprocess to fix alignment issues
            fixed_csv_path = preprocess_csv_for_alignment(csv_path)

            # Read the fixed CSV file
            df = pd.read_csv(fixed_csv_path, dtype=str, encoding="utf-8")
            print(f"  Loaded {len(df)} rows, {len(df.columns)} columns")

            # Clean up temporary file
            if fixed_csv_path != csv_path:
                os.unlink(fixed_csv_path)

        except Exception as e:
            print(f"  Error reading file: {e}")
            skipped_files += 1
            continue

        # Check for Serial Number column
        if "Serial Number" not in df.columns:
            print(f"  Skipping: no 'Serial Number' column found")
            print(f"  Available columns: {list(df.columns)}")
            skipped_files += 1
            continue

        # Get file metadata
        file_created, file_modified = DatasetTemplate.get_file_metadata(csv_path)
        source_filename = os.path.basename(csv_path)

        # Standardize serial number column
        df["Serial Number"] = df["Serial Number"].astype(str).str.strip()

        row_count = 0
        event_count = 0

        for idx, row in df.iterrows():
            serial = str(row.get("Serial Number", "")).strip()
            if not serial or serial.lower() == "nan":
                continue

            row_count += 1

            # Process Acquisition event
            if (
                pd.notnull(row.get("Acquire Date"))
                and str(row.get("Acquire Date")).strip()
            ):
                # Combine address fields
                address = DatasetTemplate.combine_address_fields(
                    row.get("Acquire Address", ""),
                    row.get("Acquire City", ""),
                    row.get("Acquire State", ""),
                    row.get("Acquire Zip", ""),
                )

                # Create standardized record
                record = DatasetTemplate.create_record(
                    serial_number=serial,
                    event_type="Acquisition",
                    event_date=str(row.get("Acquire Date", "")),
                    associated_name=str(row.get("Acquire Name", "")),
                    associated_address=address,
                    source_file=source_filename,
                    file_created=file_created,
                    file_modified=file_modified,
                )

                all_csv_records.append(record.to_dict())
                event_count += 1

            # Process Disposition event
            if (
                pd.notnull(row.get("Disposition Date"))
                and str(row.get("Disposition Date")).strip()
            ):
                # Combine address fields
                address = DatasetTemplate.combine_address_fields(
                    row.get("Disposition Address", ""),
                    row.get("Disposition City", ""),
                    row.get("Disposition State", ""),
                    row.get("Disposition Zip", ""),
                )

                # Create standardized record
                record = DatasetTemplate.create_record(
                    serial_number=serial,
                    event_type="Disposition",
                    event_date=str(row.get("Disposition Date", "")),
                    associated_name=str(row.get("Disposition Name", "")),
                    associated_address=address,
                    source_file=source_filename,
                    file_created=file_created,
                    file_modified=file_modified,
                )

                all_csv_records.append(record.to_dict())
                event_count += 1

        print(f"  Processed {row_count} rows → {event_count} events")
        processed_files += 1

    # Create standardized DataFrame
    if all_csv_records:
        csv_df = pd.DataFrame(all_csv_records)
        csv_df = DatasetTemplate.standardize_dataframe(csv_df)

        print(f"\n" + "=" * 50)
        print(f"CSV PROCESSING SUMMARY")
        print("=" * 50)
        print(f"Files processed: {processed_files}")
        print(f"Files skipped: {skipped_files}")
        print(f"Total events extracted: {len(csv_df)}")
        print(f"Unique serial numbers: {csv_df['serial_number'].nunique()}")

        # Show event type breakdown
        event_types = csv_df["event_type"].value_counts()
        print(f"\nEvent types found:")
        for event_type, count in event_types.items():
            print(f"  {event_type}: {count}")

        # Find serial numbers with multiple events
        serial_counts = csv_df["serial_number"].value_counts()
        multi_event_serials = serial_counts[serial_counts > 1]

        if len(multi_event_serials) > 0:
            print(f"\nSerial numbers with multiple events: {len(multi_event_serials)}")
            print("Top 5 serial numbers by event count:")
            for serial, count in multi_event_serials.head().items():
                print(f"  {serial}: {count} events")

            # Show sample of multi-event records
            sample_serial = multi_event_serials.index[0]
            sample_records = csv_df[
                csv_df["serial_number"] == sample_serial
            ].sort_values("event_date")
            print(f"\nSample records for serial {sample_serial}:")
            for _, record in sample_records.iterrows():
                print(
                    f"  {record['event_date']} - {record['event_type']} - {record['associated_name']}"
                )

        # Save results
        DatasetTemplate.save_to_csv(csv_df, "reports/csv_events_standardized.csv")

        # Save multi-event records if they exist
        if len(multi_event_serials) > 0:
            multi_event_df = csv_df[
                csv_df["serial_number"].isin(multi_event_serials.index)
            ]
            multi_event_df = multi_event_df.sort_values(
                ["serial_number", "event_date", "event_type"]
            )
            DatasetTemplate.save_to_csv(
                multi_event_df, "reports/csv_multi_event_standardized.csv"
            )
            print(f"Saved multi-event records: {len(multi_event_df)} records")

        return csv_df
    else:
        print("\nNo CSV records were extracted.")
        return pd.DataFrame()


# Run CSV processing
if "discovered_files" in globals() and discovered_files:
    csv_results = process_csv_files(discovered_files)

    if not csv_results.empty:
        print(
            f"\n✓ CSV processing complete! {len(csv_results)} standardized records created."
        )
        print("Files saved:")
        print("  - reports/csv_events_standardized.csv (all events)")
        if (
            len(
                csv_results["serial_number"].value_counts()[
                    csv_results["serial_number"].value_counts() > 1
                ]
            )
            > 0
        ):
            print("  - reports/csv_multi_event_standardized.csv (multi-event serials)")
    else:
        print("\n✓ CSV processing complete - no valid records found.")
else:
    print("No discovered_files found. Run file discovery first.")

PROCESSING CSV FILES
Found 13 CSV files to process

Processing: AUS Weapons List.csv
  Loaded 19190 rows, 42 columns
  Processed 19190 rows → 20501 events

Processing: 20250801_071752_173946_regionInventoryReport (36).csv
  Loaded 20375 rows, 6 columns
  Processed 20375 rows → 0 events

Processing: 20250801_071753_217501_AUS Weapons List.csv
  Loaded 19190 rows, 42 columns
  Processed 19190 rows → 20501 events

Processing: 20250801_071753_391257_regionInventoryReport (7).csv
  Loaded 19553 rows, 6 columns
  Processed 19553 rows → 0 events

Processing: 20250801_071754_795836_Bin Report.csv
  Loaded 51 rows, 4 columns
  Processed 51 rows → 0 events

Processing: 20250801_071756_477557_regionInventoryReport (2).csv
  Loaded 19563 rows, 6 columns
  Processed 19563 rows → 0 events

Processing: 20250801_071756_913944_branchAssignmentReport (4).csv
  Loaded 816 rows, 6 columns
  Processed 816 rows → 0 events

Processing: 20250801_071757_741562_All AUS 3-21-25 1600EST.csv
  Loaded 20087 rows, 7

In [7]:
# Cell 7: Excel File Processing with DatasetTemplate
# This cell processes all Excel files using the standardized dataset template


def process_excel_files(discovered_files):
    """
    Process all Excel files using the standardized DatasetTemplate.
    """
    # Get Excel files from both .xlsx and .xls keys
    excel_files = []

    if "Excel Files (xlsx)" in discovered_files:
        excel_files.extend(discovered_files["Excel Files (xlsx)"])
        print(f"Found {len(discovered_files['Excel Files (xlsx)'])} .xlsx files")

    if "Excel Files (xls)" in discovered_files:
        excel_files.extend(discovered_files["Excel Files (xls)"])
        print(f"Found {len(discovered_files['Excel Files (xls)'])} .xls files")

    if not excel_files:
        print("No Excel files found for processing.")
        return pd.DataFrame()

    print("PROCESSING EXCEL FILES")
    print("=" * 50)
    print(f"Total Excel files to process: {len(excel_files)}")

    all_excel_records = []
    processed_files = 0
    skipped_files = 0
    total_sheets = 0
    processed_sheets = 0

    # Suppress openpyxl warnings
    warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")

    # Header detection keywords
    HEADER_KEYWORDS = [
        "serial",
        "date",
        "acquisition",
        "disposition",
        "inspection",
        "transfer",
        "inventory",
        "name",
        "address",
        "model",
    ]

    def find_header_row(df_head):
        """Scans the top rows of a DataFrame to find the most likely header row."""
        best_score = 0
        header_row_index = -1
        for i, row in df_head.iterrows():
            score = 0
            for cell in row:
                if isinstance(cell, str):
                    for keyword in HEADER_KEYWORDS:
                        if keyword in cell.lower():
                            score += 1
            if score > best_score:
                best_score = score
                header_row_index = i
        return header_row_index if best_score >= 2 else -1

    def process_excel_sheet_with_header(df, sheet_info):
        """Process Excel sheet that has identifiable headers."""
        sheet_records = []
        source_file, sheet_name, file_created, file_modified = sheet_info

        # Ensure all columns are strings
        df.columns = [str(c) for c in df.columns]

        # Find serial number column
        serial_col = next((col for col in df.columns if "serial" in col.lower()), None)
        if not serial_col:
            return []

        # Find name columns (handle split names)
        first_name_col = next(
            (c for c in df.columns if "first" in c.lower() and "name" in c.lower()),
            None,
        )
        last_name_col = next(
            (c for c in df.columns if "last" in c.lower() and "name" in c.lower()), None
        )
        middle_initial_col = next(
            (c for c in df.columns if "middle" in c.lower() or c.lower() == "mi"), None
        )
        full_name_col = next(
            (
                c
                for c in df.columns
                if "name" in c.lower()
                and "first" not in c.lower()
                and "last" not in c.lower()
            ),
            None,
        )

        # Find address columns
        address_col = next((c for c in df.columns if "address" in c.lower()), None)
        city_col = next((c for c in df.columns if "city" in c.lower()), None)
        state_col = next((c for c in df.columns if "state" in c.lower()), None)
        zip_col = next(
            (c for c in df.columns if "zip" in c.lower() or "postal" in c.lower()), None
        )

        # Find date columns (excluding expiration dates)
        IGNORE_IN_DATE = ["exp", "expir"]
        date_columns = [
            col
            for col in df.columns
            if "date" in col.lower()
            and not any(word in col.lower() for word in IGNORE_IN_DATE)
        ]

        for idx, row in df.iterrows():
            serial = str(row.get(serial_col, "")).strip()
            if (
                not serial
                or serial.lower() == "nan"
                or not any(c.isalnum() for c in serial)
            ):
                continue

            # Combine name parts
            associated_name = ""
            if first_name_col and last_name_col:
                # Handle split name columns
                name_parts = []
                first = str(row.get(first_name_col, "")).strip()
                middle = (
                    str(row.get(middle_initial_col, "")).strip()
                    if middle_initial_col
                    else ""
                )
                last = str(row.get(last_name_col, "")).strip()

                if first and first.lower() != "nan":
                    name_parts.append(first)
                if middle and middle.lower() != "nan":
                    name_parts.append(middle)
                if last and last.lower() != "nan":
                    name_parts.append(last)

                associated_name = " ".join(name_parts)
            elif full_name_col:
                # Single name column
                associated_name = str(row.get(full_name_col, "")).strip()
                if associated_name.lower() == "nan":
                    associated_name = ""

            # Combine address fields
            if address_col or city_col or state_col or zip_col:
                address = DatasetTemplate.combine_address_fields(
                    row.get(address_col, "") if address_col else "",
                    row.get(city_col, "") if city_col else "",
                    row.get(state_col, "") if state_col else "",
                    row.get(zip_col, "") if zip_col else "",
                )
            else:
                address = ""

            # Process each date column as separate events
            events_created = 0
            for date_col in date_columns:
                if pd.notnull(row.get(date_col)) and str(row.get(date_col)).strip():
                    # Determine event type from column name
                    event_type = date_col.replace("Date", "").replace("of", "").strip()
                    if not event_type:
                        event_type = "Event"

                    # Create standardized record
                    record = DatasetTemplate.create_record(
                        serial_number=serial,
                        event_type=event_type,
                        event_date=str(row.get(date_col, "")),
                        associated_name=associated_name,
                        associated_address=address,
                        source_file=f"{source_file} ({sheet_name})",
                        file_created=file_created,
                        file_modified=file_modified,
                    )

                    sheet_records.append(record.to_dict())
                    events_created += 1

            # If no date columns found, create inventory record
            if not date_columns or events_created == 0:
                record = DatasetTemplate.create_record(
                    serial_number=serial,
                    event_type="Inventory",
                    event_date="",
                    associated_name=associated_name,
                    associated_address=address,
                    source_file=f"{source_file} ({sheet_name})",
                    file_created=file_created,
                    file_modified=file_modified,
                )

                sheet_records.append(record.to_dict())

        return sheet_records

    def process_excel_sheet_without_header(df, sheet_info):
        """Process Excel sheet by inferring column content when no header is found."""
        sheet_records = []
        source_file, sheet_name, file_created, file_modified = sheet_info

        # Score columns for serial numbers and dates
        col_scores = {col: {"serial": 0, "date": 0} for col in df.columns}
        total_rows = len(df)

        if total_rows == 0:
            return []

        # Sample data to infer column types
        for col in df.columns:
            sample = df[col].dropna()
            if len(sample) > 0:
                sample = sample.sample(n=min(50, len(sample)))
                for cell in sample:
                    # Check for serial number patterns
                    if isinstance(cell, str) and re.match(
                        r"^[A-Z0-9-]{5,20}$", cell, re.IGNORECASE
                    ):
                        col_scores[col]["serial"] += 1
                    # Check for date patterns
                    if pd.to_datetime(cell, errors="coerce") is not pd.NaT:
                        col_scores[col]["date"] += 1

        # Find best serial and date columns
        serial_col = (
            max(col_scores, key=lambda c: col_scores[c]["serial"])
            if col_scores
            else None
        )
        date_col = (
            max(col_scores, key=lambda c: col_scores[c]["date"]) if col_scores else None
        )

        # Validate column reliability
        is_serial_col_valid = (
            serial_col is not None
            and col_scores[serial_col]["serial"] / min(50, total_rows) > 0.2
        )
        is_date_col_valid = (
            date_col is not None
            and col_scores[date_col]["date"] / min(50, total_rows) > 0.2
        )

        if not is_serial_col_valid:
            return []

        # Process rows
        for idx, row in df.iterrows():
            serial = str(row.get(serial_col, "")).strip()
            if not serial or not any(c.isalnum() for c in serial):
                continue

            event_date = str(row.get(date_col, "")) if is_date_col_valid else ""
            event_type = "Inferred Event" if is_date_col_valid else "Inventory"

            # Create standardized record
            record = DatasetTemplate.create_record(
                serial_number=serial,
                event_type=event_type,
                event_date=event_date,
                associated_name="",
                associated_address="",
                source_file=f"{source_file} ({sheet_name})",
                file_created=file_created,
                file_modified=file_modified,
            )

            sheet_records.append(record.to_dict())

        return sheet_records

    # Process each Excel file
    for excel_path in excel_files:
        print(f"\nProcessing: {os.path.basename(excel_path)}")

        try:
            # Get file metadata
            file_created, file_modified = DatasetTemplate.get_file_metadata(excel_path)
            source_filename = os.path.basename(excel_path)

            # Read Excel file
            xls = pd.ExcelFile(excel_path)
            sheet_names = xls.sheet_names
            total_sheets += len(sheet_names)

            print(f"  Found {len(sheet_names)} sheets")

            file_events = 0

            for sheet_name in sheet_names:
                try:
                    sheet_info = (
                        source_filename,
                        sheet_name,
                        file_created,
                        file_modified,
                    )

                    # Read first 10 rows to detect headers
                    df_head = pd.read_excel(
                        xls, sheet_name=sheet_name, header=None, nrows=10
                    )
                    header_row_idx = find_header_row(df_head)

                    if header_row_idx != -1:
                        # Process with detected header
                        df = pd.read_excel(
                            xls, sheet_name=sheet_name, header=header_row_idx, dtype=str
                        )
                        sheet_records = process_excel_sheet_with_header(df, sheet_info)
                    else:
                        # Process without header (infer columns)
                        df = pd.read_excel(
                            xls, sheet_name=sheet_name, header=None, dtype=str
                        )
                        sheet_records = process_excel_sheet_without_header(
                            df, sheet_info
                        )

                    all_excel_records.extend(sheet_records)
                    file_events += len(sheet_records)
                    processed_sheets += 1

                    if sheet_records:
                        print(f"    Sheet '{sheet_name}': {len(sheet_records)} events")
                    else:
                        print(f"    Sheet '{sheet_name}': No valid data found")

                except Exception as sheet_error:
                    print(f"    Sheet '{sheet_name}': Error - {sheet_error}")
                    continue

            print(f"  Total events from file: {file_events}")
            processed_files += 1

        except Exception as file_error:
            print(f"  Error reading file: {file_error}")
            skipped_files += 1
            continue

    # Create standardized DataFrame and generate summary
    if all_excel_records:
        excel_df = pd.DataFrame(all_excel_records)
        excel_df = DatasetTemplate.standardize_dataframe(excel_df)

        print(f"\n" + "=" * 50)
        print(f"EXCEL PROCESSING SUMMARY")
        print("=" * 50)
        print(f"Files processed: {processed_files}")
        print(f"Files skipped: {skipped_files}")
        print(f"Sheets processed: {processed_sheets}/{total_sheets}")
        print(f"Total events extracted: {len(excel_df)}")
        print(f"Unique serial numbers: {excel_df['serial_number'].nunique()}")

        # Show event type breakdown
        event_types = excel_df["event_type"].value_counts()
        print(f"\nEvent types found:")
        for event_type, count in event_types.items():
            print(f"  {event_type}: {count}")

        # Find and analyze multi-event serials
        serial_counts = excel_df["serial_number"].value_counts()
        multi_event_serials = serial_counts[serial_counts > 1]

        if len(multi_event_serials) > 0:
            print(f"\nSerial numbers with multiple events: {len(multi_event_serials)}")
            print("Top 5 serial numbers by event count:")
            for serial, count in multi_event_serials.head().items():
                print(f"  {serial}: {count} events")

        # Save results
        DatasetTemplate.save_to_csv(excel_df, "reports/excel_events_standardized.csv")

        if len(multi_event_serials) > 0:
            multi_event_df = excel_df[
                excel_df["serial_number"].isin(multi_event_serials.index)
            ]
            multi_event_df = multi_event_df.sort_values(
                ["serial_number", "event_date", "event_type"]
            )
            DatasetTemplate.save_to_csv(
                multi_event_df, "reports/excel_multi_event_standardized.csv"
            )
            print(f"Saved multi-event records: {len(multi_event_df)} records")

        return excel_df
    else:
        print("\nNo Excel records were extracted.")
        return pd.DataFrame()


# Run Excel processing
if "discovered_files" in globals() and discovered_files:
    excel_results = process_excel_files(discovered_files)

    if not excel_results.empty:
        print(
            f"\n✓ Excel processing complete! {len(excel_results)} standardized records created."
        )
        print("Files saved:")
        print("  - reports/excel_events_standardized.csv (all events)")
        if (
            len(
                excel_results["serial_number"].value_counts()[
                    excel_results["serial_number"].value_counts() > 1
                ]
            )
            > 0
        ):
            print(
                "  - reports/excel_multi_event_standardized.csv (multi-event serials)"
            )
    else:
        print("\n✓ Excel processing complete - no valid records found.")
else:
    print("No discovered_files found. Run file discovery first.")

Found 345 .xlsx files
Found 33 .xls files
PROCESSING EXCEL FILES
Total Excel files to process: 378

Processing: Unassigned Orchid Fireams as of 1300CST 03172025.xlsx
  Found 1 sheets
    Sheet 'Sheet1': 5836 events
  Total events from file: 5836

Processing: 2022 Weapon Inventory Form.xlsx
  Found 3 sheets
    Sheet 'Sheet1': 251 events
    Sheet 'Sheet2': No valid data found
    Sheet 'Sheet3': No valid data found
  Total events from file: 251

Processing: Murray Weapons (2).xlsx
  Found 13 sheets
    Sheet 'Overview Report': No valid data found
    Sheet 'Transfer Weapons': No valid data found
    Sheet 'Chattanooga TN': 42 events
    Sheet 'Knoxville TN': 31 events
    Sheet 'Tri-Cities TN': 31 events
    Sheet 'Nashville TN': 54 events
    Sheet 'Louisville KY': 49 events
    Sheet 'Decatur AL': 29 events
    Sheet 'Atlanta GA': 4 events
    Sheet 'Jackson TN': 24 events
    Sheet 'Memphis TN': 29 events
    Sheet 'Birmingham AL': 13 events
    Sheet 'Branch Addresses': 29 events
 

In [9]:
# Cell 8: Serial Number Character Analysis
# This cell analyzes all CSV and Excel files to find non-alphanumeric characters in serial numbers and their positions

import re
import pandas as pd
from collections import defaultdict


def analyze_serial_number_characters(discovered_files):
    """
    Analyze serial numbers in all CSV and Excel files to identify non-alphanumeric characters and their positions.
    """
    print("ANALYZING SERIAL NUMBERS FOR NON-ALPHANUMERIC CHARACTERS")
    print("=" * 70)

    # Collect all files to analyze
    csv_files = discovered_files.get("CSV Files", [])
    xlsx_files = discovered_files.get("Excel Files (xlsx)", [])
    xls_files = discovered_files.get("Excel Files (xls)", [])

    all_files = csv_files + xlsx_files + xls_files
    print(
        f"Total files to analyze: {len(all_files)} ({len(csv_files)} CSV, {len(xlsx_files)} XLSX, {len(xls_files)} XLS)"
    )

    # Dictionary to store character analysis results
    char_analysis = defaultdict(
        lambda: {"count": 0, "positions": defaultdict(int), "files": set()}
    )
    total_serials_processed = 0
    files_with_issues = set()
    failed_files = []

    # Process CSV files
    for file_path in csv_files:
        try:
            # Use our preprocessing function to handle misaligned rows
            df_result = preprocess_csv_for_alignment(file_path)

            # Check if preprocessing returned a DataFrame or an error message
            if isinstance(df_result, str):
                # Preprocessing returned an error message
                print(f"Skipping CSV file {file_path}: {df_result}")
                failed_files.append(file_path)
                continue
            elif not isinstance(df_result, pd.DataFrame):
                # Preprocessing returned something unexpected
                print(
                    f"Skipping CSV file {file_path}: Unexpected return type {type(df_result)}"
                )
                failed_files.append(file_path)
                continue
            elif df_result.empty:
                # Preprocessing returned an empty DataFrame
                print(
                    f"Skipping CSV file {file_path}: Empty DataFrame after preprocessing"
                )
                failed_files.append(file_path)
                continue

            df = df_result

            if "Serial Number" in df.columns:
                serials = df["Serial Number"].dropna().astype(str)
                analyze_serials_in_series(
                    serials, file_path, char_analysis, files_with_issues
                )
                total_serials_processed += len(serials)
            else:
                # Try to find alternative serial number column names
                serial_col = find_serial_column(df)
                if serial_col:
                    serials = df[serial_col].dropna().astype(str)
                    analyze_serials_in_series(
                        serials, file_path, char_analysis, files_with_issues
                    )
                    total_serials_processed += len(serials)
                else:
                    print(
                        f"Skipping CSV file {file_path}: No serial number column found"
                    )
        except Exception as e:
            print(f"Error processing CSV file {file_path}: {str(e)}")
            failed_files.append(file_path)

    # Process Excel files
    for file_path in xlsx_files + xls_files:
        try:
            # Determine engine based on file extension
            engine = "openpyxl" if file_path.endswith(".xlsx") else "xlrd"

            # Load Excel file
            xls = pd.ExcelFile(file_path, engine=engine)

            # Process each sheet
            for sheet_name in xls.sheet_names:
                try:
                    df = pd.read_excel(xls, sheet_name=sheet_name, dtype=str)

                    if df.empty:
                        print(
                            f"Skipping sheet {sheet_name} in {file_path}: Empty sheet"
                        )
                        continue

                    if "Serial Number" in df.columns:
                        serials = df["Serial Number"].dropna().astype(str)
                        analyze_serials_in_series(
                            serials,
                            f"{file_path} [{sheet_name}]",
                            char_analysis,
                            files_with_issues,
                        )
                        total_serials_processed += len(serials)
                    else:
                        # Try to find alternative serial number column names
                        serial_col = find_serial_column(df)
                        if serial_col:
                            serials = df[serial_col].dropna().astype(str)
                            analyze_serials_in_series(
                                serials,
                                f"{file_path} [{sheet_name}]",
                                char_analysis,
                                files_with_issues,
                            )
                            total_serials_processed += len(serials)
                        else:
                            print(
                                f"No serial number column found in sheet {sheet_name} of {file_path}"
                            )
                except Exception as e:
                    print(
                        f"Error processing sheet {sheet_name} in {file_path}: {str(e)}"
                    )
        except Exception as e:
            print(f"Error processing Excel file {file_path}: {str(e)}")
            failed_files.append(file_path)

    # Report results
    print(f"\nANALYSIS COMPLETE")
    print("=" * 70)
    print(f"Total serial numbers processed: {total_serials_processed}")
    print(
        f"Files with non-alphanumeric characters in serial numbers: {len(files_with_issues)}"
    )
    print(f"Files that failed to process: {len(failed_files)}")

    if failed_files:
        print(f"\nFAILED FILES:")
        print("-" * 30)
        for file in failed_files[:10]:  # Show first 10
            print(f"  {file}")
        if len(failed_files) > 10:
            print(f"  ... and {len(failed_files) - 10} more")

    if char_analysis:
        print(f"\nNON-ALPHANUMERIC CHARACTERS FOUND:")
        print("-" * 40)

        # Sort by frequency
        sorted_chars = sorted(
            char_analysis.items(), key=lambda x: x[1]["count"], reverse=True
        )

        for char, data in sorted_chars:
            print(f"Character '{char}' (ASCII: {ord(char)})")
            print(f"  Total occurrences: {data['count']}")
            print(f"  Found in files: {len(data['files'])}")

            # Show position distribution
            sorted_positions = sorted(data["positions"].items())
            position_info = ", ".join(
                [f"pos {pos}: {count}" for pos, count in sorted_positions[:10]]
            )
            if len(sorted_positions) > 10:
                position_info += f", ... ({len(sorted_positions) - 10} more positions)"
            print(f"  Position distribution: {position_info}")
            print()

        # Show top 10 files with most issues
        file_issue_counts = defaultdict(int)
        for char_data in char_analysis.values():
            for file in char_data["files"]:
                file_issue_counts[file] += 1

        if file_issue_counts:
            print("TOP FILES WITH MOST NON-ALPHANUMERIC CHARACTERS IN SERIAL NUMBERS:")
            print("-" * 60)
            sorted_files = sorted(
                file_issue_counts.items(), key=lambda x: x[1], reverse=True
            )
            for file, count in sorted_files[:10]:
                print(f"  {file}: {count} different characters")
    else:
        print("\nNo non-alphanumeric characters found in serial numbers!")

    return char_analysis


def analyze_serials_in_series(
    serials, file_identifier, char_analysis, files_with_issues
):
    """
    Analyze a series of serial numbers for non-alphanumeric characters.
    """
    for serial in serials:
        # Find all non-alphanumeric characters and their positions
        for i, char in enumerate(serial):
            if not char.isalnum():
                char_analysis[char]["count"] += 1
                char_analysis[char]["positions"][i] += 1
                char_analysis[char]["files"].add(file_identifier)
                files_with_issues.add(file_identifier)


def find_serial_column(df):
    """
    Try to find a column that might contain serial numbers based on naming patterns.
    """
    possible_names = [
        "serial",
        "sn",
        "s/n",
        "serial#",
        "serial number",
        "serial_no",
        "serial_num",
        "firearm serial",
        "gun serial",
    ]

    for col in df.columns:
        col_lower = str(col).lower()
        for name in possible_names:
            if name in col_lower:
                return col
    return None


# Run the analysis if we have discovered files
if "discovered_files" in globals() and discovered_files:
    serial_char_analysis = analyze_serial_number_characters(discovered_files)
else:
    print("No discovered_files found. Run file discovery first.")

ANALYZING SERIAL NUMBERS FOR NON-ALPHANUMERIC CHARACTERS
Total files to analyze: 391 (13 CSV, 345 XLSX, 33 XLS)
Skipping CSV file data/AUS Weapons List.csv: /var/folders/94/jlyp8h5n6tg_81hstwdbmqqh0000gn/T/tmps1t6eefz.csv
Skipping CSV file temp_attachments/20250801_071752_173946_regionInventoryReport (36).csv: /var/folders/94/jlyp8h5n6tg_81hstwdbmqqh0000gn/T/tmptqh967hn.csv
Skipping CSV file temp_attachments/20250801_071753_217501_AUS Weapons List.csv: /var/folders/94/jlyp8h5n6tg_81hstwdbmqqh0000gn/T/tmp6rf667ni.csv
Skipping CSV file temp_attachments/20250801_071753_391257_regionInventoryReport (7).csv: /var/folders/94/jlyp8h5n6tg_81hstwdbmqqh0000gn/T/tmpz693caow.csv
Skipping CSV file temp_attachments/20250801_071754_795836_Bin Report.csv: /var/folders/94/jlyp8h5n6tg_81hstwdbmqqh0000gn/T/tmpnmtgo3e3.csv
Skipping CSV file temp_attachments/20250801_071756_477557_regionInventoryReport (2).csv: /var/folders/94/jlyp8h5n6tg_81hstwdbmqqh0000gn/T/tmpxuqv22tu.csv
Skipping CSV file temp_attachme

In [17]:
# Cell 9: Serial Numbers with Non-AlphaNumeric Characters

def collect_actual_serial_examples(discovered_files, char_analysis, max_examples_per_char=1):
    """
    Collect actual serial number examples for each character from a sample of files.
    """
    from collections import defaultdict
    import random
    
    print("COLLECTING ACTUAL SERIAL NUMBER EXAMPLES")
    print("=" * 50)
    
    if not char_analysis:
        print("No character analysis data provided.")
        return {}
    
    # Get characters from actual analysis results, sorted by frequency
    sorted_chars = sorted(
        char_analysis.items(), key=lambda x: x[1]["count"], reverse=True
    )
    
    # Limit to top characters
    top_chars = sorted_chars[:15]
    
    # Dictionary to store actual examples
    actual_examples = {}
    
    # Collect a small sample of files to find examples
    csv_files = discovered_files.get("CSV Files", [])
    xlsx_files = discovered_files.get("Excel Files (xlsx)", [])
    xls_files = discovered_files.get("Excel Files (xls)", [])
    
    # Take a small sample for quick processing
    sample_files = []
    if csv_files:
        sample_files.extend(random.sample(csv_files, min(5, len(csv_files))))
    if xlsx_files:
        sample_files.extend(random.sample(xlsx_files, min(5, len(xlsx_files))))
    if xls_files:
        sample_files.extend(random.sample(xls_files, min(3, len(xls_files))))
    
    print(f"Searching for examples in {len(sample_files)} sampled files...")
    
    # Process sampled files to find actual examples
    for file_path in sample_files:
        if len([ex for ex in actual_examples.values() if ex]) >= len(top_chars) * 0.7:  # Stop if we have most examples
            break
            
        try:
            if file_path.endswith('.csv'):
                # Process CSV file
                df_result = preprocess_csv_for_alignment(file_path)
                if isinstance(df_result, pd.DataFrame) and not df_result.empty:
                    df = df_result
                    serial_col = "Serial Number" if "Serial Number" in df.columns else find_serial_column(df)
                    
                    if serial_col and serial_col in df.columns:
                        serials = df[serial_col].dropna().astype(str)
                        for serial in serials:
                            for char, _ in top_chars:
                                if char not in actual_examples and char in serial:
                                    actual_examples[char] = serial
                                    break
            else:
                # Process Excel file
                engine = "openpyxl" if file_path.endswith(".xlsx") else "xlrd"
                xls = pd.ExcelFile(file_path, engine=engine)
                
                for sheet_name in xls.sheet_names[:1]:  # First sheet only
                    try:
                        df = pd.read_excel(xls, sheet_name=sheet_name, dtype=str)
                        if not df.empty:
                            serial_col = "Serial Number" if "Serial Number" in df.columns else find_serial_column(df)
                            
                            if serial_col and serial_col in df.columns:
                                serials = df[serial_col].dropna().astype(str)
                                for serial in serials:
                                    for char, _ in top_chars:
                                        if char not in actual_examples and char in serial:
                                            actual_examples[char] = serial
                                            break
                    except Exception:
                        continue
        except Exception:
            continue
    
    return actual_examples

def show_character_summary_with_actual_examples(discovered_files, char_analysis):
    """
    Show character summary with actual serial number examples on the same line.
    """
    print("SERIAL NUMBER CHARACTER ANALYSIS SUMMARY")
    print("=" * 60)
    
    if not char_analysis:
        print("No character analysis data provided.")
        return
    
    # Get characters from actual analysis results, sorted by frequency
    sorted_chars = sorted(
        char_analysis.items(), key=lambda x: x[1]["count"], reverse=True
    )
    
    # Get actual examples
    actual_examples = collect_actual_serial_examples(discovered_files, char_analysis)
    
    # Display characters with actual examples when available
    for char, data in sorted_chars[:15]:  # Top 15 characters
        char_display = repr(char) if char not in [' ', '\n', '\t'] else f"'{char}'"
        
        # Use actual example if found, otherwise use placeholder
        if char in actual_examples:
            example_display = actual_examples[char].replace('\n', '\\n').replace('\t', '\\t').replace('\r', '\\r')
            print(f"Character {char_display} (ASCII: {ord(char)}): '{example_display}'")
        else:
            # Try to get any example from the files list
            example_file = list(data['files'])[:1]
            if example_file:
                filename = example_file[0].split('/')[-1] if '[' not in example_file[0] else example_file[0].split(' [')[0].split('/')[-1]
                print(f"Character {char_display} (ASCII: {ord(char)}): example from {filename}")
            else:
                print(f"Character {char_display} (ASCII: {ord(char)}): {data['count']} occurrences in {len(data['files'])} files")
        
        # Additional details on separate lines
        print(f"  Total occurrences: {data['count']}")
        print(f"  Found in {len(data['files'])} different files")
        
        # Show some example source files
        example_files = list(data['files'])[:3]
        print(f"  Example files:")
        for file in example_files:
            print(f"    - {file}")
        if len(data['files']) > 3:
            print(f"    ... and {len(data['files']) - 3} more files")
        print()

# To execute this function:
show_character_summary_with_actual_examples(discovered_files, serial_char_analysis)

SERIAL NUMBER CHARACTER ANALYSIS SUMMARY
COLLECTING ACTUAL SERIAL NUMBER EXAMPLES
Searching for examples in 13 sampled files...
Character ' ' (ASCII: 32): 'MF792.40 S&W2'
  Total occurrences: 724277
  Found in 609 different files
  Example files:
    - temp_attachments/20250801_071753_641734_regionInventoryReport (28)1200 031025.xlsx [Little Rock]
    - temp_attachments/20250801_071801_573614_P3 list for AU - company owned.xls [Company owned]
    - temp_attachments/20250801_071802_267948_regionInventoryReport (28)1200 031025.xlsx [Little Rock]
    ... and 606 more files

Character '-' (ASCII: 45): '310-23899'
  Total occurrences: 19086
  Found in 359 different files
  Example files:
    - temp_attachments/20250801_071801_573614_P3 list for AU - company owned.xls [Company owned]
    - temp_attachments/20250801_071754_683926_NE Region Inv Report 7_19_23.xlsx [regionInventoryReport (1)]
    - temp_attachments/20250801_071801_374510_Tiger Team Research Tool v3.xlsx [OLD Tiger Team List v2]