<a href="https://colab.research.google.com/github/wtrekell/soylent-army/blob/main/colab/ai_vs_human_v1.4c.1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# CELL 1: SETUP AND CONFIGURATION

# Core Python libraries
import difflib
import json
import os
import re
from collections import defaultdict
from datetime import datetime

# Data analysis libraries
import numpy as np

# Machine learning and semantic analysis
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class Config:
    """Configuration settings for the analysis."""

    VERSION_PREFIXES = ["draft-", "refined-", "edited-", "final-"]
    VERSION_ORDER = {prefix: i for i, prefix in enumerate(VERSION_PREFIXES)}

    MIN_SENTENCE_LENGTH = 10  # Minimum characters for a sentence
    MAX_SENTENCE_LENGTH = 200  # Maximum characters for a sentence


def setup_output_directories(base_path):
    """Create necessary output directories."""
    output_dir = base_path
    archive_dir = os.path.join(base_path, "archive")

    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(archive_dir, exist_ok=True)

    print(f"‚úì Output directory ready: {output_dir}")
    print(f"‚úì Archive directory ready: {archive_dir}")

    return output_dir, archive_dir


print("üìã Configuration loaded. Ready to process articles.")

üìã Configuration loaded. Ready to process articles.


In [2]:
# CELL 2: DATA INGESTION & VALIDATION (STEP 1)


class ArticleVersions:
    """Class to handle loading and validating article versions."""

    def __init__(self, article_name, input_path):
        self.article_name = article_name
        self.input_path = input_path
        self.versions = {}
        self.metadata = {
            "article_name": article_name,
            "input_path": input_path,
            "processing_timestamp": datetime.now().isoformat(),
            "versions_found": [],
            "validation_status": "pending",
        }

    def load_versions(self):
        """Load all versions of an article from the specified path."""
        print(f"\nüìÅ Loading versions for article: {self.article_name}")
        print(f"üìÇ Input path: {self.input_path}")

        for prefix in Config.VERSION_PREFIXES:
            filename = f"{prefix}{self.article_name}.md"
            filepath = os.path.join(self.input_path, filename)

            if os.path.exists(filepath):
                try:
                    with open(filepath, encoding="utf-8") as file:
                        content = file.read()
                        self.versions[prefix.rstrip("-")] = {
                            "filename": filename,
                            "filepath": filepath,
                            "content": content,
                            "loaded_at": datetime.now().isoformat(),
                            "file_size": len(content),
                        }
                        print(f"  ‚úì Loaded: {filename} ({len(content)} characters)")

                except Exception as e:
                    print(f"  ‚úó Error loading {filename}: {e!s}")
            else:
                print(f"  - Not found: {filename}")

        self.metadata["versions_found"] = list(self.versions.keys())
        return self.versions

    def validate_version_sequence(self):
        """Validate that we have the minimum required versions."""
        found_versions = set(self.versions.keys())
        required_versions = ["draft", "final"]

        missing_required = []
        for version in required_versions:
            if version not in found_versions:
                missing_required.append(version)

        validation_results = {
            "has_draft": "draft" in found_versions,
            "has_final": "final" in found_versions,
            "missing_required": missing_required,
            "versions_found": list(found_versions),
            "is_valid": len(missing_required) == 0,
        }

        self.metadata["validation_results"] = validation_results

        if validation_results["is_valid"]:
            self.metadata["validation_status"] = "passed"
            print(f"‚úì Validation passed: Found {len(found_versions)} versions")
        else:
            self.metadata["validation_status"] = "failed"
            print("‚úó Validation failed: Missing required versions")
            print(f"  Missing: {', '.join(missing_required)}")

        return validation_results

    def get_summary(self):
        """Get a summary of loaded versions."""
        summary = {
            "article_name": self.article_name,
            "input_path": self.input_path,
            "versions_count": len(self.versions),
            "validation_status": self.metadata["validation_status"],
            "file_sizes": {},
        }

        for version, data in self.versions.items():
            summary["file_sizes"][version] = data["file_size"]

        return summary


print("üìñ ArticleVersions class loaded. Ready for data ingestion.")

