<h3>Importing libraries</h3>

In [None]:
%pip install nltk transformers wordcloud matplotlib pandas dataclasses tqdm

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.util import ngrams
from collections import Counter
from transformers import pipeline, AutoModelForSequenceClassification, DistilBertTokenizer
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import pandas as pd
import logging
import json
import datetime
from tqdm.notebook import tqdm
from pathlib import Path
from dataclasses import dataclass

<h3>Document Analyzer</h3>

In [None]:
# Constants
# Input data location. NOTE(review): "file-path" looks like a placeholder that
# must be replaced with the real path before running - confirm.
file_path = Path("file-path")
# Hugging Face model identifiers loaded in DocumentAnalyzer.__init__.
SUMMARIZER_MODEL = "facebook/bart-large-cnn"
CHUNKER_MODEL = "BlueOrangeDigital/distilbert-cross-segment-document-chunking"
# Token budget for one summarizer pass: generate_summary switches to the
# chunked two-pass approach above this count, and _split_into_chunks derives
# its minimum chunk size (20%) from it.
BART_MAX_LENGTH = 512  # Token threshold for single-pass summarization
# Generation settings passed to the summarization pipeline at construction.
# "max_length" / "min_length" are also the defaults of generate_summary().
BART_CONFIG = {
    "max_length": 70,
    "min_length": 30,
    "length_penalty": 2.0,
    "num_beams": 4,
    "early_stopping": True,
    "no_repeat_ngram_size": 3,
}