üìñ ArticleVersions class loaded. Ready for data ingestion.


In [3]:
# CELL 3: TEXT PREPROCESSING (STEP 2)


class TextPreprocessor:
    """Class to handle text preprocessing and segmentation."""

    def __init__(self):
        self.processed_versions = {}

    def clean_markdown(self, text):
        """Clean markdown formatting while preserving content structure."""
        # Remove markdown formatting but keep the text

        cleaned_text = text

        # Apply patterns that need multiline flag
        multiline_patterns = [
            (r"^\s*[\*\-\+]\s+", ""),  # Bullet points
            (r"^\s*\d+\.\s+", ""),  # Numbered lists
        ]

        for pattern, replacement in multiline_patterns:
            cleaned_text = re.sub(
                pattern, replacement, cleaned_text, flags=re.MULTILINE
            )

        # Apply regular patterns
        regular_patterns = [
            (r"^\s*#{1,6}\s+", ""),  # Headers
            (r"\*\*(.*?)\*\*", r"\1"),  # Bold
            (r"\*(.*?)\*", r"\1"),  # Italic
            (r"`(.*?)`", r"\1"),  # Inline code
            (r"```.*?```", ""),  # Code blocks
            (r"!\[.*?\]\(.*?\)", ""),  # Images
            (r"\[([^\]]+)\]\([^\)]+\)", r"\1"),  # Links
            (r"\n{3,}", "\n\n"),  # Multiple newlines
        ]

        for pattern, replacement in regular_patterns:
            cleaned_text = re.sub(pattern, replacement, cleaned_text)

        return cleaned_text.strip()

    def segment_into_sentences(self, text):
        """Segment text into sentences with basic filtering."""
        # Simple sentence segmentation (can be enhanced with spaCy later if needed)
        sentences = re.split(r"[.!?]+\s+", text)

        # Filter sentences
        filtered_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if (
                Config.MIN_SENTENCE_LENGTH
                <= len(sentence)
                <= Config.MAX_SENTENCE_LENGTH
                and sentence
            ):
                filtered_sentences.append(sentence)

        return filtered_sentences

    def segment_into_paragraphs(self, text):
        """Segment text into paragraphs."""
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        return paragraphs

    def process_version(self, version_name, raw_content):
        """Process a single version of the article."""
        print(f"  Processing {version_name} version...")

        # Clean the markdown
        cleaned_content = self.clean_markdown(raw_content)

        # Segment into different units
        sentences = self.segment_into_sentences(cleaned_content)
        paragraphs = self.segment_into_paragraphs(cleaned_content)

        # Calculate basic statistics
        stats = {
            "character_count": len(cleaned_content),
            "word_count": len(cleaned_content.split()),
            "sentence_count": len(sentences),
            "paragraph_count": len(paragraphs),
            "avg_sentence_length": sum(len(s) for s in sentences) / len(sentences)
            if sentences
            else 0,
            "avg_paragraph_length": sum(len(p) for p in paragraphs) / len(paragraphs)
            if paragraphs
            else 0,
        }

        processed_data = {
            "version_name": version_name,
            "raw_content": raw_content,
            "cleaned_content": cleaned_content,
            "sentences": sentences,
            "paragraphs": paragraphs,
            "statistics": stats,
            "processed_at": datetime.now().isoformat(),
        }

        self.processed_versions[version_name] = processed_data

        print(
            f"    ‚úì {stats['sentence_count']} sentences, {stats['paragraph_count']} paragraphs"
        )
        print(
            f"    ‚úì {stats['word_count']} words, {stats['character_count']} characters"
        )

        return processed_data

    def process_all_versions(self, article_versions):
        """Process all versions of an article."""
        print("\nüîÑ Preprocessing text for all versions...")

        for version_name, version_data in article_versions.versions.items():
            self.process_version(version_name, version_data["content"])

        return self.processed_versions

    def get_processing_summary(self):
        """Get a summary of processing results."""
        summary = {}
        for version_name, data in self.processed_versions.items():
            summary[version_name] = data["statistics"]

        return summary


print("üîß TextPreprocessor class loaded. Ready for text processing.")

üîß TextPreprocessor class loaded. Ready for text processing.