class DocumentAnalyzer:
    def __init__(self):
        """
        Initialize the DocumentAnalyzer and load every model/resource it needs.

        This method sets up, in order:
         - the DistilBERT sequence-classification model, its tokenizer and the
           "text-classification" pipeline used for document segmentation
         - NLTK resources ('punkt', 'stopwords', 'averaged_perceptron_tagger')
         - the English stop-word set
         - the BART summarization pipeline, configured with BART_CONFIG

        Raises:
         - LookupError, if downloading an NLTK resource raises, or if the
           stop-word corpus is unavailable afterwards
         - Exception, any other initialization error is logged and re-raised
        """
        self.logger = logging.getLogger(f"{__name__}.DocumentAnalyzer")

        try:
            self.logger.info("Initializing DocumentAnalyzer...")

            # Initialize DistilBERT model
            self.logger.info("Setting up DistilBERT model for document segmentation...")
            self.chunker_model = AutoModelForSequenceClassification.from_pretrained(
                CHUNKER_MODEL,
                num_labels=2,
                id2label={0: "SAME", 1: "DIFFERENT"},
                label2id={"SAME": 0, "DIFFERENT": 1},
            )
            self.chunker_tokenizer = DistilBertTokenizer.from_pretrained(CHUNKER_MODEL)
            # top_k=None makes the pipeline return a score for every label,
            # which _split_into_chunks relies on to read the "DIFFERENT" score.
            self.chunking_pipeline = pipeline(
                "text-classification",
                model=self.chunker_model,
                tokenizer=self.chunker_tokenizer,
                top_k=None,
            )

            # Download NLTK resources
            self.logger.info("Downloading NLTK resources...")
            for resource in ["punkt", "stopwords", "averaged_perceptron_tagger"]:
                try:
                    downloaded = nltk.download(resource, quiet=True)
                except Exception as e:
                    raise LookupError(
                        f"Failed to download NLTK resource '{resource}': {str(e)}"
                    ) from e
                if not downloaded:
                    # nltk.download() reports most failures (e.g. no network)
                    # by returning False instead of raising. The resource may
                    # still be installed locally, so warn rather than abort;
                    # a genuinely missing resource fails loudly on first use.
                    self.logger.warning(
                        f"NLTK resource '{resource}' could not be downloaded; "
                        "continuing with any locally installed copy"
                    )

            # Initialize stopwords
            self.logger.info("Setting up stopwords...")
            self.stop_words = set(stopwords.words("english"))

            # Initialize BART summarizer with optimal config
            self.logger.debug("Setting up BART summarization pipeline...")
            self.summarizer = pipeline(
                task="summarization",
                model=SUMMARIZER_MODEL,
                framework="pt",
                device=-1,
                **BART_CONFIG,
            )
            self.logger.debug("Summarizer pipeline initialized successfully")

            self.logger.info("DocumentAnalyzer initialization completed successfully!")

        except Exception as e:
            self.logger.error(f"Error during DocumentAnalyzer initialization: {str(e)}")
            raise

    def load_and_sample_data(self, file_path, sample_size=500):
        """
        Load CSV file and create a balanced sample across all categories

        Args:
            file_path (str): Path to the CSV file
            sample_size (int): Total desired sample size (with a default of 500)

        Returns:
            pd.DataFrame: Balanced sample of articles
        """
        try:
            # Read the CSV file
            df = pd.read_csv(
                file_path,
                sep="\t",  # Tab as delimiter
                engine="python",  # More flexible parsing
                on_bad_lines="warn",  # Don't fail on problematic lines
                encoding="utf-8",  # Explicitly set encoding
            )

            # Ensure required columns exist
            required_columns = ["category", "content"]
            if not all(col in df.columns for col in required_columns):
                raise ValueError(f"CSV must contain these columns: {required_columns}")

            # Clean the data by removing rows with missing values in the 'category' and 'content' columns
            df = df.dropna(subset=["category", "content"])

            # Get unique categories
            categories = df["category"].unique()
            samples_per_category = sample_size // len(categories)

            # Create balanced sample
            sampled_df = pd.DataFrame()
            for category in categories:
                category_df = df[df["category"] == category]
                category_sample = category_df.sample(
                    n=min(samples_per_category, len(category_df)),
                    random_state=42,  # For reproducibility
                )
                sampled_df = pd.concat([sampled_df, category_sample])

            # In case we need more samples to reach the target size
            remaining_samples = sample_size - len(sampled_df)
            if remaining_samples > 0:
                # Sample randomly from all categories to make up the difference
                remaining_df = df[~df.index.isin(sampled_df.index)]
                additional_samples = remaining_df.sample(
                    n=min(remaining_samples, len(remaining_df)), random_state=42
                )
                sampled_df = pd.concat([sampled_df, additional_samples])

            # Shuffle the final dataset
            sampled_df = sampled_df.sample(frac=1, random_state=42).reset_index(
                drop=True
            )

            print(f"\nCreated balanced sample with {len(sampled_df)} articles")
            print("\nCategory distribution:")
            print(sampled_df["category"].value_counts())

            return sampled_df

        except Exception as e:
            print(f"Error loading and sampling data: {str(e)}")
            print("\nDataFrame info before error:")
            try:
                print(df.info())
            except Exception as e:
                print("Could not print DataFrame info")
            return None

    def process_dataframe(self, df, text_column):
        """Process text data from a pandas DataFrame column

        Args:
            df (pd.DataFrame): Input DataFrame
            text_column (str): Name of the column containing text data

        Returns:
            pd.DataFrame: DataFrame with added analysis columns
        """
        try:
            # Create a copy of the dataframe for processing
            processed_df = df.copy()

            # Create new columns for analysis results
            processed_df["num_sentences"] = processed_df[text_column].apply(
                lambda x: len(self.basic_stats(x)["sentences"])
            )
            processed_df["num_words"] = processed_df[text_column].apply(
                lambda x: self.basic_stats(x)["num_words"]
            )
            processed_df["avg_word_length"] = processed_df[text_column].apply(
                lambda x: self.basic_stats(x)["avg_word_length"]
            )
            processed_df["avg_sentence_length"] = processed_df[text_column].apply(
                lambda x: self.basic_stats(x)["avg_sentence_length"]
            )
            processed_df["common_words"] = processed_df[text_column].apply(
                lambda x: self.get_common_words(x)
            )
            processed_df["common_phrases"] = processed_df[text_column].apply(
                lambda x: self.get_common_phrases(x)
            )

            return processed_df

        except Exception as e:
            print(f"Error processing DataFrame: {str(e)}")
            return None

    def basic_stats(self, text):
        """Compute sentence/word counts and average lengths for ``text``.

        Returns a dict with keys "sentences" (the sentence list),
        "num_sentences", "num_words", "avg_word_length" and
        "avg_sentence_length" (both averages rounded to 2 decimals), or None
        if anything goes wrong (the error is printed).
        """
        try:
            sentence_list = sent_tokenize(text)

            # Lower-case, tokenize, and keep only purely alphanumeric tokens
            # (this drops punctuation tokens).
            tokens = [tok for tok in word_tokenize(text.lower()) if tok.isalnum()]

            sentence_count = len(sentence_list)
            word_count = len(tokens)

            # Guard both averages against division by zero.
            if word_count > 0:
                mean_word_len = sum(len(tok) for tok in tokens) / word_count
            else:
                mean_word_len = 0

            if sentence_count > 0:
                mean_sentence_len = word_count / sentence_count
            else:
                mean_sentence_len = 0

            return {
                "sentences": sentence_list,
                "num_sentences": sentence_count,
                "num_words": word_count,
                "avg_word_length": round(mean_word_len, 2),
                "avg_sentence_length": round(mean_sentence_len, 2),
            }

        except Exception as e:
            print(f"Error calculating basic stats: {str(e)}")
            return None

    def get_common_words(self, text, n=10):
        """Return the ``n`` most frequent non-stop-word tokens as (word, count) pairs.

        The text is lower-cased and tokenized; tokens that are not purely
        alphanumeric or that appear in ``self.stop_words`` are ignored.
        Returns an empty list if an error occurs (the error is printed).
        """
        try:
            counts = Counter(
                token
                for token in word_tokenize(text.lower())
                if token.isalnum() and token not in self.stop_words
            )
            # most_common already yields (word, count) tuples in rank order.
            return list(counts.most_common(n))

        except Exception as e:
            print(f"Error finding common words: {str(e)}")
            return []

    def get_common_phrases(self, text, n=5, phrase_length=2):
        """Return the ``n`` most frequent ``phrase_length``-word phrases.

        N-grams are built over the lower-cased token stream after dropping
        non-alphanumeric tokens and stop words. Each result item is a
        (phrase, count) pair with the phrase joined by single spaces.
        Returns an empty list if an error occurs (the error is printed).
        """
        try:
            kept_tokens = [
                token
                for token in word_tokenize(text.lower())
                if token.isalnum() and token not in self.stop_words
            ]

            # Tally every n-gram of the requested length.
            gram_counts = Counter(ngrams(kept_tokens, phrase_length))

            return [
                (" ".join(gram), count) for gram, count in gram_counts.most_common(n)
            ]

        except Exception as e:
            print(f"Error finding common phrases: {str(e)}")
            return []

    def create_wordcloud(self, text, save_path=None):
        """Generate and optionally save a word cloud visualization

        Errors are printed rather than raised; in that case nothing is saved
        or shown.

        Args:
            text (str): The text to create word cloud from
            save_path (str or Path, optional): If provided, save the wordcloud to
                this path (and close the figure); otherwise display it
        """
        figure = None
        try:
            # Create WordCloud object
            wordcloud = WordCloud(
                width=800,
                height=400,
                background_color="white",
                stopwords=self.stop_words,
                min_font_size=10,
            ).generate(text)

            # Create the plot
            figure = plt.figure(figsize=(10, 5))
            plt.imshow(wordcloud, interpolation="bilinear")
            plt.axis("off")

            # Save or display
            if save_path:
                plt.savefig(save_path)
                plt.close(figure)  # Close the figure to free memory
            else:
                plt.show()

        except Exception as e:
            print(f"Error creating word cloud: {str(e)}")
            # Do not leave a half-built figure open when drawing or saving
            # fails; pyplot keeps figures alive until they are closed.
            if figure is not None:
                plt.close(figure)

    def generate_summary(
        self,
        text,
        max_length=BART_CONFIG["max_length"],
        min_length=BART_CONFIG["min_length"],
        output_path=None,
        document_id=None,
        metadata=None,
    ):
        """Generate a summary using an adaptive single/two-pass approach based on text length.

        Texts of at most BART_MAX_LENGTH tokens are summarized directly. Longer
        texts are split with _split_into_chunks, every chunk is summarized, and
        the joined chunk summaries are summarized once more.

        Args:
            text (str): Text to summarize.
            max_length (int): Maximum summary length passed to the summarizer.
            min_length (int): Minimum summary length passed to the summarizer.
            output_path: Accepted for interface compatibility; not used here.
            document_id: Identifier echoed in log messages and metadata.
            metadata (dict, optional): Source record; its "category",
                "filename", "title" and "index" entries are copied into the
                result under metadata["source"].

        Returns:
            dict: {"summary": str, "metadata": dict} on success. On failure the
            summary is "" and an "error" message is included; "metadata" is
            still present so callers can treat both shapes uniformly.
        """
        try:
            self.logger.info(f"Generating summary for document {document_id}")

            # Count input tokens
            input_tokens = len(self.summarizer.tokenizer(text)["input_ids"])
            self.logger.debug(f"Input tokens: {input_tokens}")

            # Initialize metadata
            summary_metadata = {
                "document_id": document_id,
                "timestamp": datetime.datetime.now().isoformat(),
                "input_tokens": input_tokens,
                "single_pass": True,
            }

            # For long texts, use chunking approach
            if input_tokens > BART_MAX_LENGTH:
                self.logger.info(
                    f"Long text detected ({input_tokens} tokens). Using chunking approach."
                )
                chunks, chunk_stats = self._split_into_chunks(text)
                chunk_summaries = []

                for i, chunk in enumerate(chunks, 1):
                    self.logger.debug(f"Processing chunk {i}/{len(chunks)}")
                    # truncation=True: a chunk is not guaranteed to fit the
                    # model input, so let the tokenizer cut it if necessary.
                    chunk_summary = self.summarizer(
                        chunk,
                        max_length=max_length,
                        min_length=min_length,
                        truncation=True,
                    )[0]["summary_text"]
                    chunk_summaries.append(chunk_summary)

                # Combine chunk summaries and generate final summary
                self.logger.debug("Generating final summary from chunks")
                intermediate_summary = " ".join(chunk_summaries)
                summary_text = self.summarizer(
                    intermediate_summary,
                    max_length=max_length,
                    min_length=min_length,
                    truncation=True,
                )[0]["summary_text"]

                summary_metadata.update(
                    {
                        "single_pass": False,
                        "num_chunks": len(chunks),
                        "chunk_stats": chunk_stats,
                    }
                )
            else:
                self.logger.debug("Using single-pass approach")
                summary_text = self.summarizer(
                    text, max_length=max_length, min_length=min_length, truncation=True
                )[0]["summary_text"]

            # Count output tokens
            output_tokens = len(self.summarizer.tokenizer(summary_text)["input_ids"])
            summary_metadata["output_tokens"] = output_tokens

            # Add source CSV metadata if provided
            if metadata:
                summary_metadata["source"] = {
                    "category": metadata.get("category"),
                    "filename": metadata.get("filename"),
                    "title": metadata.get("title"),
                    "index": metadata.get("index"),
                }

            summary_result = {"summary": summary_text, "metadata": summary_metadata}

            self.logger.info(
                f"Successfully generated summary for document {document_id}"
            )
            return summary_result

        except Exception as e:
            self.logger.error(
                f"Error in generate_summary for document {document_id}: {str(e)}"
            )
            # Keep the same top-level keys as the success result so code that
            # reads result["metadata"] does not fail on an error entry.
            return {
                "error": str(e),
                "summary": "",
                "metadata": {
                    "document_id": document_id,
                    "timestamp": datetime.datetime.now().isoformat(),
                    "error": str(e),
                },
            }

    def _split_into_chunks(self, text, max_chunk_size=1024):
        """Split text into chunks using DistilBERT-based segmentation with token-aware sizing"""
        try:
            # Initialize statistics
            stats = {
                "total_words": len(text.split()),
                "breakpoints": [],
                "breakpoint_scores": [],
                "chunk_sizes": [],
                "chunk_sentence_counts": [],
                "processing_start_time": datetime.datetime.now().isoformat(),
            }

            # Input validation
            if not text or not isinstance(text, str):
                self.logger.warning("Empty or invalid input text provided")
                return [], {"error": "Empty or invalid input"}

            # Split into sentences
            sentences = sent_tokenize(text)
            if not sentences:
                self.logger.warning("No sentences found in input text")
                return [text[:max_chunk_size]], {
                    "error": "No sentences found",
                    "stats": stats,
                }

            stats["total_sentences"] = len(sentences)
            self.logger.debug(f"Processing {len(sentences)} sentences")

            # Create sentence pairs for classification
            sent_pairs = [
                sentences[i] + " [SEP] " + sentences[i + 1]
                for i in range(len(sentences) - 1)
            ]

            if not sent_pairs:
                return [text[:max_chunk_size]], {
                    "error": "No sentence pairs for classification"
                }

            # Process sentence pairs with chunking pipeline
            try:
                attributions = []
                for pair in tqdm(
                    sent_pairs,
                    desc="Classifying sentence pairs",
                    disable=len(sent_pairs) < 10,
                ):
                    # Get raw pipeline output
                    result = self.chunking_pipeline(pair)
                    self.logger.debug(f"Pipeline output: {result}")

                    # Handle the pipeline output based on its structure
                    if (
                        isinstance(result, list)
                        and len(result) > 0
                        and isinstance(result[0], list)
                    ):
                        # Get the inner list of predictions
                        predictions = result[0]
                        # Convert pipeline output to score dictionary
                        scores = {}
                        for pred in predictions:
                            if (
                                isinstance(pred, dict)
                                and "label" in pred
                                and "score" in pred
                            ):
                                scores[pred["label"]] = pred["score"]

                        # Ensure we have both scores
                        diff_score = scores.get("DIFFERENT", 0.0)
                        same_score = scores.get(
                            "SAME", 1.0 - diff_score
                        )  # If one is missing, assume complementary

                        attributions.append(
                            {"DIFFERENT": diff_score, "SAME": same_score}
                        )
                    else:
                        self.logger.warning(
                            f"Unexpected pipeline output format: {type(result)}"
                        )
                        # Add neutral scores if we can't interpret the output
                        attributions.append({"DIFFERENT": 0.5, "SAME": 0.5})

            except Exception as e:
                self.logger.error(f"Pipeline processing failed: {str(e)}")
                return [text[:max_chunk_size]], {
                    "error": f"Pipeline processing failed: {str(e)}"
                }

            # Find breakpoints with token-aware sizing
            breakpoints = []
            current_chunk_tokens = 0
            min_chunk_tokens = int(BART_MAX_LENGTH * 0.2)  # Minimum 20% of max length

            # Pre-calculate token counts for all sentences
            sentence_tokens = [
                len(self.summarizer.tokenizer(sent)["input_ids"]) for sent in sentences
            ]

            # Find semantic breakpoints
            for i, scores in enumerate(attributions):
                try:
                    # Get the DIFFERENT score directly from the attribution
                    diff_score = scores.get("DIFFERENT", 0.0)

                    # Add tokens for current sentence
                    current_chunk_tokens += sentence_tokens[i]

                    self.logger.debug(
                        f"Position {i + 1}: tokens={current_chunk_tokens}, "
                        f"min_required={min_chunk_tokens}, diff_score={diff_score:.3f}"
                    )

                    # Create break if we have enough tokens and strong semantic difference
                    if current_chunk_tokens >= min_chunk_tokens and diff_score > 0.6:
                        breakpoints.append(i + 1)
                        stats["breakpoint_scores"].append(
                            {
                                "position": i + 1,
                                "diff_score": diff_score,
                                "chunk_tokens": current_chunk_tokens,
                            }
                        )
                        self.logger.debug(
                            f"Added breakpoint at {i + 1}: tokens={current_chunk_tokens}, diff_score={diff_score:.3f}"
                        )
                        # Reset token count for next chunk
                        current_chunk_tokens = 0
                        # Start counting from next sentence
                        continue

                except Exception as e:
                    self.logger.debug(f"Skipping classification at index {i}: {str(e)}")
                    continue

            # If no breaks found and text is too long, force a break at best semantic point
            if not breakpoints and current_chunk_tokens > max_chunk_size:
                # Find best breakpoint based on semantic scores
                best_break = max(
                    range(len(attributions)), key=lambda i: attributions[i]["DIFFERENT"]
                )
                if best_break > 0:
                    breakpoints.append(best_break + 1)
                    stats["breakpoint_scores"].append(
                        {
                            "position": best_break + 1,
                            "diff_score": attributions[best_break]["DIFFERENT"],
                            "chunk_tokens": sum(sentence_tokens[: best_break + 1]),
                        }
                    )
                    self.logger.debug(f"Forced break at {best_break + 1} due to length")

            # Create chunks based on breakpoints
            chunks = []
            current_chunk = []
            current_size = 0
            current_sent_count = 0

            # Add debug logging for sentence lengths
            self.logger.debug("Sentence token counts:")
            for i, tokens in enumerate(sentence_tokens):
                self.logger.debug(f"Sentence {i + 1}: {tokens} tokens")

            for i, sentence in enumerate(sentences):
                sentence_tokens_count = sentence_tokens[i]

                # Create new chunk if we hit a breakpoint
                if i in breakpoints:  # Changed from i+1 to i
                    if current_chunk:  # Save current chunk if it exists
                        chunk_text = " ".join(current_chunk)
                        chunks.append(chunk_text)
                        stats["chunk_sizes"].append(current_size)
                        stats["chunk_sentence_counts"].append(current_sent_count)
                        self.logger.debug(
                            f"Created chunk with {current_size} tokens and {current_sent_count} sentences"
                        )

                    # Start new chunk with current sentence
                    current_chunk = [sentence]
                    current_size = sentence_tokens_count
                    current_sent_count = 1
                else:
                    current_chunk.append(sentence)
                    current_size += sentence_tokens_count
                    current_sent_count += 1

            # Add the last chunk if it exists
            if current_chunk:
                chunk_text = " ".join(current_chunk)
                chunks.append(chunk_text)
                stats["chunk_sizes"].append(current_size)
                stats["chunk_sentence_counts"].append(current_sent_count)
                self.logger.debug(
                    f"Created final chunk with {current_size} tokens and {current_sent_count} sentences"
                )

            # Log breakpoint information for debugging
            self.logger.debug(f"Found breakpoints at positions: {breakpoints}")
            for score in stats["breakpoint_scores"]:
                self.logger.debug(
                    f"Breakpoint at {score['position']}: diff_score={score['diff_score']}, tokens={score['chunk_tokens']}"
                )

            # Calculate final statistics
            if chunks:
                stats.update(
                    {
                        "num_chunks": len(chunks),
                        "avg_chunk_size": sum(stats["chunk_sizes"]) / len(chunks),
                        "max_chunk_size": max(stats["chunk_sizes"]),
                        "min_chunk_size": min(stats["chunk_sizes"]),
                        "avg_sentences_per_chunk": sum(stats["chunk_sentence_counts"])
                        / len(chunks),
                        "processing_end_time": datetime.datetime.now().isoformat(),
                    }
                )

                self.logger.info("Chunk Statistics:")
                self.logger.info(f"Total chunks: {stats['num_chunks']}")
                self.logger.info(
                    f"Average chunk size (tokens): {stats['avg_chunk_size']:.2f}"
                )
                self.logger.info(
                    f"Max/Min chunk size: {stats['max_chunk_size']}/{stats['min_chunk_size']}"
                )
                self.logger.info(f"Breakpoints found: {len(breakpoints)}")
                self.logger.info(f"Breakpoint positions: {breakpoints}")
                self.logger.info(
                    f"Avg sentences per chunk: {stats['avg_sentences_per_chunk']:.2f}"
                )

                self.logger.debug(f"Created {len(chunks)} chunks")
                return chunks, stats

            return [text[:max_chunk_size]], {
                "error": "No chunks created",
                "stats": stats,
            }

        except Exception as e:
            self.logger.error(f"Error in _split_into_chunks: {str(e)}")
            return [text[:max_chunk_size]], {"error": str(e)}

    def save_summaries_to_json(self, summaries, output_path):
        """
        Save summaries and their metadata to a JSON file

        Args:
            summaries (dict): Dictionary containing summaries and their metadata
            output_path (str or Path): Path to save the JSON file

        Returns:
            dict: Status of the save operation
        """
        try:
            # Convert output_path to Path object
            output_path = Path(output_path)

            # Ensure the output directory exists
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Initialize the full structure if it doesn't exist
            if not isinstance(summaries, dict) or "summaries" not in summaries:
                summaries = {
                    "metadata": {
                        "timestamp": datetime.datetime.now().isoformat(),
                        "version": {
                            "bart_model": SUMMARIZER_MODEL,
                            "chunker_model": CHUNKER_MODEL,
                            "max_length": BART_MAX_LENGTH,
                            "config": BART_CONFIG,
                        },
                    },
                    "summaries": [],
                }

            # Ensure summaries is a list
            if "summaries" in summaries and not isinstance(
                summaries["summaries"], list
            ):
                summaries["summaries"] = [summaries["summaries"]]

            # Write to JSON file with error handling
            try:
                with output_path.open("w", encoding="utf-8") as f:
                    json.dump(summaries, f, indent=4, ensure_ascii=False)
            except Exception as e:
                raise IOError(f"Failed to write JSON file: {str(e)}")

            # Verify file was written
            if not output_path.exists():
                raise IOError("File was not created successfully")

            file_size = output_path.stat().st_size
            self.logger.info(
                f"Summaries successfully saved to {output_path} ({file_size} bytes)"
            )

            return {
                "status": "success",
                "file_path": str(output_path),
                "file_size_bytes": file_size,
                "timestamp": summaries["metadata"]["timestamp"],
            }

        except Exception as e:
            error_msg = f"Error saving summaries to JSON: {str(e)}"
            self.logger.error(error_msg)
            return {
                "status": "error",
                "error": error_msg,
                "timestamp": datetime.datetime.now().isoformat(),
            }

    def save_analysis(self, filepath, analysis_results):
        """Save analysis results to a CSV file"""
        try:
            # Convert complex columns to string representation
            analysis_results["common_words"] = analysis_results["common_words"].apply(
                str
            )
            analysis_results["common_phrases"] = analysis_results[
                "common_phrases"
            ].apply(str)

            # Save to CSV
            analysis_results.to_csv(filepath, index=False)
            self.logger.info(f"Analysis results saved to {filepath}")

        except Exception as e:
            self.logger.error(f"Error saving analysis results: {str(e)}")


<h3>Basic functionality test</h3>

In [None]:
def run_tests():
    """Smoke-test DocumentAnalyzer.process_dataframe on a two-row DataFrame.

    Builds a DocumentAnalyzer (which loads the models and NLTK resources),
    processes two short articles and checks that the analysis columns were
    produced. Raises AssertionError on failure; prints a confirmation on
    success.
    """
    analyzer = DocumentAnalyzer()
    test_df = pd.DataFrame({
        'content': ['This is a test article. It has two sentences.',
                   'Another test article. With multiple sentences. Testing.'],
        'category': ['test', 'test']
    })
    results = analyzer.process_dataframe(test_df, 'content')
    # process_dataframe signals failure by returning None; check that first so
    # the failure is reported clearly instead of as an AttributeError.
    assert results is not None, "Processing failed and returned None"
    assert not results.empty, "Processing should return non-empty DataFrame"
    assert len(results) == len(test_df), "Every input row should be preserved"
    assert 'num_sentences' in results.columns, "Results should include sentence count"
    assert 'num_words' in results.columns, "Results should include word count"
    assert (results['num_sentences'] > 0).all(), "Each article should have sentences"
    assert (results['num_words'] > 0).all(), "Each article should have words"
    print("All tests passed!")

# Run the smoke test when this cell executes (this constructs a DocumentAnalyzer).
run_tests()