In [4]:
# CELL 4: EXECUTION FUNCTIONS AND CHECKPOINT MANAGEMENT


def save_checkpoint_data(
    article_versions, preprocessor, output_path, checkpoint_name="steps_1_2"
):
    """Save checkpoint data for review."""
    checkpoint_data = {
        "checkpoint_name": checkpoint_name,
        "timestamp": datetime.now().isoformat(),
        "article_metadata": article_versions.metadata,
        "processing_summary": preprocessor.get_processing_summary(),
        "validation_results": article_versions.metadata.get("validation_results", {}),
        "article_summary": article_versions.get_summary(),
    }

    checkpoint_file = f"{output_path}/{article_versions.article_name}_checkpoint_{checkpoint_name}.json"

    with open(checkpoint_file, "w", encoding="utf-8") as f:
        json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)

    print(f"\nüíæ Checkpoint saved: {checkpoint_file}")
    return checkpoint_data


def run_steps_1_2(article_name, input_path, base_output_path):
    """Run steps 1-2 for a given article."""
    print(f"üöÄ Starting Steps 1-2 for article: {article_name}")
    print(f"üìÇ Input path: {input_path}")

    output_dir, archive_dir = setup_output_directories(base_output_path)

    # Step 1: Data Ingestion & Validation
    article_versions = ArticleVersions(article_name, input_path)
    article_versions.load_versions()
    validation_results = article_versions.validate_version_sequence()

    if not validation_results["is_valid"]:
        print("‚ùå Cannot proceed: Missing required versions (draft and final)")
        return None, None

    # Step 2: Text Preprocessing
    preprocessor = TextPreprocessor()
    preprocessor.process_all_versions(article_versions)

    # Save checkpoint
    save_checkpoint_data(article_versions, preprocessor, output_dir)

    print("\n‚úÖ Steps 1-2 completed successfully!")
    print("üìä Processing Summary:")
    for version, stats in preprocessor.get_processing_summary().items():
        print(
            f"  {version}: {stats['word_count']} words, {stats['sentence_count']} sentences"
        )

    return article_versions, preprocessor


def get_user_inputs():
    """Get user inputs for processing."""
    print("üìù Please provide the following information:")

    article_name = input("Enter article name (without .md extension): ").strip()
    input_path = input(
        "Enter full path to input folder containing markdown files: "
    ).strip()
    base_output_path = input("Enter full path to base output folder: ").strip()

    print("\nüìã Configuration:")
    print(f"  Article name: {article_name}")
    print(f"  Input path: {input_path}")
    print(f"  Output path: {base_output_path}")

    confirm = input("\nProceed with these settings? (y/n): ").strip().lower()

    if confirm == "y":
        return article_name, input_path, base_output_path
    else:
        print("‚ùå Cancelled. Run get_user_inputs() again to restart.")
        return None, None, None


def process_article_interactive():
    """Process an article with interactive inputs."""
    article_name, input_path, base_output_path = get_user_inputs()

    if article_name and input_path and base_output_path:
        return run_steps_1_2(article_name, input_path, base_output_path)
    else:
        return None, None


print("üìã Ready to process your article!")
print("Run: article_versions, preprocessor = process_article_interactive()")
print("\nMake sure your markdown files are named:")
print("- draft-your-article-name.md")
print("- refined-your-article-name.md")
print("- edited-your-article-name.md")
print("- final-your-article-name.md")

üìã Ready to process your article!
Run: article_versions, preprocessor = process_article_interactive()

Make sure your markdown files are named:
- draft-your-article-name.md
- refined-your-article-name.md
- edited-your-article-name.md
- final-your-article-name.md


In [5]:
article_versions, preprocessor = process_article_interactive()

üìù Please provide the following information:
Enter article name (without .md extension): markup-languages
Enter full path to input folder containing markdown files: /content/
Enter full path to base output folder: /content/output/

üìã Configuration:
  Article name: markup-languages
  Input path: /content/
  Output path: /content/output/

Proceed with these settings? (y/n): y
üöÄ Starting Steps 1-2 for article: markup-languages
üìÇ Input path: /content/
‚úì Output directory ready: /content/output/
‚úì Archive directory ready: /content/output/archive

üìÅ Loading versions for article: markup-languages
üìÇ Input path: /content/
  ‚úì Loaded: draft-markup-languages.md (5667 characters)
  ‚úì Loaded: refined-markup-languages.md (7301 characters)
  ‚úì Loaded: edited-markup-languages.md (7575 characters)
  ‚úì Loaded: final-markup-languages.md (6327 characters)
‚úì Validation passed: Found 4 versions

üîÑ Preprocessing text for all versions...
  Processing draft version...
    ‚úì 4

In [6]:
# CELL 6: INSTALL ANALYSIS PACKAGES

print("üì¶ Dependencies installed and imported successfully!")
print("ü§ñ Loading SentenceTransformer model (this may take a moment)...")

# Load the semantic similarity model
semantic_model = SentenceTransformer("all-MiniLM-L6-v2")
print("‚úÖ Semantic model loaded successfully!")

üì¶ Dependencies installed and imported successfully!
ü§ñ Loading SentenceTransformer model (this may take a moment)...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


‚úÖ Semantic model loaded successfully!


In [7]:
# CELL 7: SIMILARITY ANALYSIS


class SimilarityAnalyzer:
    """Class to handle lexical and semantic similarity analysis."""

    def __init__(self, semantic_model):
        self.semantic_model = semantic_model
        self.similarity_results = {}

    def calculate_lexical_similarity(self, text1, text2):
        """Calculate lexical similarity using multiple metrics."""
        # Jaccard similarity (word-level)
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        jaccard = (
            len(words1.intersection(words2)) / len(words1.union(words2))
            if words1.union(words2)
            else 0
        )

        # Edit distance similarity (character-level)
        sequence_matcher = difflib.SequenceMatcher(None, text1, text2)
        edit_similarity = sequence_matcher.ratio()

        # TF-IDF cosine similarity
        vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2))
        try:
            tfidf_matrix = vectorizer.fit_transform([text1, text2])
            tfidf_similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[
                0
            ][0]
        except:
            tfidf_similarity = 0.0

        return {
            "jaccard_similarity": jaccard,
            "edit_similarity": edit_similarity,
            "tfidf_similarity": tfidf_similarity,
            "lexical_average": (jaccard + edit_similarity + tfidf_similarity) / 3,
        }

    def calculate_semantic_similarity(self, text1, text2):
        """Calculate semantic similarity using sentence embeddings."""
        # Get embeddings
        embeddings = self.semantic_model.encode([text1, text2])

        # Calculate cosine similarity
        similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]

        return {
            "semantic_similarity": float(similarity),
            "embedding_dim": len(embeddings[0]),
        }

    def calculate_sentence_level_similarities(
        self, sentences1, sentences2, version1, version2
    ):
        """Calculate similarities at sentence level between two versions."""
        print(f"    Analyzing {len(sentences1)} vs {len(sentences2)} sentences...")

        sentence_similarities = []

        # Calculate all pairwise similarities
        for i, sent1 in enumerate(sentences1):
            best_match = {"index": -1, "lexical": 0, "semantic": 0, "combined": 0}

            for j, sent2 in enumerate(sentences2):
                # Calculate similarities
                lexical = self.calculate_lexical_similarity(sent1, sent2)
                semantic = self.calculate_semantic_similarity(sent1, sent2)

                # Combined score (weighted average)
                combined = (
                    lexical["lexical_average"] + semantic["semantic_similarity"]
                ) / 2

                if combined > best_match["combined"]:
                    best_match = {
                        "index": j,
                        "lexical": lexical["lexical_average"],
                        "semantic": semantic["semantic_similarity"],
                        "combined": combined,
                        "target_sentence": sent2,
                    }

            sentence_similarities.append(
                {"source_index": i, "source_sentence": sent1, "best_match": best_match}
            )

        return sentence_similarities

    def analyze_version_pair(
        self, version1_data, version2_data, version1_name, version2_name
    ):
        """Analyze similarities between two versions."""
        print(f"  üîç Analyzing {version1_name} ‚Üí {version2_name}")

        # Full text similarity
        full_text_lexical = self.calculate_lexical_similarity(
            version1_data["cleaned_content"], version2_data["cleaned_content"]
        )
        full_text_semantic = self.calculate_semantic_similarity(
            version1_data["cleaned_content"], version2_data["cleaned_content"]
        )

        # Sentence-level analysis
        sentence_analysis = self.calculate_sentence_level_similarities(
            version1_data["sentences"],
            version2_data["sentences"],
            version1_name,
            version2_name,
        )

        # Paragraph-level similarity
        para_lexical = self.calculate_lexical_similarity(
            " ".join(version1_data["paragraphs"]), " ".join(version2_data["paragraphs"])
        )
        para_semantic = self.calculate_semantic_similarity(
            " ".join(version1_data["paragraphs"]), " ".join(version2_data["paragraphs"])
        )

        # Aggregate sentence similarities
        sentence_similarities = [s["best_match"]["combined"] for s in sentence_analysis]
        avg_sentence_similarity = (
            np.mean(sentence_similarities) if sentence_similarities else 0
        )

        return {
            "version_pair": f"{version1_name}_to_{version2_name}",
            "full_text": {
                "lexical": full_text_lexical,
                "semantic": full_text_semantic,
                "combined": (
                    full_text_lexical["lexical_average"]
                    + full_text_semantic["semantic_similarity"]
                )
                / 2,
            },
            "sentence_level": {
                "average_similarity": avg_sentence_similarity,
                "individual_similarities": sentence_similarities,
                "detailed_analysis": sentence_analysis,
            },
            "paragraph_level": {
                "lexical": para_lexical,
                "semantic": para_semantic,
                "combined": (
                    para_lexical["lexical_average"]
                    + para_semantic["semantic_similarity"]
                )
                / 2,
            },
        }

    def analyze_all_versions(self, processed_versions):
        """Analyze similarities between all version pairs."""
        print("\nüîç Step 3: Similarity Analysis")

        version_names = list(processed_versions.keys())
        version_order = ["draft", "refined", "edited", "final"]

        # Sort versions by expected order
        sorted_versions = []
        for expected in version_order:
            if expected in version_names:
                sorted_versions.append(expected)

        # Sequential analysis (draft‚Üírefined‚Üíedited‚Üífinal)
        sequential_results = []
        for i in range(len(sorted_versions) - 1):
            current_version = sorted_versions[i]
            next_version = sorted_versions[i + 1]

            result = self.analyze_version_pair(
                processed_versions[current_version],
                processed_versions[next_version],
                current_version,
                next_version,
            )
            sequential_results.append(result)

        # Draft to final comparison
        draft_to_final = None
        if "draft" in version_names and "final" in version_names:
            print("  üîç Analyzing draft ‚Üí final (overall change)")
            draft_to_final = self.analyze_version_pair(
                processed_versions["draft"],
                processed_versions["final"],
                "draft",
                "final",
            )

        self.similarity_results = {
            "sequential_analysis": sequential_results,
            "draft_to_final": draft_to_final,
            "analysis_timestamp": datetime.now().isoformat(),
            "versions_analyzed": sorted_versions,
        }

        return self.similarity_results