<h3>Analysis interface</h3>

In [None]:

@dataclass
class AnalysisParameters:
    """Class for holding document analysis parameters

    Passed to DocumentAnalysisCLI.run(). Only ``input_file`` is required;
    every other field has a default.

    NOTE(review): run() is not fully visible from here, so the per-field notes
    below are inferred from the field names and defaults - confirm against
    run() before relying on them.
    """

    # Path of the input data file (required, no default).
    input_file: str
    # Default matches load_and_sample_data's sample_size default (500).
    sample_size: int = 500
    # Default matches the 'content' column required by load_and_sample_data.
    text_column: str = "content"
    # Presumably the number of word clouds to generate - TODO confirm.
    wordclouds: int = 5
    # Presumably whether word clouds are written to disk instead of shown.
    save_wordclouds: bool = False
    # Presumably the CSV file name for the analysis results.
    output_file: str = "analysis_results.csv"
    # Presumably the directory that receives generated output files.
    output_dir: str = "results"
    # Presumably whether the smoke tests run before the analysis.
    run_tests: bool = False
    # Passed to _configure_logging_level: True selects DEBUG, False INFO.
    debug: bool = False
    # Presumably whether summaries are generated - TODO confirm.
    summarize: bool = False
    # Defaults equal BART_CONFIG["max_length"] / BART_CONFIG["min_length"].
    max_summary_length: int = 70
    min_summary_length: int = 30