print("üîç SimilarityAnalyzer class loaded. Ready for similarity analysis.")

üîç SimilarityAnalyzer class loaded. Ready for similarity analysis.


In [8]:
# CELL 8: ATTRIBUTION MAPPING (STEP 4)


class AttributionMapper:
    """Class to track content attribution across versions."""

    def __init__(self, semantic_model, similarity_threshold=0.3):
        self.similarity_threshold = similarity_threshold
        self.semantic_model = semantic_model
        self.attribution_results = {}

    def trace_sentence_origins(self, processed_versions, similarity_results):
        """Trace each final sentence back to its earliest appearance."""
        print("\nüìç Step 4: Attribution Mapping")

        version_order = ["draft", "refined", "edited", "final"]
        available_versions = [v for v in version_order if v in processed_versions]

        if "final" not in available_versions:
            print("‚ùå Cannot perform attribution - final version not found")
            return None

        final_sentences = processed_versions["final"]["sentences"]
        sentence_attributions = []

        print(f"  üìù Tracing {len(final_sentences)} final sentences...")

        for final_idx, final_sentence in enumerate(final_sentences):
            attribution = {
                "final_index": final_idx,
                "final_sentence": final_sentence,
                "origin_version": None,
                "origin_index": None,
                "similarity_scores": {},
                "modification_path": [],
            }

            # Check each previous version (in reverse order to find earliest origin)
            for version in reversed(available_versions[:-1]):  # Exclude 'final'
                version_sentences = processed_versions[version]["sentences"]

                best_match = {"index": -1, "similarity": 0, "sentence": ""}

                for sent_idx, version_sentence in enumerate(version_sentences):
                    # Calculate similarity
                    lexical = self._quick_lexical_similarity(
                        final_sentence, version_sentence
                    )
                    semantic = self._quick_semantic_similarity(
                        final_sentence, version_sentence
                    )
                    combined = (lexical + semantic) / 2

                    if combined > best_match["similarity"]:
                        best_match = {
                            "index": sent_idx,
                            "similarity": combined,
                            "sentence": version_sentence,
                        }

                attribution["similarity_scores"][version] = best_match["similarity"]

                # If similarity is above threshold, this could be the origin
                if best_match["similarity"] >= self.similarity_threshold:
                    if (
                        attribution["origin_version"] is None
                    ):  # First match found (earliest version)
                        attribution["origin_version"] = version
                        attribution["origin_index"] = best_match["index"]

                    attribution["modification_path"].append(
                        {
                            "version": version,
                            "similarity": best_match["similarity"],
                            "sentence": best_match["sentence"],
                        }
                    )

            # If no origin found, it's new content
            if attribution["origin_version"] is None:
                attribution["origin_version"] = "new_in_final"

            sentence_attributions.append(attribution)

        return sentence_attributions

    def _quick_lexical_similarity(self, text1, text2):
        """Quick lexical similarity calculation."""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        if not words1 and not words2:
            return 1.0
        if not words1 or not words2:
            return 0.0
        return len(words1.intersection(words2)) / len(words1.union(words2))

    def _quick_semantic_similarity(self, text1, text2):
        """Quick semantic similarity calculation."""
        embeddings = self.semantic_model.encode([text1, text2])
        return float(cosine_similarity([embeddings[0]], [embeddings[1]])[0][0])

    def calculate_attribution_statistics(self, sentence_attributions):
        """Calculate overall attribution statistics."""
        total_sentences = len(sentence_attributions)

        # Count by origin version
        origin_counts = defaultdict(int)
        for attribution in sentence_attributions:
            origin_counts[attribution["origin_version"]] += 1

        # Calculate percentages
        origin_percentages = {}
        for version, count in origin_counts.items():
            origin_percentages[version] = {
                "count": count,
                "percentage": (count / total_sentences) * 100,
            }

        # Calculate modification statistics
        modification_stats = {
            "high_similarity": 0,  # >0.8
            "medium_similarity": 0,  # 0.5-0.8
            "low_similarity": 0,  # 0.3-0.5
            "new_content": 0,  # <0.3 or new_in_final
        }

        for attribution in sentence_attributions:
            if attribution["origin_version"] == "new_in_final":
                modification_stats["new_content"] += 1
            else:
                # Get highest similarity score
                max_similarity = (
                    max(attribution["similarity_scores"].values())
                    if attribution["similarity_scores"]
                    else 0
                )

                if max_similarity > 0.8:
                    modification_stats["high_similarity"] += 1
                elif max_similarity > 0.5:
                    modification_stats["medium_similarity"] += 1
                elif max_similarity > 0.3:
                    modification_stats["low_similarity"] += 1
                else:
                    modification_stats["new_content"] += 1

        # Convert to percentages
        modification_percentages = {}
        for category, count in modification_stats.items():
            modification_percentages[category] = {
                "count": count,
                "percentage": (count / total_sentences) * 100,
            }

        return {
            "total_sentences": total_sentences,
            "origin_distribution": origin_percentages,
            "modification_distribution": modification_percentages,
        }

    def analyze_attribution(self, processed_versions, similarity_results):
        """Perform complete attribution analysis."""
        sentence_attributions = self.trace_sentence_origins(
            processed_versions, similarity_results
        )

        if sentence_attributions is None:
            return None

        attribution_statistics = self.calculate_attribution_statistics(
            sentence_attributions
        )

        self.attribution_results = {
            "sentence_attributions": sentence_attributions,
            "statistics": attribution_statistics,
            "analysis_timestamp": datetime.now().isoformat(),
            "similarity_threshold": self.similarity_threshold,
        }

        # Print summary
        print("\nüìä Attribution Summary:")
        print(
            f"  Total sentences in final: {attribution_statistics['total_sentences']}"
        )

        print("\n  Origin Distribution:")
        for version, data in attribution_statistics["origin_distribution"].items():
            print(
                f"    {version}: {data['count']} sentences ({data['percentage']:.1f}%)"
            )

        print("\n  Modification Levels:")
        for category, data in attribution_statistics[
            "modification_distribution"
        ].items():
            print(
                f"    {category}: {data['count']} sentences ({data['percentage']:.1f}%)"
            )

        return self.attribution_results