class DocumentAnalysisCLI:
    """Drive a full analysis run: logging setup, analysis, summaries, output.

    The heavy lifting is delegated to a ``DocumentAnalyzer`` instance that is
    created lazily in :meth:`run` and kept in ``self.analyzer``.
    """

    # One format for both the CLI logger and the root logger so that every
    # log line of a run looks the same.
    _LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    _LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        """Initialize CLI with configured logging."""
        self.logger = self._setup_logging()
        self.analyzer = None

    def _make_console_handler(self):
        """Return a stream handler using the shared log format."""
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(self._LOG_FORMAT, datefmt=self._LOG_DATEFMT)
        )
        return handler

    def _setup_logging(self):
        """Configure logging with formatted output.

        Returns:
            The logger named ``<module>.DocumentAnalysisCLI`` with exactly one
            console handler attached by this class.
        """
        logger = logging.getLogger(f"{__name__}.DocumentAnalysisCLI")

        # logging.getLogger returns the same object for the same name, so
        # creating several CLI instances (or re-running a notebook cell) must
        # not stack another handler on it -- that would duplicate every line.
        if not logger.handlers:
            logger.addHandler(self._make_console_handler())

        # This logger has its own handler; without this, records would be
        # printed a second time by the root handler installed in
        # _configure_logging_level.
        logger.propagate = False

        return logger

    def _configure_logging_level(self, debug: bool):
        """Configure logging level based on debug parameter.

        Replaces all handlers of the root logger with a single formatted
        console handler and sets the root and CLI loggers to DEBUG (when
        ``debug`` is true) or INFO.
        """
        root_logger = logging.getLogger()

        # Remove existing handlers (iterate over a copy while mutating)
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)

        root_logger.addHandler(self._make_console_handler())

        # Set logging level
        level = logging.DEBUG if debug else logging.INFO
        root_logger.setLevel(level)
        self.logger.setLevel(level)

        # Logged at INFO so the confirmation is visible in both modes.
        self.logger.info(
            "Logging level set to DEBUG" if debug else "Logging level set to INFO"
        )

    def run(self, params: "AnalysisParameters"):
        """Main execution method.

        Loads and samples the input data, processes it, optionally generates
        word clouds and summaries, saves the analysis results as CSV and
        optionally runs the tests.

        Args:
            params: Settings of the run.

        Returns:
            The processed results DataFrame, or ``None`` when loading,
            processing or any other step raised or failed.
        """
        try:
            # Configure logging level
            self._configure_logging_level(params.debug)

            # All outputs (CSV, summaries, word clouds) go here; make sure it
            # exists even when run() is called without run_analysis().
            Path(params.output_dir).mkdir(parents=True, exist_ok=True)

            # Initialize the document analyzer
            self.logger.info("Initializing Document Analyzer...")
            self.analyzer = DocumentAnalyzer()

            # Add debug logging to verify methods
            self.logger.debug(f"Available analyzer methods: {dir(self.analyzer)}")
            self.logger.debug(f"Analyzer class: {self.analyzer.__class__}")

            # Load and sample data
            self.logger.info(f"Loading data from {params.input_file}")
            sampled_df = self.analyzer.load_and_sample_data(
                params.input_file, sample_size=params.sample_size
            )

            if sampled_df is None:
                self.logger.error("Failed to load and sample data")
                return None

            # Process the DataFrame
            self.logger.info("Processing articles...")
            results = self.analyzer.process_dataframe(sampled_df, params.text_column)

            if results is None:
                self.logger.error("Failed to process articles")
                return None

            # Generate word clouds if requested
            if params.wordclouds > 0:
                self._generate_wordclouds(params, sampled_df)

            # Generate and save summaries if requested
            if params.summarize:
                self._generate_summaries(params, results)

            # Save analysis results. save_analysis logs its own success or
            # failure, so no unconditional "saved" message is emitted here.
            output_path = Path(params.output_dir) / params.output_file
            self.analyzer.save_analysis(output_path, results)

            # Run tests if requested
            if params.run_tests:
                self.logger.info("Running tests...")
                run_tests()

            return results

        except Exception as e:
            self.logger.error(f"Error during execution: {str(e)}")
            if params.debug:
                # exc_info attaches the active traceback to the log record
                self.logger.debug("Traceback of the failure:", exc_info=True)
            return None

    def _generate_summaries(self, params, results):
        """Helper method to summarize every row of ``results``.

        After each document the complete list of summaries collected so far
        is written to ``<output_dir>/summaries.json``, so partial progress
        survives an interrupted run. A failure on one document is logged and
        does not stop the remaining documents.

        Returns:
            The list of summary results collected.
        """
        self.logger.info("Starting summary generation...")
        summaries_data = []
        summaries_output_path = Path(params.output_dir) / "summaries.json"

        with tqdm(
            total=len(results),
            desc="Generating summaries",
            unit="doc",
            leave=True,
        ) as progress_bar:
            for idx, row in results.iterrows():
                try:
                    # Metadata columns are optional: .get() yields None for a
                    # column the results do not have instead of failing the
                    # whole document with a KeyError.
                    category = row.get("category")
                    source_metadata = {
                        "category": category,
                        "filename": row.get("filename"),
                        "title": row.get("title"),
                        "index": idx,
                    }

                    summary_result = self.analyzer.generate_summary(
                        text=row[params.text_column],
                        max_length=params.max_summary_length,
                        min_length=params.min_summary_length,
                        document_id=str(idx),
                        metadata=source_metadata,
                    )

                    summaries_data.append(summary_result)

                    # Save summaries to JSON after each summary
                    now = datetime.datetime.now().isoformat()
                    save_result = self.analyzer.save_summaries_to_json(
                        {
                            "metadata": {
                                "timestamp": now,
                                "total_documents": len(results),
                                # NOTE(review): counts every collected result,
                                # including ones carrying an "error" entry --
                                # confirm whether that is intended.
                                "successful_summaries": len(summaries_data),
                                "generation_date": now,
                                "parameters": params.__dict__,
                            },
                            "summaries": summaries_data,
                        },
                        summaries_output_path,
                    )

                    if save_result["status"] != "success":
                        self.logger.error(
                            f"Failed to save summary {idx}: {save_result.get('error', 'Unknown error')}"
                        )
                        status = "✗"
                    else:
                        status = "✓" if "error" not in summary_result else "✗"

                    # Update progress bar with basic status; str() keeps the
                    # slice safe for missing or non-string categories.
                    category_label = "" if category is None else str(category)
                    progress_bar.set_postfix(
                        {"status": status, "category": category_label[:10]}
                    )

                except Exception as e:
                    self.logger.error(
                        f"Error processing document {idx}: {str(e)}"
                    )
                    progress_bar.set_postfix({"status": "error"})

                finally:
                    # Advance for failed documents too, so the bar reaches
                    # its total.
                    progress_bar.update(1)

        self.logger.info(
            f"Summary generation completed. Processed {len(summaries_data)} documents."
        )
        return summaries_data

    def _generate_wordclouds(self, params, sampled_df):
        """Helper method to generate word clouds.

        Draws up to ``params.wordclouds`` rows from ``sampled_df`` (fixed
        random_state, so the selection is reproducible) and creates one word
        cloud per selected text, saving it as PNG when
        ``params.save_wordclouds`` is set.
        """
        # Cannot draw more articles than there are rows.
        cloud_count = min(params.wordclouds, len(sampled_df))
        self.logger.info(f"Generating {cloud_count} word clouds...")
        random_articles = sampled_df.sample(n=cloud_count, random_state=42)[
            params.text_column
        ]

        for i, article in enumerate(random_articles, 1):
            self.logger.info(f"Generating word cloud {i}/{cloud_count}")
            if params.save_wordclouds:
                output_path = Path(params.output_dir) / f"wordcloud_{i}.png"
                self.analyzer.create_wordcloud(article, save_path=output_path)
            else:
                self.analyzer.create_wordcloud(article)