print("üìç AttributionMapper class loaded. Ready for attribution analysis.")

üìç AttributionMapper class loaded. Ready for attribution analysis.


In [9]:
# CELL 9: COMBINED EXECUTION FUNCTION


def run_steps_3_4(article_versions, preprocessor, output_path):
    """Run steps 3-4: Similarity Analysis and Attribution Mapping."""
    print(f"üöÄ Starting Steps 3-4 for article: {article_versions.article_name}")

    # Step 3: Similarity Analysis
    similarity_analyzer = SimilarityAnalyzer(semantic_model)
    similarity_results = similarity_analyzer.analyze_all_versions(
        preprocessor.processed_versions
    )

    # Step 4: Attribution Mapping
    attribution_mapper = AttributionMapper(semantic_model, similarity_threshold=0.3)
    attribution_results = attribution_mapper.analyze_attribution(
        preprocessor.processed_versions, similarity_results
    )

    # Combine results
    combined_results = {
        "article_name": article_versions.article_name,
        "analysis_timestamp": datetime.now().isoformat(),
        "article_metadata": article_versions.metadata,
        "processing_summary": preprocessor.get_processing_summary(),
        "similarity_analysis": similarity_results,
        "attribution_analysis": attribution_results,
    }

    # Save comprehensive results
    results_file = (
        f"{output_path}/{article_versions.article_name}_complete_analysis.json"
    )
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(combined_results, f, indent=2, ensure_ascii=False)

    print(f"\nüíæ Complete analysis saved: {results_file}")

    # Generate summary metrics for article footer
    footer_metrics = generate_footer_metrics(combined_results)

    # Save footer metrics separately
    footer_file = f"{output_path}/{article_versions.article_name}_footer_metrics.json"
    with open(footer_file, "w", encoding="utf-8") as f:
        json.dump(footer_metrics, f, indent=2, ensure_ascii=False)

    print(f"üìä Footer metrics saved: {footer_file}")

    return combined_results, footer_metrics