def run_analysis(input_file: str, output_path: str = "results"):
    """Run document analysis with specified parameters.

    Args:
        input_file: Location of the input data to analyze.
        output_path: Directory that receives all outputs (analysis CSV,
            summaries and word clouds). Created when it does not exist.

    Returns:
        Whatever ``DocumentAnalysisCLI.run`` returns: the results DataFrame,
        or ``None`` when the run failed.
    """
    params = AnalysisParameters(
        input_file=input_file,
        sample_size=500,
        text_column="content",
        wordclouds=5,
        save_wordclouds=True,  # Changed to True to save wordclouds
        output_file="analysis_results.csv",
        # Honour the caller's output location instead of a fixed directory
        output_dir=str(output_path),
        run_tests=False,
        debug=True,  # Changed to True for better debugging
        summarize=True,  # Changed to True to generate summaries
        max_summary_length=BART_CONFIG["max_length"],
        min_summary_length=BART_CONFIG["min_length"],
    )

    # Create output directory if it doesn't exist
    Path(params.output_dir).mkdir(parents=True, exist_ok=True)

    # Run the analysis
    cli = DocumentAnalysisCLI()
    return cli.run(params)


<h3>Analysis</h3>

In [None]:
# Before running this cell, make sure to provide the correct file path in the
# constants above.
results = run_analysis(input_file=str(file_path), output_path="results")

if results is None:
    # run_analysis returns None on any failure; the details are in the log.
    print("Analysis failed - check the log output above for details.")
else:
    print("Analysis completed successfully!")

    # Display some information about the results
    print(f"\nAnalyzed {len(results)} articles")
    print("\nColumns in results:")
    print(results.columns.tolist())

    # Show a sample of the summaries if they were generated
    # (guard against an empty result, where iloc[0] would raise)
    if 'summary' in results.columns and not results.empty:
        print("\nSample summary:")
        print(results['summary'].iloc[0])