def generate_footer_metrics(combined_results):
    """Generate clean metrics for article footer."""
    # Get processing stats
    processing = combined_results["processing_summary"]

    # Get attribution stats
    if combined_results["attribution_analysis"]:
        attribution = combined_results["attribution_analysis"]["statistics"]
        origin_dist = attribution["origin_distribution"]
        modification_dist = attribution["modification_distribution"]
    else:
        origin_dist = {}
        modification_dist = {}

    # Get similarity stats (draft to final)
    draft_to_final = combined_results["similarity_analysis"]["draft_to_final"]
    overall_similarity = (
        draft_to_final["full_text"]["combined"] if draft_to_final else 0
    )

    footer_metrics = {
        "article_name": combined_results["article_name"],
        "word_progression": {
            "draft": processing.get("draft", {}).get("word_count", 0),
            "final": processing.get("final", {}).get("word_count", 0),
            "change_percentage": 0,
        },
        "content_retention": {
            "overall_similarity": round(overall_similarity * 100, 1),
            "content_origins": {},
        },
        "modification_summary": {},
        "generated_at": datetime.now().isoformat(),
    }

    # Calculate word change percentage
    if footer_metrics["word_progression"]["draft"] > 0:
        draft_words = footer_metrics["word_progression"]["draft"]
        final_words = footer_metrics["word_progression"]["final"]
        change = ((final_words - draft_words) / draft_words) * 100
        footer_metrics["word_progression"]["change_percentage"] = round(change, 1)

    # Simplify origin distribution for footer
    for version, data in origin_dist.items():
        if version != "new_in_final":
            footer_metrics["content_retention"]["content_origins"][version] = round(
                data["percentage"], 1
            )

    # Simplify modification distribution
    for category, data in modification_dist.items():
        clean_category = category.replace("_", " ").title()
        footer_metrics["modification_summary"][clean_category] = round(
            data["percentage"], 1
        )

    return footer_metrics


print("üéØ Execution functions loaded. Ready to run complete analysis!")
print("\nTo run the complete analysis:")
print(
    "combined_results, footer_metrics = run_steps_3_4(article_versions, preprocessor, 'your_output_path')"
)

üéØ Execution functions loaded. Ready to run complete analysis!

To run the complete analysis:
combined_results, footer_metrics = run_steps_3_4(article_versions, preprocessor, 'your_output_path')


In [10]:
# CELL 10: QUICK EXECUTION FOR EXISTING DATA


def run_complete_analysis_from_existing(article_versions, preprocessor):
    """Run steps 3-4 using the output path from existing data."""
    # Find where the Step 1-2 checkpoint was actually saved
    base_path = article_versions.input_path

    # Check for the nested output structure that was created in Steps 1-2
    nested_output_path = os.path.join(base_path, "output", "output")
    regular_output_path = os.path.join(base_path, "output")

    # Use the path where the checkpoint file exists
    checkpoint_file = "markup-languages_checkpoint_steps_1_2.json"

    if os.path.exists(os.path.join(nested_output_path, checkpoint_file)):
        output_path = nested_output_path
        print(f"üìÇ Using nested output path: {output_path}")
    elif os.path.exists(os.path.join(regular_output_path, checkpoint_file)):
        output_path = regular_output_path
        print(f"üìÇ Using regular output path: {output_path}")
    else:
        # Create regular output path as fallback
        output_path = regular_output_path
        os.makedirs(output_path, exist_ok=True)
        print(f"üìÇ Created output path: {output_path}")

    return run_steps_3_4(article_versions, preprocessor, output_path)


print("‚ö° Quick execution function available:")
print(
    "combined_results, footer_metrics = run_complete_analysis_from_existing(article_versions, preprocessor)"
)

‚ö° Quick execution function available:
combined_results, footer_metrics = run_complete_analysis_from_existing(article_versions, preprocessor)


In [11]:
combined_results, footer_metrics = run_complete_analysis_from_existing(
    article_versions, preprocessor
)

üìÇ Using regular output path: /content/output
üöÄ Starting Steps 3-4 for article: markup-languages

üîç Step 3: Similarity Analysis
  üîç Analyzing draft ‚Üí refined
    Analyzing 42 vs 55 sentences...
  üîç Analyzing refined ‚Üí edited
    Analyzing 55 vs 39 sentences...
  üîç Analyzing edited ‚Üí final
    Analyzing 39 vs 41 sentences...
  üîç Analyzing draft ‚Üí final (overall change)
  üîç Analyzing draft ‚Üí final
    Analyzing 42 vs 41 sentences...

üìç Step 4: Attribution Mapping
  üìù Tracing 41 final sentences...

üìä Attribution Summary:
  Total sentences in final: 41

  Origin Distribution:
    edited: 33 sentences (80.5%)
    draft: 1 sentences (2.4%)
    new_in_final: 7 sentences (17.1%)

  Modification Levels:
    high_similarity: 3 sentences (7.3%)
    medium_similarity: 15 sentences (36.6%)
    low_similarity: 16 sentences (39.0%)
    new_content: 7 sentences (17.1%)

üíæ Complete analysis saved: /content/output/markup-languages_complete_analysis.json
üìä 