<a href="https://colab.research.google.com/github/wtrekell/syntaxandempathy/blob/main/ai_vs_human.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
# =============================================================================
# CELL 1: SETUP AND CONFIGURATION
# =============================================================================

import os
import re
import json
import pandas as pd
from pathlib import Path
from datetime import datetime
from google.colab import drive, files
import zipfile

# Mount Google Drive (will prompt for authorization)
print("Mounting Google Drive...")
drive.mount('/content/drive')
print("✓ Drive mounted successfully")

# Configuration
class Config:
    """Configuration settings for the analysis"""

    # File naming conventions
    VERSION_PREFIXES = ['draft-', 'refined-', 'edited-', 'final-']
    VERSION_ORDER = {prefix: i for i, prefix in enumerate(VERSION_PREFIXES)}

    # Analysis settings
    MIN_SENTENCE_LENGTH = 10  # Minimum characters for a sentence
    MAX_SENTENCE_LENGTH = 1000  # Maximum characters for a sentence

def setup_output_directories(base_path):
    """Create necessary output directories"""
    # Don't create nested folders - use the base path directly
    output_dir = base_path
    archive_dir = os.path.join(base_path, 'archive')

    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(archive_dir, exist_ok=True)

    print(f"✓ Output directory ready: {output_dir}")
    print(f"✓ Archive directory ready: {archive_dir}")

    return output_dir, archive_dir

print("📋 Configuration loaded. Ready to process articles.")

Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✓ Drive mounted successfully
📋 Configuration loaded. Ready to process articles.


In [14]:
# =============================================================================
# CELL 2: DATA INGESTION & VALIDATION (STEP 1)
# =============================================================================

class ArticleVersions:
    """Class to handle loading and validating article versions"""

    def __init__(self, article_name, input_path):
        self.article_name = article_name
        self.input_path = input_path
        self.versions = {}
        self.metadata = {
            'article_name': article_name,
            'input_path': input_path,
            'processing_timestamp': datetime.now().isoformat(),
            'versions_found': [],
            'validation_status': 'pending'
        }

    def load_versions(self):
        """Load all versions of an article from the specified path"""
        print(f"\n📁 Loading versions for article: {self.article_name}")
        print(f"📂 Input path: {self.input_path}")

        # Find all files matching the article name pattern
        for prefix in Config.VERSION_PREFIXES:
            filename = f"{prefix}{self.article_name}.md"
            filepath = os.path.join(self.input_path, filename)

            if os.path.exists(filepath):
                try:
                    with open(filepath, 'r', encoding='utf-8') as file:
                        content = file.read()
                        self.versions[prefix.rstrip('-')] = {
                            'filename': filename,
                            'filepath': filepath,
                            'content': content,
                            'loaded_at': datetime.now().isoformat(),
                            'file_size': len(content)
                        }
                        print(f"  ✓ Loaded: {filename} ({len(content)} characters)")

                except Exception as e:
                    print(f"  ✗ Error loading {filename}: {str(e)}")
            else:
                print(f"  - Not found: {filename}")

        self.metadata['versions_found'] = list(self.versions.keys())
        return self.versions

    def validate_version_sequence(self):
        """Validate that we have the minimum required versions"""
        found_versions = set(self.versions.keys())
        required_versions = ['draft', 'final']

        # Check for required versions
        missing_required = []
        for version in required_versions:
            if version not in found_versions:
                missing_required.append(version)

        # Validation results
        validation_results = {
            'has_draft': 'draft' in found_versions,
            'has_final': 'final' in found_versions,
            'missing_required': missing_required,
            'versions_found': list(found_versions),
            'is_valid': len(missing_required) == 0
        }

        # Update metadata
        self.metadata['validation_results'] = validation_results

        if validation_results['is_valid']:
            self.metadata['validation_status'] = 'passed'
            print(f"✓ Validation passed: Found {len(found_versions)} versions")
        else:
            self.metadata['validation_status'] = 'failed'
            print(f"✗ Validation failed: Missing required versions")
            print(f"  Missing: {', '.join(missing_required)}")

        return validation_results

    def get_summary(self):
        """Get a summary of loaded versions"""
        summary = {
            'article_name': self.article_name,
            'input_path': self.input_path,
            'versions_count': len(self.versions),
            'validation_status': self.metadata['validation_status'],
            'file_sizes': {}
        }

        for version, data in self.versions.items():
            summary['file_sizes'][version] = data['file_size']

        return summary

print("📖 ArticleVersions class loaded. Ready for data ingestion.")

📖 ArticleVersions class loaded. Ready for data ingestion.


In [15]:
# =============================================================================
# CELL 3: TEXT PREPROCESSING (STEP 2)
# =============================================================================

class TextPreprocessor:
    """Class to handle text preprocessing and segmentation"""

    def __init__(self):
        self.processed_versions = {}

    def clean_markdown(self, text):
        """Clean markdown formatting while preserving content structure"""
        # Remove markdown formatting but keep the text
        patterns = [
            (r'^\s*#{1,6}\s+', ''),  # Headers
            (r'\*\*(.*?)\*\*', r'\1'),  # Bold
            (r'\*(.*?)\*', r'\1'),  # Italic
            (r'`(.*?)`', r'\1'),  # Inline code
            (r'```.*?```', ''),  # Code blocks
            (r'!\[.*?\]\(.*?\)', ''),  # Images
            (r'\[([^\]]+)\]\([^\)]+\)', r'\1'),  # Links
            (r'^\s*[\*\-\+]\s+', ''),  # Bullet points
            (r'^\s*\d+\.\s+', ''),  # Numbered lists
            (r'\n{3,}', '\n\n'),  # Multiple newlines
        ]

        cleaned_text = text

        # Apply patterns that need multiline flag
        multiline_patterns = [
            (r'^\s*[\*\-\+]\s+', ''),  # Bullet points
            (r'^\s*\d+\.\s+', ''),  # Numbered lists
        ]

        for pattern, replacement in multiline_patterns:
            cleaned_text = re.sub(pattern, replacement, cleaned_text, flags=re.MULTILINE)

        # Apply regular patterns
        regular_patterns = [
            (r'^\s*#{1,6}\s+', ''),  # Headers
            (r'\*\*(.*?)\*\*', r'\1'),  # Bold
            (r'\*(.*?)\*', r'\1'),  # Italic
            (r'`(.*?)`', r'\1'),  # Inline code
            (r'```.*?```', ''),  # Code blocks
            (r'!\[.*?\]\(.*?\)', ''),  # Images
            (r'\[([^\]]+)\]\([^\)]+\)', r'\1'),  # Links
            (r'\n{3,}', '\n\n'),  # Multiple newlines
        ]

        for pattern, replacement in regular_patterns:
            cleaned_text = re.sub(pattern, replacement, cleaned_text)

        return cleaned_text.strip()

    def segment_into_sentences(self, text):
        """Segment text into sentences with basic filtering"""
        # Simple sentence segmentation (can be enhanced with spaCy later if needed)
        sentences = re.split(r'[.!?]+\s+', text)

        # Filter sentences
        filtered_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if (Config.MIN_SENTENCE_LENGTH <= len(sentence) <= Config.MAX_SENTENCE_LENGTH
                and sentence):
                filtered_sentences.append(sentence)

        return filtered_sentences

    def segment_into_paragraphs(self, text):
        """Segment text into paragraphs"""
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        return paragraphs

    def process_version(self, version_name, raw_content):
        """Process a single version of the article"""
        print(f"  Processing {version_name} version...")

        # Clean the markdown
        cleaned_content = self.clean_markdown(raw_content)

        # Segment into different units
        sentences = self.segment_into_sentences(cleaned_content)
        paragraphs = self.segment_into_paragraphs(cleaned_content)

        # Calculate basic statistics
        stats = {
            'character_count': len(cleaned_content),
            'word_count': len(cleaned_content.split()),
            'sentence_count': len(sentences),
            'paragraph_count': len(paragraphs),
            'avg_sentence_length': sum(len(s) for s in sentences) / len(sentences) if sentences else 0,
            'avg_paragraph_length': sum(len(p) for p in paragraphs) / len(paragraphs) if paragraphs else 0
        }

        processed_data = {
            'version_name': version_name,
            'raw_content': raw_content,
            'cleaned_content': cleaned_content,
            'sentences': sentences,
            'paragraphs': paragraphs,
            'statistics': stats,
            'processed_at': datetime.now().isoformat()
        }

        self.processed_versions[version_name] = processed_data

        print(f"    ✓ {stats['sentence_count']} sentences, {stats['paragraph_count']} paragraphs")
        print(f"    ✓ {stats['word_count']} words, {stats['character_count']} characters")

        return processed_data

    def process_all_versions(self, article_versions):
        """Process all versions of an article"""
        print(f"\n🔄 Preprocessing text for all versions...")

        for version_name, version_data in article_versions.versions.items():
            self.process_version(version_name, version_data['content'])

        return self.processed_versions

    def get_processing_summary(self):
        """Get a summary of processing results"""
        summary = {}
        for version_name, data in self.processed_versions.items():
            summary[version_name] = data['statistics']

        return summary

print("🔧 TextPreprocessor class loaded. Ready for text processing.")

🔧 TextPreprocessor class loaded. Ready for text processing.


In [16]:
# =============================================================================
# CELL 4: EXECUTION FUNCTIONS AND CHECKPOINT MANAGEMENT
# =============================================================================

def save_checkpoint_data(article_versions, preprocessor, output_path, checkpoint_name="steps_1_2"):
    """Save checkpoint data for review"""
    checkpoint_data = {
        'checkpoint_name': checkpoint_name,
        'timestamp': datetime.now().isoformat(),
        'article_metadata': article_versions.metadata,
        'processing_summary': preprocessor.get_processing_summary(),
        'validation_results': article_versions.metadata.get('validation_results', {}),
        'article_summary': article_versions.get_summary()
    }

    # Save to output directory
    checkpoint_file = f"{output_path}/{article_versions.article_name}_checkpoint_{checkpoint_name}.json"

    with open(checkpoint_file, 'w', encoding='utf-8') as f:
        json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)

    print(f"\n💾 Checkpoint saved: {checkpoint_file}")
    return checkpoint_data

def run_steps_1_2(article_name, input_path, base_output_path):
    """Run steps 1-2 for a given article"""
    print(f"🚀 Starting Steps 1-2 for article: {article_name}")
    print(f"📂 Input path: {input_path}")

    # Setup output directories
    output_dir, archive_dir = setup_output_directories(base_output_path)

    # Step 1: Data Ingestion & Validation
    article_versions = ArticleVersions(article_name, input_path)
    article_versions.load_versions()
    validation_results = article_versions.validate_version_sequence()

    if not validation_results['is_valid']:
        print("❌ Cannot proceed: Missing required versions (draft and final)")
        return None, None

    # Step 2: Text Preprocessing
    preprocessor = TextPreprocessor()
    preprocessor.process_all_versions(article_versions)

    # Save checkpoint
    checkpoint_data = save_checkpoint_data(article_versions, preprocessor, output_dir)

    print(f"\n✅ Steps 1-2 completed successfully!")
    print(f"📊 Processing Summary:")
    for version, stats in preprocessor.get_processing_summary().items():
        print(f"  {version}: {stats['word_count']} words, {stats['sentence_count']} sentences")

    return article_versions, preprocessor

# Interactive input functions
def get_user_inputs():
    """Get user inputs for processing"""
    print("📝 Please provide the following information:")

    article_name = input("Enter article name (without .md extension): ").strip()
    input_path = input("Enter full path to input folder containing markdown files: ").strip()
    base_output_path = input("Enter full path to base output folder: ").strip()

    print(f"\n📋 Configuration:")
    print(f"  Article name: {article_name}")
    print(f"  Input path: {input_path}")
    print(f"  Output path: {base_output_path}")

    confirm = input("\nProceed with these settings? (y/n): ").strip().lower()

    if confirm == 'y':
        return article_name, input_path, base_output_path
    else:
        print("❌ Cancelled. Run get_user_inputs() again to restart.")
        return None, None, None

def process_article_interactive():
    """Process an article with interactive inputs"""
    article_name, input_path, base_output_path = get_user_inputs()

    if article_name and input_path and base_output_path:
        return run_steps_1_2(article_name, input_path, base_output_path)
    else:
        return None, None

print("📋 Ready to process your article!")
print("Run: article_versions, preprocessor = process_article_interactive()")
print("\nMake sure your markdown files are named:")
print("- draft-your-article-name.md")
print("- refined-your-article-name.md")
print("- edited-your-article-name.md")
print("- final-your-article-name.md")

📋 Ready to process your article!
Run: article_versions, preprocessor = process_article_interactive()

Make sure your markdown files are named:
- draft-your-article-name.md
- refined-your-article-name.md
- edited-your-article-name.md
- final-your-article-name.md


In [17]:
# =============================================================================
# CELL 5: SAMPLE DATA CREATOR (FOR TESTING ONLY)
# =============================================================================

def create_sample_files_for_testing(output_path):
    """Create sample markdown files for testing (run this once to test)"""
    sample_content = {
        'draft-': """# Sample Article

This is a draft article about artificial intelligence and its impact on society. AI has revolutionized many industries.

The technology continues to evolve rapidly. Machine learning algorithms are becoming more sophisticated every day.

We must consider the ethical implications of AI development.""",

        'refined-': """# Sample Article

This is a refined article examining artificial intelligence and its transformative impact on modern society. AI has fundamentally revolutionized numerous industries across the globe.

The technology continues to evolve at an unprecedented pace. Advanced machine learning algorithms are becoming increasingly sophisticated with each passing day.

We must carefully consider the complex ethical implications of AI development and deployment.""",

        'edited-': """# Sample Article

This comprehensive article examines artificial intelligence and its transformative impact on modern society. AI has fundamentally revolutionized numerous industries worldwide, reshaping how we work and live.

The technology continues to evolve at an unprecedented pace, driven by breakthrough innovations. Advanced machine learning algorithms are becoming increasingly sophisticated, enabling new applications we never thought possible.

We must carefully consider the complex ethical implications of AI development and deployment, ensuring responsible innovation for the benefit of humanity.""",

        'final-': """# Sample Article

This comprehensive article examines artificial intelligence and its transformative impact on modern society. AI has fundamentally revolutionized numerous industries worldwide, reshaping how we work, communicate, and live.

The technology continues to evolve at an unprecedented pace, driven by breakthrough innovations in computing power and algorithmic design. Advanced machine learning algorithms are becoming increasingly sophisticated, enabling new applications we never thought possible just a decade ago.

We must carefully consider the complex ethical implications of AI development and deployment, ensuring responsible innovation that serves the benefit of all humanity while mitigating potential risks."""
    }

    article_name = "sample-article"
    os.makedirs(output_path, exist_ok=True)

    for prefix, content in sample_content.items():
        filename = f"{prefix}{article_name}.md"
        filepath = os.path.join(output_path, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        print(f"Created: {filename}")

    return article_name, output_path

print("\n🧪 Sample data creator available for testing if needed.")
print("To create test files, run:")
print("sample_name, sample_path = create_sample_files_for_testing('/your/test/path')")


🧪 Sample data creator available for testing if needed.
To create test files, run:
sample_name, sample_path = create_sample_files_for_testing('/your/test/path')


In [18]:
article_versions, preprocessor = process_article_interactive()

📝 Please provide the following information:
Enter article name (without .md extension): markup-languages
Enter full path to input folder containing markdown files: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human
Enter full path to base output folder: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human/output/output

📋 Configuration:
  Article name: markup-languages
  Input path: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human
  Output path: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human/output/output

Proceed with these settings? (y/n): y
🚀 Starting Steps 1-2 for article: markup-languages
📂 Input path: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human
✓ Output directory ready: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human/output/output
✓ Archive directory ready: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-

In [32]:
!pip install sentence-transformers scikit-learn -q

import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
import difflib
from collections import defaultdict
import json
from datetime import datetime

print("📦 Dependencies installed and imported successfully!")
print("🤖 Loading SentenceTransformer model (this may take a moment)...")

# Load the semantic similarity model
semantic_model = SentenceTransformer('all-MiniLM-L6-v2')
print("✅ Semantic model loaded successfully!")

📦 Dependencies installed and imported successfully!
🤖 Loading SentenceTransformer model (this may take a moment)...
✅ Semantic model loaded successfully!


In [33]:
# =============================================================================
# CELL 2: SIMILARITY ANALYSIS (STEP 3)
# =============================================================================

class SimilarityAnalyzer:
    """Class to handle lexical and semantic similarity analysis"""

    def __init__(self, semantic_model):
        self.semantic_model = semantic_model
        self.similarity_results = {}

    def calculate_lexical_similarity(self, text1, text2):
        """Calculate lexical similarity using multiple metrics"""
        # Jaccard similarity (word-level)
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        jaccard = len(words1.intersection(words2)) / len(words1.union(words2)) if words1.union(words2) else 0

        # Edit distance similarity (character-level)
        sequence_matcher = difflib.SequenceMatcher(None, text1, text2)
        edit_similarity = sequence_matcher.ratio()

        # TF-IDF cosine similarity
        vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, 2))
        try:
            tfidf_matrix = vectorizer.fit_transform([text1, text2])
            tfidf_similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
        except:
            tfidf_similarity = 0.0

        return {
            'jaccard_similarity': jaccard,
            'edit_similarity': edit_similarity,
            'tfidf_similarity': tfidf_similarity,
            'lexical_average': (jaccard + edit_similarity + tfidf_similarity) / 3
        }

    def calculate_semantic_similarity(self, text1, text2):
        """Calculate semantic similarity using sentence embeddings"""
        # Get embeddings
        embeddings = self.semantic_model.encode([text1, text2])

        # Calculate cosine similarity
        similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]

        return {
            'semantic_similarity': float(similarity),
            'embedding_dim': len(embeddings[0])
        }

    def calculate_sentence_level_similarities(self, sentences1, sentences2, version1, version2):
        """Calculate similarities at sentence level between two versions"""
        print(f"    Analyzing {len(sentences1)} vs {len(sentences2)} sentences...")

        sentence_similarities = []

        # Calculate all pairwise similarities
        for i, sent1 in enumerate(sentences1):
            best_match = {'index': -1, 'lexical': 0, 'semantic': 0, 'combined': 0}

            for j, sent2 in enumerate(sentences2):
                # Calculate similarities
                lexical = self.calculate_lexical_similarity(sent1, sent2)
                semantic = self.calculate_semantic_similarity(sent1, sent2)

                # Combined score (weighted average)
                combined = (lexical['lexical_average'] + semantic['semantic_similarity']) / 2

                if combined > best_match['combined']:
                    best_match = {
                        'index': j,
                        'lexical': lexical['lexical_average'],
                        'semantic': semantic['semantic_similarity'],
                        'combined': combined,
                        'target_sentence': sent2
                    }

            sentence_similarities.append({
                'source_index': i,
                'source_sentence': sent1,
                'best_match': best_match
            })

        return sentence_similarities

    def analyze_version_pair(self, version1_data, version2_data, version1_name, version2_name):
        """Analyze similarities between two versions"""
        print(f"  🔍 Analyzing {version1_name} → {version2_name}")

        # Full text similarity
        full_text_lexical = self.calculate_lexical_similarity(
            version1_data['cleaned_content'],
            version2_data['cleaned_content']
        )
        full_text_semantic = self.calculate_semantic_similarity(
            version1_data['cleaned_content'],
            version2_data['cleaned_content']
        )

        # Sentence-level analysis
        sentence_analysis = self.calculate_sentence_level_similarities(
            version1_data['sentences'],
            version2_data['sentences'],
            version1_name,
            version2_name
        )

        # Paragraph-level similarity
        para_lexical = self.calculate_lexical_similarity(
            ' '.join(version1_data['paragraphs']),
            ' '.join(version2_data['paragraphs'])
        )
        para_semantic = self.calculate_semantic_similarity(
            ' '.join(version1_data['paragraphs']),
            ' '.join(version2_data['paragraphs'])
        )

        # Aggregate sentence similarities
        sentence_similarities = [s['best_match']['combined'] for s in sentence_analysis]
        avg_sentence_similarity = np.mean(sentence_similarities) if sentence_similarities else 0

        return {
            'version_pair': f"{version1_name}_to_{version2_name}",
            'full_text': {
                'lexical': full_text_lexical,
                'semantic': full_text_semantic,
                'combined': (full_text_lexical['lexical_average'] + full_text_semantic['semantic_similarity']) / 2
            },
            'sentence_level': {
                'average_similarity': avg_sentence_similarity,
                'individual_similarities': sentence_similarities,
                'detailed_analysis': sentence_analysis
            },
            'paragraph_level': {
                'lexical': para_lexical,
                'semantic': para_semantic,
                'combined': (para_lexical['lexical_average'] + para_semantic['semantic_similarity']) / 2
            }
        }

    def analyze_all_versions(self, processed_versions):
        """Analyze similarities between all version pairs"""
        print(f"\n🔍 Step 3: Similarity Analysis")

        version_names = list(processed_versions.keys())
        version_order = ['draft', 'refined', 'edited', 'final']

        # Sort versions by expected order
        sorted_versions = []
        for expected in version_order:
            if expected in version_names:
                sorted_versions.append(expected)

        # Sequential analysis (draft→refined→edited→final)
        sequential_results = []
        for i in range(len(sorted_versions) - 1):
            current_version = sorted_versions[i]
            next_version = sorted_versions[i + 1]

            result = self.analyze_version_pair(
                processed_versions[current_version],
                processed_versions[next_version],
                current_version,
                next_version
            )
            sequential_results.append(result)

        # Draft to final comparison
        draft_to_final = None
        if 'draft' in version_names and 'final' in version_names:
            print(f"  🔍 Analyzing draft → final (overall change)")
            draft_to_final = self.analyze_version_pair(
                processed_versions['draft'],
                processed_versions['final'],
                'draft',
                'final'
            )

        self.similarity_results = {
            'sequential_analysis': sequential_results,
            'draft_to_final': draft_to_final,
            'analysis_timestamp': datetime.now().isoformat(),
            'versions_analyzed': sorted_versions
        }

        return self.similarity_results

print("🔍 SimilarityAnalyzer class loaded. Ready for similarity analysis.")

🔍 SimilarityAnalyzer class loaded. Ready for similarity analysis.


In [34]:
# =============================================================================
# CELL 3: ATTRIBUTION MAPPING (STEP 4)
# =============================================================================

class AttributionMapper:
    """Class to track content attribution across versions"""

    def __init__(self, similarity_threshold=0.3):
        self.similarity_threshold = similarity_threshold
        self.attribution_results = {}

    def trace_sentence_origins(self, processed_versions, similarity_results):
        """Trace each final sentence back to its earliest appearance"""
        print(f"\n📍 Step 4: Attribution Mapping")

        version_order = ['draft', 'refined', 'edited', 'final']
        available_versions = [v for v in version_order if v in processed_versions]

        if 'final' not in available_versions:
            print("❌ Cannot perform attribution - final version not found")
            return None

        final_sentences = processed_versions['final']['sentences']
        sentence_attributions = []

        print(f"  📝 Tracing {len(final_sentences)} final sentences...")

        for final_idx, final_sentence in enumerate(final_sentences):
            attribution = {
                'final_index': final_idx,
                'final_sentence': final_sentence,
                'origin_version': None,
                'origin_index': None,
                'similarity_scores': {},
                'modification_path': []
            }

            # Check each previous version (in reverse order to find earliest origin)
            for version in reversed(available_versions[:-1]):  # Exclude 'final'
                version_sentences = processed_versions[version]['sentences']

                best_match = {'index': -1, 'similarity': 0, 'sentence': ''}

                for sent_idx, version_sentence in enumerate(version_sentences):
                    # Calculate similarity
                    lexical = self._quick_lexical_similarity(final_sentence, version_sentence)
                    semantic = self._quick_semantic_similarity(final_sentence, version_sentence)
                    combined = (lexical + semantic) / 2

                    if combined > best_match['similarity']:
                        best_match = {
                            'index': sent_idx,
                            'similarity': combined,
                            'sentence': version_sentence
                        }

                attribution['similarity_scores'][version] = best_match['similarity']

                # If similarity is above threshold, this could be the origin
                if best_match['similarity'] >= self.similarity_threshold:
                    if attribution['origin_version'] is None:  # First match found (earliest version)
                        attribution['origin_version'] = version
                        attribution['origin_index'] = best_match['index']

                    attribution['modification_path'].append({
                        'version': version,
                        'similarity': best_match['similarity'],
                        'sentence': best_match['sentence']
                    })

            # If no origin found, it's new content
            if attribution['origin_version'] is None:
                attribution['origin_version'] = 'new_in_final'

            sentence_attributions.append(attribution)

        return sentence_attributions

    def _quick_lexical_similarity(self, text1, text2):
        """Quick lexical similarity calculation"""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        if not words1 and not words2:
            return 1.0
        if not words1 or not words2:
            return 0.0
        return len(words1.intersection(words2)) / len(words1.union(words2))

    def _quick_semantic_similarity(self, text1, text2):
        """Quick semantic similarity calculation"""
        embeddings = semantic_model.encode([text1, text2])
        return float(cosine_similarity([embeddings[0]], [embeddings[1]])[0][0])

    def calculate_attribution_statistics(self, sentence_attributions):
        """Calculate overall attribution statistics"""
        total_sentences = len(sentence_attributions)

        # Count by origin version
        origin_counts = defaultdict(int)
        for attribution in sentence_attributions:
            origin_counts[attribution['origin_version']] += 1

        # Calculate percentages
        origin_percentages = {}
        for version, count in origin_counts.items():
            origin_percentages[version] = {
                'count': count,
                'percentage': (count / total_sentences) * 100
            }

        # Calculate modification statistics
        modification_stats = {
            'high_similarity': 0,  # >0.8
            'medium_similarity': 0,  # 0.5-0.8
            'low_similarity': 0,  # 0.3-0.5
            'new_content': 0  # <0.3 or new_in_final
        }

        for attribution in sentence_attributions:
            if attribution['origin_version'] == 'new_in_final':
                modification_stats['new_content'] += 1
            else:
                # Get highest similarity score
                max_similarity = max(attribution['similarity_scores'].values()) if attribution['similarity_scores'] else 0

                if max_similarity > 0.8:
                    modification_stats['high_similarity'] += 1
                elif max_similarity > 0.5:
                    modification_stats['medium_similarity'] += 1
                elif max_similarity > 0.3:
                    modification_stats['low_similarity'] += 1
                else:
                    modification_stats['new_content'] += 1

        # Convert to percentages
        modification_percentages = {}
        for category, count in modification_stats.items():
            modification_percentages[category] = {
                'count': count,
                'percentage': (count / total_sentences) * 100
            }

        return {
            'total_sentences': total_sentences,
            'origin_distribution': origin_percentages,
            'modification_distribution': modification_percentages
        }

    def analyze_attribution(self, processed_versions, similarity_results):
        """Perform complete attribution analysis"""
        sentence_attributions = self.trace_sentence_origins(processed_versions, similarity_results)

        if sentence_attributions is None:
            return None

        attribution_statistics = self.calculate_attribution_statistics(sentence_attributions)

        self.attribution_results = {
            'sentence_attributions': sentence_attributions,
            'statistics': attribution_statistics,
            'analysis_timestamp': datetime.now().isoformat(),
            'similarity_threshold': self.similarity_threshold
        }

        # Print summary
        print(f"\n📊 Attribution Summary:")
        print(f"  Total sentences in final: {attribution_statistics['total_sentences']}")

        print(f"\n  Origin Distribution:")
        for version, data in attribution_statistics['origin_distribution'].items():
            print(f"    {version}: {data['count']} sentences ({data['percentage']:.1f}%)")

        print(f"\n  Modification Levels:")
        for category, data in attribution_statistics['modification_distribution'].items():
            print(f"    {category}: {data['count']} sentences ({data['percentage']:.1f}%)")

        return self.attribution_results

print("📍 AttributionMapper class loaded. Ready for attribution analysis.")

📍 AttributionMapper class loaded. Ready for attribution analysis.


In [35]:
# =============================================================================
# CELL 4: COMBINED EXECUTION FUNCTION
# =============================================================================

def run_steps_3_4(article_versions, preprocessor, output_path):
    """Run steps 3-4: Similarity Analysis and Attribution Mapping"""
    print(f"🚀 Starting Steps 3-4 for article: {article_versions.article_name}")

    # Step 3: Similarity Analysis
    similarity_analyzer = SimilarityAnalyzer(semantic_model)
    similarity_results = similarity_analyzer.analyze_all_versions(preprocessor.processed_versions)

    # Step 4: Attribution Mapping
    attribution_mapper = AttributionMapper(similarity_threshold=0.3)
    attribution_results = attribution_mapper.analyze_attribution(
        preprocessor.processed_versions,
        similarity_results
    )

    # Combine results
    combined_results = {
        'article_name': article_versions.article_name,
        'analysis_timestamp': datetime.now().isoformat(),
        'article_metadata': article_versions.metadata,
        'processing_summary': preprocessor.get_processing_summary(),
        'similarity_analysis': similarity_results,
        'attribution_analysis': attribution_results
    }

    # Save comprehensive results
    results_file = f"{output_path}/{article_versions.article_name}_complete_analysis.json"
    with open(results_file, 'w', encoding='utf-8') as f:
        json.dump(combined_results, f, indent=2, ensure_ascii=False)

    print(f"\n💾 Complete analysis saved: {results_file}")

    # Generate summary metrics for article footer
    footer_metrics = generate_footer_metrics(combined_results)

    # Save footer metrics separately
    footer_file = f"{output_path}/{article_versions.article_name}_footer_metrics.json"
    with open(footer_file, 'w', encoding='utf-8') as f:
        json.dump(footer_metrics, f, indent=2, ensure_ascii=False)

    print(f"📊 Footer metrics saved: {footer_file}")

    return combined_results, footer_metrics

def generate_footer_metrics(combined_results):
    """Generate clean metrics for article footer"""

    # Get processing stats
    processing = combined_results['processing_summary']

    # Get attribution stats
    if combined_results['attribution_analysis']:
        attribution = combined_results['attribution_analysis']['statistics']
        origin_dist = attribution['origin_distribution']
        modification_dist = attribution['modification_distribution']
    else:
        origin_dist = {}
        modification_dist = {}

    # Get similarity stats (draft to final)
    draft_to_final = combined_results['similarity_analysis']['draft_to_final']
    overall_similarity = draft_to_final['full_text']['combined'] if draft_to_final else 0

    footer_metrics = {
        'article_name': combined_results['article_name'],
        'word_progression': {
            'draft': processing.get('draft', {}).get('word_count', 0),
            'final': processing.get('final', {}).get('word_count', 0),
            'change_percentage': 0
        },
        'content_retention': {
            'overall_similarity': round(overall_similarity * 100, 1),
            'content_origins': {}
        },
        'modification_summary': {},
        'generated_at': datetime.now().isoformat()
    }

    # Calculate word change percentage
    if footer_metrics['word_progression']['draft'] > 0:
        draft_words = footer_metrics['word_progression']['draft']
        final_words = footer_metrics['word_progression']['final']
        change = ((final_words - draft_words) / draft_words) * 100
        footer_metrics['word_progression']['change_percentage'] = round(change, 1)

    # Simplify origin distribution for footer
    for version, data in origin_dist.items():
        if version != 'new_in_final':
            footer_metrics['content_retention']['content_origins'][version] = round(data['percentage'], 1)

    # Simplify modification distribution
    for category, data in modification_dist.items():
        clean_category = category.replace('_', ' ').title()
        footer_metrics['modification_summary'][clean_category] = round(data['percentage'], 1)

    return footer_metrics

print("🎯 Execution functions loaded. Ready to run complete analysis!")
print("\nTo run the complete analysis:")
print("combined_results, footer_metrics = run_steps_3_4(article_versions, preprocessor, 'your_output_path')")

🎯 Execution functions loaded. Ready to run complete analysis!

To run the complete analysis:
combined_results, footer_metrics = run_steps_3_4(article_versions, preprocessor, 'your_output_path')


In [36]:
# =============================================================================
# CELL 5: QUICK EXECUTION FOR EXISTING DATA
# =============================================================================

def run_complete_analysis_from_existing(article_versions, preprocessor):
    """Run steps 3-4 using the output path from existing data"""
    # Find where the Step 1-2 checkpoint was actually saved
    base_path = article_versions.input_path

    # Check for the nested output structure that was created in Steps 1-2
    nested_output_path = os.path.join(base_path, 'output', 'output')
    regular_output_path = os.path.join(base_path, 'output')

    # Use the path where the checkpoint file exists
    checkpoint_file = f"markup-languages_checkpoint_steps_1_2.json"

    if os.path.exists(os.path.join(nested_output_path, checkpoint_file)):
        output_path = nested_output_path
        print(f"📂 Using nested output path: {output_path}")
    elif os.path.exists(os.path.join(regular_output_path, checkpoint_file)):
        output_path = regular_output_path
        print(f"📂 Using regular output path: {output_path}")
    else:
        # Create regular output path as fallback
        output_path = regular_output_path
        os.makedirs(output_path, exist_ok=True)
        print(f"📂 Created output path: {output_path}")

    return run_steps_3_4(article_versions, preprocessor, output_path)

print("⚡ Quick execution function available:")
print("combined_results, footer_metrics = run_complete_analysis_from_existing(article_versions, preprocessor)")

⚡ Quick execution function available:
combined_results, footer_metrics = run_complete_analysis_from_existing(article_versions, preprocessor)


In [37]:
combined_results, footer_metrics = run_complete_analysis_from_existing(article_versions, preprocessor)

📂 Using nested output path: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human/output/output
🚀 Starting Steps 3-4 for article: markup-languages

🔍 Step 3: Similarity Analysis
  🔍 Analyzing draft → refined
    Analyzing 47 vs 60 sentences...
  🔍 Analyzing refined → edited
    Analyzing 60 vs 45 sentences...
  🔍 Analyzing edited → final
    Analyzing 45 vs 42 sentences...
  🔍 Analyzing draft → final (overall change)
  🔍 Analyzing draft → final
    Analyzing 47 vs 42 sentences...

📍 Step 4: Attribution Mapping
  📝 Tracing 42 final sentences...

📊 Attribution Summary:
  Total sentences in final: 42

  Origin Distribution:
    edited: 36 sentences (85.7%)
    draft: 1 sentences (2.4%)
    new_in_final: 5 sentences (11.9%)

  Modification Levels:
    high_similarity: 3 sentences (7.1%)
    medium_similarity: 17 sentences (40.5%)
    low_similarity: 17 sentences (40.5%)
    new_content: 5 sentences (11.9%)

💾 Complete analysis saved: /content/drive/MyDrive/Google Dri

In [56]:
# =============================================================================
# CELL 1: TREND ANALYSIS SETUP
# =============================================================================

import os
import json
import re
import glob
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from collections import defaultdict, Counter

# Archive configuration
ARCHIVE_BASE_PATH = '/content/drive/MyDrive/Google Drive/syntaxandempathy/99-past/'

class TrendAnalyzer:
    """Class to analyze trends across multiple articles over time periods"""

    def __init__(self, archive_path=ARCHIVE_BASE_PATH):
        self.archive_path = archive_path
        self.found_articles = []
        self.period_data = {}
        self.trend_results = {}

    def parse_period(self, period):
        """Parse period string into date patterns"""
        period_patterns = []

        if len(period) == 4:  # Year: "2025"
            # All months in the year: 202501*, 202502*, etc.
            for month in range(1, 13):
                pattern = f"{period}{month:02d}*"
                period_patterns.append(pattern)

        elif len(period) == 7 and period[4] == '-':  # Month: "2025-01"
            year, month = period.split('-')
            pattern = f"{year}{month}*"  # e.g., "202501*"
            period_patterns.append(pattern)

        elif period.endswith(('Q1', 'Q2', 'Q3', 'Q4')):  # Quarter: "2025-Q1"
            year, quarter = period.split('-')
            quarter_months = {
                'Q1': ['01', '02', '03'],
                'Q2': ['04', '05', '06'],
                'Q3': ['07', '08', '09'],
                'Q4': ['10', '11', '12']
            }

            for month in quarter_months[quarter]:
                pattern = f"{year}{month}*"  # e.g., "202501*", "202502*", "202503*"
                period_patterns.append(pattern)
        else:
            raise ValueError(f"Invalid period format: {period}. Use 'YYYY', 'YYYY-MM', or 'YYYY-Q#'")

        return period_patterns

    def find_articles_for_period(self, period):
        """Find all articles matching the period"""
        print(f"🔍 Searching for articles in period: {period}")
        print(f"📂 Archive path: {self.archive_path}")

        if not os.path.exists(self.archive_path):
            print(f"❌ Archive path does not exist: {self.archive_path}")
            return []

        patterns = self.parse_period(period)
        found_articles = []

        # Get all directories in archive
        all_dirs = [d for d in os.listdir(self.archive_path)
                   if os.path.isdir(os.path.join(self.archive_path, d))]

        print(f"📁 Found {len(all_dirs)} directories in archive")

        # Match directories against patterns
        for pattern in patterns:
            # Convert shell pattern to regex: 202506* becomes ^202506.*
            pattern_regex = f"^{pattern.replace('*', '.*')}"
            regex = re.compile(pattern_regex)

            for dir_name in all_dirs:
                if regex.match(dir_name):
                    article_path = os.path.join(self.archive_path, dir_name)

                    # Look for analysis file
                    analysis_files = glob.glob(os.path.join(article_path, 'output', '*_complete_analysis.json'))

                    if analysis_files:
                        # Avoid duplicates
                        if not any(item['folder_name'] == dir_name for item in found_articles):
                            found_articles.append({
                                'folder_name': dir_name,
                                'article_path': article_path,
                                'analysis_file': analysis_files[0],
                                'date_prefix': dir_name[:8],  # Extract YYYYMMDD
                                'article_name': dir_name[9:]  # Extract name after date-
                            })
                            print(f"  ✓ Found: {dir_name} (matches {pattern})")
                    else:
                        print(f"  ⚠ No analysis file found in: {dir_name}")

        found_articles.sort(key=lambda x: x['date_prefix'])  # Sort by date
        self.found_articles = found_articles

        print(f"📊 Total articles found for {period}: {len(found_articles)}")
        return found_articles

    def load_article_data(self, article_info):
        """Load analysis data for a single article"""
        try:
            with open(article_info['analysis_file'], 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Add metadata
            data['folder_name'] = article_info['folder_name']
            data['publication_date'] = article_info['date_prefix']

            return data

        except Exception as e:
            print(f"❌ Error loading {article_info['analysis_file']}: {str(e)}")
            return None

    def aggregate_attribution_trends(self, articles_data):
        """Aggregate attribution data across articles"""
        attribution_trends = {
            'by_article': [],
            'averages': {},
            'totals': {},
            'trends_over_time': []
        }

        # Collect data for each article
        for article in articles_data:
            if not article or 'attribution_analysis' not in article:
                continue

            attr_stats = article['attribution_analysis']['statistics']
            origin_dist = attr_stats['origin_distribution']
            mod_dist = attr_stats['modification_distribution']

            article_data = {
                'article_name': article['article_name'],
                'publication_date': article['publication_date'],
                'total_sentences': attr_stats['total_sentences'],
                'origin_percentages': {},
                'modification_percentages': {}
            }

            # Extract origin percentages
            for version, data in origin_dist.items():
                article_data['origin_percentages'][version] = data['percentage']

            # Extract modification percentages
            for category, data in mod_dist.items():
                article_data['modification_percentages'][category] = data['percentage']

            attribution_trends['by_article'].append(article_data)

        # Calculate averages across all articles
        if attribution_trends['by_article']:
            all_origins = set()
            all_modifications = set()

            for article in attribution_trends['by_article']:
                all_origins.update(article['origin_percentages'].keys())
                all_modifications.update(article['modification_percentages'].keys())

            # Average origin percentages
            for origin in all_origins:
                values = [a['origin_percentages'].get(origin, 0) for a in attribution_trends['by_article']]
                attribution_trends['averages'][f'origin_{origin}'] = np.mean(values)

            # Average modification percentages
            for mod in all_modifications:
                values = [a['modification_percentages'].get(mod, 0) for a in attribution_trends['by_article']]
                attribution_trends['averages'][f'modification_{mod}'] = np.mean(values)

        return attribution_trends

    def aggregate_similarity_trends(self, articles_data):
        """Aggregate similarity data across articles"""
        similarity_trends = {
            'draft_to_final': [],
            'sequential_changes': [],
            'averages': {}
        }

        for article in articles_data:
            if not article or 'similarity_analysis' not in article:
                continue

            sim_analysis = article['similarity_analysis']

            # Draft to final similarity
            if sim_analysis['draft_to_final']:
                draft_final = sim_analysis['draft_to_final']['full_text']
                similarity_trends['draft_to_final'].append({
                    'article_name': article['article_name'],
                    'publication_date': article['publication_date'],
                    'lexical_similarity': draft_final['lexical']['lexical_average'],
                    'semantic_similarity': draft_final['semantic']['semantic_similarity'],
                    'combined_similarity': draft_final['combined']
                })

            # Sequential changes
            for seq in sim_analysis['sequential_analysis']:
                similarity_trends['sequential_changes'].append({
                    'article_name': article['article_name'],
                    'publication_date': article['publication_date'],
                    'version_pair': seq['version_pair'],
                    'combined_similarity': seq['full_text']['combined']
                })

        # Calculate averages
        if similarity_trends['draft_to_final']:
            df_sims = similarity_trends['draft_to_final']
            similarity_trends['averages']['draft_to_final'] = {
                'lexical': np.mean([s['lexical_similarity'] for s in df_sims]),
                'semantic': np.mean([s['semantic_similarity'] for s in df_sims]),
                'combined': np.mean([s['combined_similarity'] for s in df_sims])
            }

        return similarity_trends

    def aggregate_word_count_trends(self, articles_data):
        """Aggregate word count progression across articles"""
        word_trends = {
            'by_article': [],
            'averages': {},
            'progression_patterns': []
        }

        for article in articles_data:
            if not article or 'processing_summary' not in article:
                continue

            processing = article['processing_summary']

            article_words = {
                'article_name': article['article_name'],
                'publication_date': article['publication_date'],
                'word_counts': {},
                'progression': []
            }

            # Extract word counts for each version
            version_order = ['draft', 'refined', 'edited', 'final']
            for version in version_order:
                if version in processing:
                    count = processing[version]['word_count']
                    article_words['word_counts'][version] = count
                    article_words['progression'].append(count)

            word_trends['by_article'].append(article_words)

        # Calculate average progressions
        if word_trends['by_article']:
            version_order = ['draft', 'refined', 'edited', 'final']
            for version in version_order:
                counts = [a['word_counts'].get(version, 0) for a in word_trends['by_article'] if version in a['word_counts']]
                if counts:
                    word_trends['averages'][version] = np.mean(counts)

        return word_trends

    def analyze_trends(self, period):
        """Perform complete trend analysis for a period"""
        print(f"\n🚀 Starting trend analysis for period: {period}")

        # Find articles
        articles = self.find_articles_for_period(period)

        if not articles:
            print(f"❌ No articles found for period {period}")
            return None

        # Load article data
        print(f"\n📖 Loading analysis data for {len(articles)} articles...")
        articles_data = []

        for article_info in articles:
            data = self.load_article_data(article_info)
            if data:
                articles_data.append(data)
                print(f"  ✓ Loaded: {article_info['article_name']}")
            else:
                print(f"  ✗ Failed: {article_info['article_name']}")

        print(f"📊 Successfully loaded {len(articles_data)} articles")

        # Perform trend analysis
        print(f"\n🔍 Analyzing trends...")

        attribution_trends = self.aggregate_attribution_trends(articles_data)
        similarity_trends = self.aggregate_similarity_trends(articles_data)
        word_count_trends = self.aggregate_word_count_trends(articles_data)

        # Compile results
        self.trend_results = {
            'period': period,
            'analysis_timestamp': datetime.now().isoformat(),
            'articles_analyzed': len(articles_data),
            'article_list': [{'name': a['article_name'], 'date': a['publication_date']} for a in articles_data],
            'attribution_trends': attribution_trends,
            'similarity_trends': similarity_trends,
            'word_count_trends': word_count_trends,
            'summary': self.generate_trend_summary(attribution_trends, similarity_trends, word_count_trends)
        }

        return self.trend_results

    def generate_trend_summary(self, attribution_trends, similarity_trends, word_count_trends):
        """Generate a summary of key trends"""
        summary = {
            'key_metrics': {},
            'patterns': [],
            'insights': []
        }

        # Key attribution metrics
        if attribution_trends['averages']:
            avg_draft = attribution_trends['averages'].get('origin_draft', 0)
            avg_edited = attribution_trends['averages'].get('origin_edited', 0)
            avg_new = attribution_trends['averages'].get('origin_new_in_final', 0)

            summary['key_metrics']['avg_draft_retention'] = round(avg_draft, 1)
            summary['key_metrics']['avg_edited_dominance'] = round(avg_edited, 1)
            summary['key_metrics']['avg_new_content'] = round(avg_new, 1)

        # Key similarity metrics
        if similarity_trends['averages'].get('draft_to_final'):
            df_sim = similarity_trends['averages']['draft_to_final']
            summary['key_metrics']['avg_draft_final_similarity'] = round(df_sim['combined'] * 100, 1)

        # Word count patterns
        if word_count_trends['averages']:
            avg_draft = word_count_trends['averages'].get('draft', 0)
            avg_final = word_count_trends['averages'].get('final', 0)
            if avg_draft > 0:
                change_pct = ((avg_final - avg_draft) / avg_draft) * 100
                summary['key_metrics']['avg_word_change_pct'] = round(change_pct, 1)

        return summary

    def save_trend_analysis(self, output_path, filename_prefix="trend_analysis"):
        """Save trend analysis results"""
        if not self.trend_results:
            print("❌ No trend results to save")
            return None

        # Create output directory
        os.makedirs(output_path, exist_ok=True)

        # Save comprehensive results
        results_file = os.path.join(output_path, f"{filename_prefix}_{self.trend_results['period']}.json")

        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(self.trend_results, f, indent=2, ensure_ascii=False)

        print(f"💾 Trend analysis saved: {results_file}")

        # Also save a summary report
        summary_file = os.path.join(output_path, f"{filename_prefix}_{self.trend_results['period']}_summary.json")

        summary_data = {
            'period': self.trend_results['period'],
            'articles_count': self.trend_results['articles_analyzed'],
            'key_metrics': self.trend_results['summary']['key_metrics'],
            'article_list': self.trend_results['article_list']
        }

        with open(summary_file, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)

        print(f"📋 Summary report saved: {summary_file}")

        return results_file, summary_file

print("📈 TrendAnalyzer class loaded. Ready for trend analysis!")

📈 TrendAnalyzer class loaded. Ready for trend analysis!


In [58]:
# =============================================================================
# CELL 2: EXECUTION FUNCTIONS
# =============================================================================

def analyze_period_trends(period, output_path=None):
    """Analyze trends for a specific period"""
    print(f"📈 Analyzing trends for period: {period}")

    # Create analyzer
    analyzer = TrendAnalyzer()

    # Run analysis
    results = analyzer.analyze_trends(period)

    if not results:
        return None

    # Print summary
    print(f"\n📊 TREND ANALYSIS SUMMARY for {period}")
    print(f"{'='*50}")

    summary = results['summary']['key_metrics']

    if 'avg_draft_retention' in summary:
        print(f"📝 Average content retention from draft: {summary['avg_draft_retention']}%")

    if 'avg_edited_dominance' in summary:
        print(f"✏️  Average content from editing phase: {summary['avg_edited_dominance']}%")

    if 'avg_new_content' in summary:
        print(f"🆕 Average new content in final: {summary['avg_new_content']}%")

    if 'avg_draft_final_similarity' in summary:
        print(f"🔄 Average draft-to-final similarity: {summary['avg_draft_final_similarity']}%")

    if 'avg_word_change_pct' in summary:
        change = summary['avg_word_change_pct']
        direction = "increase" if change > 0 else "decrease"
        print(f"📏 Average word count {direction}: {abs(change):.1f}%")

    print(f"\n📚 Articles analyzed: {results['articles_analyzed']}")
    for article in results['article_list']:
        print(f"  • {article['date']}: {article['name']}")

    # Save results if output path provided
    if output_path:
        analyzer.save_trend_analysis(output_path)

    return results, analyzer

def quick_trend_check(period):
    """Quick trend check without saving files"""
    results, analyzer = analyze_period_trends(period)
    return results

print("🎯 Execution functions loaded!")
print("\nUsage examples:")
print("# Quick check (no files saved)")
print("results = quick_trend_check('2025-Q1')")
print("\n# Full analysis with file output")
print("results, analyzer = analyze_period_trends('2025-01', '/your/output/path')")
print("\n# Supported period formats:")
print("  • '2025' (full year)")
print("  • '2025-01' (specific month)")
print("  • '2025-Q1' (quarter)")

🎯 Execution functions loaded!

Usage examples:
# Quick check (no files saved)
results = quick_trend_check('2025-Q1')

# Full analysis with file output
results, analyzer = analyze_period_trends('2025-01', '/your/output/path')

# Supported period formats:
  • '2025' (full year)
  • '2025-01' (specific month)
  • '2025-Q1' (quarter)


In [59]:
# =============================================================================
# CELL 3: TESTING AND VALIDATION
# =============================================================================

def test_archive_structure():
    """Test the archive structure and show what's available"""
    print(f"🔍 Testing archive structure...")
    print(f"📂 Archive path: {ARCHIVE_BASE_PATH}")

    if not os.path.exists(ARCHIVE_BASE_PATH):
        print(f"❌ Archive path does not exist!")
        return False

    # List all directories
    all_dirs = [d for d in os.listdir(ARCHIVE_BASE_PATH)
               if os.path.isdir(os.path.join(ARCHIVE_BASE_PATH, d))]

    print(f"📁 Found {len(all_dirs)} directories:")

    # Look for date pattern: YYYYMMDD-*
    date_pattern = re.compile(r'^\d{8}-.*')
    valid_dirs = []

    for dir_name in sorted(all_dirs):
        print(f"  🔍 Checking: '{dir_name}'")

        if date_pattern.match(dir_name):
            # Check for analysis file
            analysis_files = glob.glob(os.path.join(ARCHIVE_BASE_PATH, dir_name, 'output', '*_complete_analysis.json'))
            status = "✓" if analysis_files else "⚠"
            print(f"    {status} MATCHES date pattern (YYYYMMDD-name)")
            if analysis_files:
                valid_dirs.append(dir_name)
                print(f"      Found analysis file: {os.path.basename(analysis_files[0])}")
            else:
                print(f"      No analysis file in output/ folder")
        else:
            print(f"    ✗ Does NOT match YYYYMMDD-name pattern")

    print(f"\n📊 Summary:")
    print(f"  Total directories: {len(all_dirs)}")
    print(f"  Valid date format: {len([d for d in all_dirs if date_pattern.match(d)])}")
    print(f"  With analysis files: {len(valid_dirs)}")

    if valid_dirs:
        print(f"\n🗓️  Date range:")
        dates = [d[:8] for d in valid_dirs]
        print(f"  Earliest: {min(dates)}")
        print(f"  Latest: {max(dates)}")

        # Show available periods
        years = set(d[:4] for d in dates)
        print(f"\n📅 Available periods you can analyze:")
        for year in sorted(years):
            year_dates = [d for d in dates if d.startswith(year)]
            months = set(d[4:6] for d in year_dates)
            quarters = set(f"Q{(int(d[4:6])-1)//3 + 1}" for d in year_dates)
            print(f"  {year}: {len(year_dates)} articles")
            print(f"    Months: {', '.join(sorted(months))}")
            print(f"    Quarters: {', '.join(sorted(quarters))}")

    return len(valid_dirs) > 0

print("🧪 Testing functions loaded!")
print("Run: test_archive_structure() to validate your archive")

🧪 Testing functions loaded!
Run: test_archive_structure() to validate your archive


In [60]:
test_archive_structure()

🔍 Testing archive structure...
📂 Archive path: /content/drive/MyDrive/Google Drive/syntaxandempathy/99-past/
📁 Found 2 directories:
  🔍 Checking: '2024'
    ✗ Does NOT match YYYYMMDD-name pattern
  🔍 Checking: '20250614-ai-vs-human'
    ✓ MATCHES date pattern (YYYYMMDD-name)
      Found analysis file: markup-languages_complete_analysis.json

📊 Summary:
  Total directories: 2
  Valid date format: 1
  With analysis files: 1

🗓️  Date range:
  Earliest: 20250614
  Latest: 20250614

📅 Available periods you can analyze:
  2025: 1 articles
    Months: 06
    Quarters: Q2


True

In [61]:
# Test with the available periods
results = quick_trend_check('2025-06')

📈 Analyzing trends for period: 2025-06

🚀 Starting trend analysis for period: 2025-06
🔍 Searching for articles in period: 2025-06
📂 Archive path: /content/drive/MyDrive/Google Drive/syntaxandempathy/99-past/
📁 Found 2 directories in archive
  ✓ Found: 20250614-ai-vs-human (matches 202506*)
📊 Total articles found for 2025-06: 1

📖 Loading analysis data for 1 articles...
  ✓ Loaded: ai-vs-human
📊 Successfully loaded 1 articles

🔍 Analyzing trends...

📊 TREND ANALYSIS SUMMARY for 2025-06
📝 Average content retention from draft: 2.4%
✏️  Average content from editing phase: 85.7%
🆕 Average new content in final: 11.9%
🔄 Average draft-to-final similarity: 43.1%
📏 Average word count increase: 3.4%

📚 Articles analyzed: 1
  • 20250614: markup-languages


In [68]:
# =============================================================================
# CELL 1: VISUALIZATION SETUP AND DEPENDENCIES
# =============================================================================

# Install required packages
!pip install matplotlib seaborn plotly pandas numpy -q

import matplotlib.pyplot as plt
import matplotlib.patches as patches
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.io as pio
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime
from collections import defaultdict

# Set up plotting styles
plt.style.use('default')
sns.set_palette("husl")
pio.templates.default = "plotly_white"

# Custom color palette for consistency
COLORS = {
    'draft': '#FF6B6B',      # Red
    'refined': '#4ECDC4',    # Teal
    'edited': '#45B7D1',     # Blue
    'final': '#96CEB4',      # Green
    'new_content': '#FECA57', # Yellow
    'high_similarity': '#48CAE4',    # Light blue
    'medium_similarity': '#FFB3BA',  # Light pink
    'low_similarity': '#FFDFBA',     # Light orange
    'background': '#F8F9FA'
}

print("📊 Visualization dependencies loaded successfully!")
print("🎨 Custom color palette configured")

class VisualizationEngine:
    """Unified visualization engine for individual articles and trend analysis"""

    def __init__(self, output_path=None):
        self.output_path = output_path
        self.figures = {}

    def create_content_flow_chart(self, data, title="Content Flow Analysis", mode="individual"):
        """Create a content flow visualization showing version progression"""

        if mode == "individual":
            # Individual article - show attribution percentages
            attribution_stats = data['attribution_analysis']['statistics']
            origin_dist = attribution_stats['origin_distribution']

            # Prepare data for flow chart
            flow_data = []
            for version, stats in origin_dist.items():
                if version != 'new_in_final':
                    flow_data.append({
                        'source': version.title(),
                        'target': 'Final Article',
                        'value': stats['percentage'],
                        'count': stats['count']
                    })

            # Add new content
            if 'new_in_final' in origin_dist:
                flow_data.append({
                    'source': 'New Content',
                    'target': 'Final Article',
                    'value': origin_dist['new_in_final']['percentage'],
                    'count': origin_dist['new_in_final']['count']
                })

        else:
            # Trend analysis - show average percentages
            avg_data = data['attribution_trends']['averages']
            flow_data = []

            for key, value in avg_data.items():
                if key.startswith('origin_') and not key.endswith('new_in_final'):
                    version = key.replace('origin_', '').title()
                    flow_data.append({
                        'source': version,
                        'target': 'Final Articles',
                        'value': value,
                        'count': f"{value:.1f}% avg"
                    })

            # Add new content average
            if 'origin_new_in_final' in avg_data:
                flow_data.append({
                    'source': 'New Content',
                    'target': 'Final Articles',
                    'value': avg_data['origin_new_in_final'],
                    'count': f"{avg_data['origin_new_in_final']:.1f}% avg"
                })

        # Create sankey-style visualization using matplotlib
        fig, ax = plt.subplots(figsize=(12, 8))
        ax.set_xlim(0, 10)
        ax.set_ylim(0, 10)

        # Draw flow connections
        y_positions = np.linspace(8, 2, len(flow_data))

        for i, item in enumerate(flow_data):
            # Source box
            source_color = COLORS.get(item['source'].lower(), '#CCCCCC')
            source_rect = patches.Rectangle((0.5, y_positions[i]-0.3), 2, 0.6,
                                          facecolor=source_color, alpha=0.7, edgecolor='black')
            ax.add_patch(source_rect)
            ax.text(1.5, y_positions[i], item['source'], ha='center', va='center', fontweight='bold')

            # Flow arrow
            arrow_width = item['value'] / 100 * 0.4  # Scale arrow width by percentage
            arrow = patches.FancyArrowPatch((2.5, y_positions[i]), (6.5, 5),
                                          arrowstyle='->', mutation_scale=20,
                                          linewidth=arrow_width*10, alpha=0.6,
                                          color=source_color)
            ax.add_patch(arrow)

            # Percentage label
            ax.text(4.5, y_positions[i]+0.2, f"{item['value']:.1f}%",
                   ha='center', va='center', fontsize=10, fontweight='bold')

        # Target box
        target_rect = patches.Rectangle((7, 4.5), 2, 1,
                                      facecolor=COLORS['final'], alpha=0.7, edgecolor='black')
        ax.add_patch(target_rect)
        target_text = 'Final Article' if mode == 'individual' else 'Final Articles'
        ax.text(8, 5, target_text, ha='center', va='center', fontweight='bold')

        ax.set_title(title, fontsize=16, fontweight='bold', pad=20)
        ax.axis('off')

        return fig

    def create_modification_intensity_chart(self, data, title="Content Modification Intensity", mode="individual"):
        """Create a chart showing modification intensity levels"""

        if mode == "individual":
            mod_dist = data['attribution_analysis']['statistics']['modification_distribution']
            categories = list(mod_dist.keys())
            values = [mod_dist[cat]['percentage'] for cat in categories]
        else:
            # Trend analysis
            avg_data = data['attribution_trends']['averages']
            categories = []
            values = []
            for key, value in avg_data.items():
                if key.startswith('modification_'):
                    cat = key.replace('modification_', '').replace('_', ' ').title()
                    categories.append(cat)
                    values.append(value)

        # Create horizontal bar chart
        fig, ax = plt.subplots(figsize=(10, 6))

        # Color mapping for modification levels
        mod_colors = [COLORS.get(cat.lower().replace(' ', '_'), '#CCCCCC') for cat in categories]

        bars = ax.barh(categories, values, color=mod_colors, alpha=0.8, edgecolor='black', linewidth=1)

        # Add percentage labels on bars
        for bar, value in zip(bars, values):
            width = bar.get_width()
            ax.text(width + 1, bar.get_y() + bar.get_height()/2,
                   f'{value:.1f}%', ha='left', va='center', fontweight='bold')

        ax.set_xlabel('Percentage of Content', fontsize=12, fontweight='bold')
        ax.set_title(title, fontsize=14, fontweight='bold', pad=20)
        ax.set_xlim(0, max(values) * 1.2)

        # Add grid for readability
        ax.grid(axis='x', alpha=0.3, linestyle='--')
        ax.set_axisbelow(True)

        return fig

    def create_word_count_progression(self, data, title="Word Count Progression", mode="individual"):
        """Create a line chart showing word count changes across versions"""

        if mode == "individual":
            processing = data['processing_summary']
            versions = ['draft', 'refined', 'edited', 'final']
            word_counts = []
            version_labels = []

            for version in versions:
                if version in processing:
                    word_counts.append(processing[version]['word_count'])
                    version_labels.append(version.title())
        else:
            # Trend analysis - show averages
            avg_data = data['word_count_trends']['averages']
            versions = ['draft', 'refined', 'edited', 'final']
            word_counts = []
            version_labels = []

            for version in versions:
                if version in avg_data:
                    word_counts.append(avg_data[version])
                    version_labels.append(version.title())

        # Create line chart
        fig, ax = plt.subplots(figsize=(10, 6))

        line = ax.plot(version_labels, word_counts, marker='o', linewidth=3,
                      markersize=8, color=COLORS['draft'], markerfacecolor=COLORS['final'])

        # Add value labels on points
        for i, (label, count) in enumerate(zip(version_labels, word_counts)):
            ax.annotate(f'{int(count)}', (i, count), textcoords="offset points",
                       xytext=(0,10), ha='center', fontweight='bold')

        ax.set_ylabel('Word Count', fontsize=12, fontweight='bold')
        ax.set_title(title, fontsize=14, fontweight='bold', pad=20)
        ax.grid(True, alpha=0.3, linestyle='--')
        ax.set_axisbelow(True)

        # Calculate and show percentage change
        if len(word_counts) >= 2:
            change = ((word_counts[-1] - word_counts[0]) / word_counts[0]) * 100
            change_text = f"Overall change: {change:+.1f}%"
            ax.text(0.02, 0.98, change_text, transform=ax.transAxes,
                   fontsize=11, fontweight='bold', verticalalignment='top',
                   bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

        return fig

    def create_similarity_heatmap(self, data, title="Version Similarity Matrix", mode="individual"):
        """Create a heatmap showing similarities between versions"""

        if mode == "individual":
            similarity_data = data['similarity_analysis']['sequential_analysis']

            # Build similarity matrix
            versions = ['Draft', 'Refined', 'Edited', 'Final']
            matrix = np.zeros((len(versions), len(versions)))
            np.fill_diagonal(matrix, 1.0)  # Perfect similarity with self

            # Fill in the sequential similarities
            for seq in similarity_data:
                pair = seq['version_pair']
                similarity = seq['full_text']['combined']

                # Parse version pair (e.g., "draft_to_refined")
                from_version, to_version = pair.split('_to_')
                from_idx = versions.index(from_version.title())
                to_idx = versions.index(to_version.title())

                matrix[from_idx][to_idx] = similarity
                matrix[to_idx][from_idx] = similarity  # Make symmetric

        else:
            # For trend analysis, show average similarities
            similarity_trends = data['similarity_trends']

            # Create a simplified matrix for trends
            versions = ['Draft', 'Final']
            matrix = np.array([[1.0, 0.0], [0.0, 1.0]])

            if similarity_trends['averages'].get('draft_to_final'):
                avg_sim = similarity_trends['averages']['draft_to_final']['combined']
                matrix[0][1] = avg_sim
                matrix[1][0] = avg_sim

        # Create heatmap
        fig, ax = plt.subplots(figsize=(8, 6))

        im = ax.imshow(matrix, cmap='RdYlGn', aspect='equal', vmin=0, vmax=1)

        # Add text annotations
        for i in range(len(versions)):
            for j in range(len(versions)):
                text = ax.text(j, i, f'{matrix[i, j]:.2f}',
                             ha="center", va="center", color="black", fontweight='bold')

        ax.set_xticks(range(len(versions)))
        ax.set_yticks(range(len(versions)))
        ax.set_xticklabels(versions)
        ax.set_yticklabels(versions)
        ax.set_title(title, fontsize=14, fontweight='bold', pad=20)

        # Add colorbar
        cbar = plt.colorbar(im, ax=ax, shrink=0.8)
        cbar.set_label('Similarity Score', rotation=270, labelpad=20, fontweight='bold')

        return fig

    def create_summary_dashboard(self, data, title_prefix="Article Analysis", mode="individual"):
        """Create a comprehensive dashboard with multiple visualizations"""

        # Create subplots
        fig = plt.figure(figsize=(16, 12))

        # Title
        main_title = f"{title_prefix} Dashboard"
        if mode == "individual":
            main_title += f" - {data['article_name']}"

        fig.suptitle(main_title, fontsize=18, fontweight='bold', y=0.95)

        # Create individual charts and save them
        charts = {}

        # Content Flow Chart
        charts['flow'] = self.create_content_flow_chart(data, "Content Attribution", mode)

        # Modification Intensity
        charts['modification'] = self.create_modification_intensity_chart(data, "Modification Intensity", mode)

        # Word Count Progression
        charts['word_count'] = self.create_word_count_progression(data, "Word Count Evolution", mode)

        # Similarity Heatmap
        charts['similarity'] = self.create_similarity_heatmap(data, "Version Similarities", mode)

        return charts

    def save_visualizations(self, charts, prefix="analysis", formats=['png', 'svg']):
        """Save all visualizations in multiple formats"""

        if not self.output_path:
            print("⚠️ No output path specified - visualizations not saved")
            return {}

        os.makedirs(self.output_path, exist_ok=True)
        saved_files = {}

        for chart_name, fig in charts.items():
            for fmt in formats:
                filename = f"{prefix}_{chart_name}.{fmt}"
                filepath = os.path.join(self.output_path, filename)

                fig.savefig(filepath, dpi=300, bbox_inches='tight',
                           facecolor='white', edgecolor='none')

                if chart_name not in saved_files:
                    saved_files[chart_name] = []
                saved_files[chart_name].append(filepath)

        # Close figures to free memory
        for fig in charts.values():
            plt.close(fig)

        print(f"💾 Visualizations saved to: {self.output_path}")
        for chart_name, files in saved_files.items():
            print(f"  📊 {chart_name}: {len(files)} formats")

        return saved_files

print("🎨 VisualizationEngine class loaded successfully!")

📊 Visualization dependencies loaded successfully!
🎨 Custom color palette configured
🎨 VisualizationEngine class loaded successfully!


In [69]:
# =============================================================================
# CELL 2: INTERACTIVE VISUALIZATION FUNCTIONS
# =============================================================================

def create_interactive_flow_chart(data, mode="individual"):
    """Create an interactive Plotly flow/sankey diagram"""

    if mode == "individual":
        attribution_stats = data['attribution_analysis']['statistics']
        origin_dist = attribution_stats['origin_distribution']

        # Prepare data for Sankey diagram
        sources = []
        targets = []
        values = []
        labels = []

        # Add all source versions
        for version in ['draft', 'refined', 'edited']:
            if version in origin_dist:
                labels.append(version.title())

        # Add new content and final
        if 'new_in_final' in origin_dist:
            labels.append('New Content')
        labels.append('Final Article')

        # Create connections
        final_idx = len(labels) - 1

        for version, stats in origin_dist.items():
            if version != 'new_in_final':
                source_idx = labels.index(version.title())
                sources.append(source_idx)
                targets.append(final_idx)
                values.append(stats['percentage'])

        # Add new content if exists
        if 'new_in_final' in origin_dist:
            new_idx = labels.index('New Content')
            sources.append(new_idx)
            targets.append(final_idx)
            values.append(origin_dist['new_in_final']['percentage'])

    else:
        # Trend analysis mode
        avg_data = data['attribution_trends']['averages']
        labels = []
        sources = []
        targets = []
        values = []

        # Build labels and connections for trend data
        for key, value in avg_data.items():
            if key.startswith('origin_') and value > 0:
                version = key.replace('origin_', '')
                if version != 'new_in_final':
                    labels.append(version.title())
                else:
                    labels.append('New Content')

        labels.append('Final Articles')
        final_idx = len(labels) - 1

        # Create connections
        i = 0
        for key, value in avg_data.items():
            if key.startswith('origin_') and value > 0:
                sources.append(i)
                targets.append(final_idx)
                values.append(value)
                i += 1

    # Create Sankey diagram
    fig = go.Figure(data=[go.Sankey(
        node=dict(
            pad=15,
            thickness=20,
            line=dict(color="black", width=0.5),
            label=labels,
            color=[COLORS.get(label.lower().replace(' ', '_'), '#CCCCCC') for label in labels]
        ),
        link=dict(
            source=sources,
            target=targets,
            value=values,
            color=['rgba(255,107,107,0.4)' if i < len(sources) else 'rgba(150,206,180,0.4)'
                   for i in range(len(sources))]
        )
    )])

    title = "Content Flow Analysis"
    if mode == "individual":
        title += f" - {data['article_name']}"

    fig.update_layout(
        title_text=title,
        font_size=12,
        height=600
    )

    return fig

def create_interactive_trends_chart(trend_data):
    """Create interactive trend charts for multiple articles over time"""

    # Extract data for trends over time
    attribution_by_article = trend_data['attribution_trends']['by_article']

    if not attribution_by_article:
        print("No trend data available")
        return None

    # Prepare data for plotting
    dates = []
    draft_retention = []
    edited_dominance = []
    new_content = []
    article_names = []

    for article in attribution_by_article:
        dates.append(article['publication_date'])
        article_names.append(article['article_name'])

        # Extract percentages
        draft_retention.append(article['origin_percentages'].get('draft', 0))
        edited_dominance.append(article['origin_percentages'].get('edited', 0))
        new_content.append(article['origin_percentages'].get('new_in_final', 0))

    # Create multi-line chart
    fig = go.Figure()

    # Add traces for each metric
    fig.add_trace(go.Scatter(
        x=dates, y=draft_retention,
        mode='lines+markers',
        name='Draft Retention %',
        line=dict(color=COLORS['draft'], width=3),
        marker=dict(size=8),
        hovertemplate='<b>%{text}</b><br>Date: %{x}<br>Draft Retention: %{y:.1f}%<extra></extra>',
        text=article_names
    ))

    fig.add_trace(go.Scatter(
        x=dates, y=edited_dominance,
        mode='lines+markers',
        name='Edited Content %',
        line=dict(color=COLORS['edited'], width=3),
        marker=dict(size=8),
        hovertemplate='<b>%{text}</b><br>Date: %{x}<br>Edited Content: %{y:.1f}%<extra></extra>',
        text=article_names
    ))

    fig.add_trace(go.Scatter(
        x=dates, y=new_content,
        mode='lines+markers',
        name='New Content %',
        line=dict(color=COLORS['new_content'], width=3),
        marker=dict(size=8),
        hovertemplate='<b>%{text}</b><br>Date: %{x}<br>New Content: %{y:.1f}%<extra></extra>',
        text=article_names
    ))

    fig.update_layout(
        title='Content Attribution Trends Over Time',
        xaxis_title='Publication Date',
        yaxis_title='Percentage of Final Content',
        hovermode='x unified',
        height=600,
        showlegend=True
    )

    return fig

print("🚀 Interactive visualization functions loaded!")

🚀 Interactive visualization functions loaded!


In [70]:
# =============================================================================
# CELL 3: EXECUTION FUNCTIONS
# =============================================================================

def visualize_individual_article(analysis_data, output_path=None, save_files=True):
    """Create visualizations for a single article analysis"""

    print(f"🎨 Creating visualizations for: {analysis_data['article_name']}")

    # Initialize visualization engine
    viz_engine = VisualizationEngine(output_path)

    # Create static charts
    print("📊 Generating static visualizations...")
    charts = viz_engine.create_summary_dashboard(analysis_data, mode="individual")

    # Create interactive chart
    print("🔧 Creating interactive flow chart...")
    interactive_flow = create_interactive_flow_chart(analysis_data, mode="individual")

    # Save files if requested
    saved_files = {}
    if save_files and output_path:
        saved_files = viz_engine.save_visualizations(
            charts,
            prefix=f"{analysis_data['article_name']}_analysis",
            formats=['png', 'svg']
        )

        # Save interactive chart
        interactive_file = os.path.join(output_path, f"{analysis_data['article_name']}_interactive_flow.html")
        interactive_flow.write_html(interactive_file)
        saved_files['interactive_flow'] = [interactive_file]
        print(f"💾 Interactive chart saved: {interactive_file}")

    # Display interactive chart
    interactive_flow.show()

    return charts, interactive_flow, saved_files

def visualize_trend_analysis(trend_data, output_path=None, save_files=True):
    """Create visualizations for trend analysis"""

    period = trend_data['period']
    print(f"📈 Creating trend visualizations for period: {period}")

    # Initialize visualization engine
    viz_engine = VisualizationEngine(output_path)

    # Create static charts
    print("📊 Generating static trend visualizations...")
    charts = viz_engine.create_summary_dashboard(trend_data, f"Trend Analysis - {period}", mode="trend")

    # Create interactive charts
    print("🔧 Creating interactive trend charts...")
    interactive_flow = create_interactive_flow_chart(trend_data, mode="trend")
    interactive_trends = create_interactive_trends_chart(trend_data)

    # Save files if requested
    saved_files = {}
    if save_files and output_path:
        saved_files = viz_engine.save_visualizations(
            charts,
            prefix=f"trend_analysis_{period}",
            formats=['png', 'svg']
        )

        # Save interactive charts
        if output_path:
            flow_file = os.path.join(output_path, f"trend_flow_{period}.html")
            interactive_flow.write_html(flow_file)
            saved_files['interactive_flow'] = [flow_file]

            if interactive_trends:
                trends_file = os.path.join(output_path, f"trend_timeline_{period}.html")
                interactive_trends.write_html(trends_file)
                saved_files['interactive_trends'] = [trends_file]
                print(f"💾 Interactive charts saved to: {output_path}")

    # Display interactive charts
    interactive_flow.show()
    if interactive_trends:
        interactive_trends.show()

    return charts, {'flow': interactive_flow, 'trends': interactive_trends}, saved_files

def quick_visualize(data, data_type="individual", output_path=None):
    """Quick visualization function for any analysis data"""

    if data_type == "individual":
        return visualize_individual_article(data, output_path, save_files=bool(output_path))
    elif data_type == "trend":
        return visualize_trend_analysis(data, output_path, save_files=bool(output_path))
    else:
        print(f"❌ Unknown data type: {data_type}")
        return None

print("🎯 Execution functions loaded!")
print("\nUsage examples:")
print("# Visualize individual article (using existing analysis data)")
print("charts, interactive, files = visualize_individual_article(combined_results, '/your/output/path')")
print("\n# Visualize trend analysis")
print("charts, interactive, files = visualize_trend_analysis(trend_results, '/your/output/path')")
print("\n# Quick visualization without saving")
print("quick_visualize(combined_results, 'individual')")

🎯 Execution functions loaded!

Usage examples:
# Visualize individual article (using existing analysis data)
charts, interactive, files = visualize_individual_article(combined_results, '/your/output/path')

# Visualize trend analysis
charts, interactive, files = visualize_trend_analysis(trend_results, '/your/output/path')

# Quick visualization without saving
quick_visualize(combined_results, 'individual')


In [71]:
# =============================================================================
# CELL 4: FOOTER METRICS GENERATOR
# =============================================================================

def generate_article_footer_graphics(analysis_data, output_path=None):
    """Generate clean, publication-ready graphics for article footers"""

    print(f"📄 Generating footer graphics for: {analysis_data['article_name']}")

    # Create a clean, minimal style for footer graphics
    plt.style.use('default')

    # Footer-specific color palette (more muted)
    footer_colors = {
        'primary': '#2E86AB',
        'secondary': '#A23B72',
        'accent': '#F18F01',
        'neutral': '#C73E1D'
    }

    # Create a compact summary visualization
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8))
    fig.suptitle(f'Content Analysis Summary - {analysis_data["article_name"]}',
                 fontsize=14, fontweight='bold')

    # 1. Content Origins (Pie Chart)
    attribution_stats = analysis_data['attribution_analysis']['statistics']
    origin_dist = attribution_stats['origin_distribution']

    labels = []
    sizes = []
    colors = []

    for version, stats in origin_dist.items():
        if version != 'new_in_final':
            labels.append(f"{version.title()}\n{stats['percentage']:.1f}%")
            sizes.append(stats['percentage'])
            colors.append(footer_colors['primary'])

    if 'new_in_final' in origin_dist:
        labels.append(f"New Content\n{origin_dist['new_in_final']['percentage']:.1f}%")
        sizes.append(origin_dist['new_in_final']['percentage'])
        colors.append(footer_colors['accent'])

    ax1.pie(sizes, labels=labels, colors=colors, autopct='', startangle=90)
    ax1.set_title('Content Origins', fontweight='bold')

    # 2. Word Count Progression (Bar Chart)
    processing = analysis_data['processing_summary']
    versions = ['Draft', 'Refined', 'Edited', 'Final']
    word_counts = []

    for version in ['draft', 'refined', 'edited', 'final']:
        if version in processing:
            word_counts.append(processing[version]['word_count'])
        else:
            word_counts.append(0)

    bars = ax2.bar(versions, word_counts, color=footer_colors['secondary'], alpha=0.7)
    ax2.set_title('Word Count Evolution', fontweight='bold')
    ax2.set_ylabel('Words')

    # Add value labels on bars
    for bar, count in zip(bars, word_counts):
        if count > 0:
            height = bar.get_height()
            ax2.text(bar.get_x() + bar.get_width()/2., height + 5,
                    f'{int(count)}', ha='center', va='bottom', fontsize=9)

    # 3. Modification Intensity (Horizontal Bar)
    mod_dist = attribution_stats['modification_distribution']
    mod_categories = []
    mod_values = []

    for category, stats in mod_dist.items():
        clean_cat = category.replace('_', ' ').title()
        mod_categories.append(clean_cat)
        mod_values.append(stats['percentage'])

    bars = ax3.barh(mod_categories, mod_values, color=footer_colors['neutral'], alpha=0.7)
    ax3.set_title('Modification Levels', fontweight='bold')
    ax3.set_xlabel('Percentage')

    # Add percentage labels
    for bar, value in zip(bars, mod_values):
        width = bar.get_width()
        ax3.text(width + 1, bar.get_y() + bar.get_height()/2.,
                f'{value:.1f}%', ha='left', va='center', fontsize=9)

    # 4. Key Metrics Summary (Text)
    ax4.axis('off')

    # Calculate key metrics
    draft_to_final = analysis_data['similarity_analysis']['draft_to_final']
    similarity_pct = draft_to_final['full_text']['combined'] * 100 if draft_to_final else 0

    word_change = 0
    if 'draft' in processing and 'final' in processing:
        draft_words = processing['draft']['word_count']
        final_words = processing['final']['word_count']
        word_change = ((final_words - draft_words) / draft_words) * 100

    # Display key metrics as text
    metrics_text = f"""Key Metrics:

Draft-Final Similarity: {similarity_pct:.1f}%

Word Count Change: {word_change:+.1f}%

Total Sentences: {attribution_stats['total_sentences']}

Analysis Date: {datetime.now().strftime('%Y-%m-%d')}"""

    ax4.text(0.1, 0.9, metrics_text, transform=ax4.transAxes,
             fontsize=11, verticalalignment='top', fontweight='bold',
             bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgray', alpha=0.3))

    ax4.set_title('Summary Statistics', fontweight='bold')

    plt.tight_layout()

    # Save footer graphic
    footer_files = {}
    if output_path:
        os.makedirs(output_path, exist_ok=True)

        # Save in multiple formats for footer use
        for fmt in ['png', 'svg', 'pdf']:
            footer_file = os.path.join(output_path, f"{analysis_data['article_name']}_footer.{fmt}")
            fig.savefig(footer_file, dpi=300, bbox_inches='tight',
                       facecolor='white', edgecolor='none')
            footer_files[fmt] = footer_file

        print(f"📄 Footer graphics saved: {len(footer_files)} formats")

    return fig, footer_files

def create_minimal_attribution_chart(analysis_data, output_path=None):
    """Create a minimal, clean chart suitable for article footers"""

    # Very simple pie chart for content attribution
    fig, ax = plt.subplots(figsize=(6, 4))

    attribution_stats = analysis_data['attribution_analysis']['statistics']
    origin_dist = attribution_stats['origin_distribution']

    # Aggregate into simple categories
    ai_generated = 0  # draft + refined
    human_edited = 0  # edited
    new_content = 0   # new_in_final

    for version, stats in origin_dist.items():
        if version in ['draft', 'refined']:
            ai_generated += stats['percentage']
        elif version == 'edited':
            human_edited = stats['percentage']
        elif version == 'new_in_final':
            new_content = stats['percentage']

    # Create simple pie chart
    labels = []
    sizes = []
    colors = ['#FF6B6B', '#4ECDC4', '#FECA57']

    if ai_generated > 0:
        labels.append(f'AI Generated\n{ai_generated:.1f}%')
        sizes.append(ai_generated)

    if human_edited > 0:
        labels.append(f'Human Edited\n{human_edited:.1f}%')
        sizes.append(human_edited)

    if new_content > 0:
        labels.append(f'New Content\n{new_content:.1f}%')
        sizes.append(new_content)

    pie_result = ax.pie(sizes, labels=labels, colors=colors[:len(sizes)],
                       autopct='', startangle=90, textprops={'fontsize': 10})

    # ax.pie returns different number of values based on parameters
    # We only need the wedges for our purposes
    wedges = pie_result[0] if isinstance(pie_result, tuple) else pie_result

    ax.set_title(f'Content Attribution - {analysis_data["article_name"]}',
                fontsize=12, fontweight='bold', pad=10)

    # Save minimal chart
    minimal_files = {}
    if output_path:
        os.makedirs(output_path, exist_ok=True)

        for fmt in ['png', 'svg']:
            minimal_file = os.path.join(output_path, f"{analysis_data['article_name']}_minimal.{fmt}")
            fig.savefig(minimal_file, dpi=300, bbox_inches='tight',
                       facecolor='white', edgecolor='none')
            minimal_files[fmt] = minimal_file

        print(f"📊 Minimal chart saved: {len(minimal_files)} formats")

    return fig, minimal_files

print("📄 Footer graphics functions loaded!")

📄 Footer graphics functions loaded!


In [74]:
# =============================================================================
# CELL 5: COMPLETE VISUALIZATION WORKFLOW
# =============================================================================

def create_complete_visualization_suite(analysis_data, output_path=None, data_type="individual"):
    """Create a complete suite of visualizations for any analysis data"""

    print(f"🎨 Creating complete visualization suite...")
    print(f"📊 Data type: {data_type}")

    if not output_path:
        print("⚠️ No output path provided - visualizations will not be saved")

    all_outputs = {
        'static_charts': {},
        'interactive_charts': {},
        'footer_graphics': {},
        'saved_files': {}
    }

    # Create main visualizations
    if data_type == "individual":
        # Individual article visualizations
        charts, interactive, files = visualize_individual_article(
            analysis_data, output_path, save_files=bool(output_path)
        )

        all_outputs['static_charts'] = charts
        all_outputs['interactive_charts'] = {'flow': interactive}
        all_outputs['saved_files'].update(files)

        # Create footer graphics
        if output_path:
            footer_fig, footer_files = generate_article_footer_graphics(analysis_data, output_path)
            minimal_fig, minimal_files = create_minimal_attribution_chart(analysis_data, output_path)

            all_outputs['footer_graphics']['full'] = footer_fig
            all_outputs['footer_graphics']['minimal'] = minimal_fig
            all_outputs['saved_files']['footer'] = footer_files
            all_outputs['saved_files']['minimal'] = minimal_files

            # Close footer figures
            plt.close(footer_fig)
            plt.close(minimal_fig)

    elif data_type == "trend":
        # Trend analysis visualizations
        charts, interactive, files = visualize_trend_analysis(
            analysis_data, output_path, save_files=bool(output_path)
        )

        all_outputs['static_charts'] = charts
        all_outputs['interactive_charts'] = interactive
        all_outputs['saved_files'].update(files)

    print(f"✅ Visualization suite complete!")
    if output_path:
        print(f"📁 All files saved to: {output_path}")

    return all_outputs

def visualize_from_existing_data(analysis_data_or_file, output_path=None, data_type="auto"):
    """Load and visualize from existing analysis data or file"""

    # Handle file input
    if isinstance(analysis_data_or_file, str):
        print(f"📖 Loading analysis data from: {analysis_data_or_file}")
        with open(analysis_data_or_file, 'r', encoding='utf-8') as f:
            analysis_data = json.load(f)
    else:
        analysis_data = analysis_data_or_file

    # Auto-detect data type if not specified
    if data_type == "auto":
        if 'article_name' in analysis_data and 'attribution_analysis' in analysis_data:
            data_type = "individual"
        elif 'period' in analysis_data and 'attribution_trends' in analysis_data:
            data_type = "trend"
        else:
            print("❌ Could not auto-detect data type")
            return None

    print(f"🔍 Detected data type: {data_type}")

    # Create visualizations
    return create_complete_visualization_suite(analysis_data, output_path, data_type)

# Quick access functions
def viz_article(analysis_data, output_path=None):
    """Quick function to visualize individual article"""
    # Use the same output path structure as the analysis if not provided
    if output_path is None and 'article_metadata' in analysis_data:
        # Try to derive output path from the analysis data
        input_path = analysis_data['article_metadata'].get('input_path')
        if input_path:
            output_path = os.path.join(input_path, 'output')

    return create_complete_visualization_suite(analysis_data, output_path, "individual")

def viz_trends(trend_data, output_path=None):
    """Quick function to visualize trend analysis"""
    return create_complete_visualization_suite(trend_data, output_path, "trend")

print("🚀 Complete visualization workflow loaded!")
print("\n" + "="*60)
print("📊 VISUALIZATION SYSTEM READY!")
print("="*60)
print("\nQuick usage:")
print("# Visualize your existing analysis data")
print("viz_outputs = viz_article(combined_results, '/your/output/path')")
print("\n# Visualize trend analysis")
print("trend_viz = viz_trends(trend_results, '/your/output/path')")
print("\n# Create footer graphics only")
print("footer_fig, files = generate_article_footer_graphics(combined_results, '/path')")
print("\nAll visualizations include:")
print("  📈 Static charts (PNG, SVG)")
print("  🔧 Interactive charts (HTML)")
print("  📄 Footer graphics (publication-ready)")
print("  💾 Multiple file formats")

🚀 Complete visualization workflow loaded!

📊 VISUALIZATION SYSTEM READY!

Quick usage:
# Visualize your existing analysis data
viz_outputs = viz_article(combined_results, '/your/output/path')

# Visualize trend analysis
trend_viz = viz_trends(trend_results, '/your/output/path')

# Create footer graphics only
footer_fig, files = generate_article_footer_graphics(combined_results, '/path')

All visualizations include:
  📈 Static charts (PNG, SVG)
  🔧 Interactive charts (HTML)
  📄 Footer graphics (publication-ready)
  💾 Multiple file formats


In [75]:
viz_outputs = viz_article(combined_results)

🎨 Creating complete visualization suite...
📊 Data type: individual
🎨 Creating visualizations for: markup-languages
📊 Generating static visualizations...
🔧 Creating interactive flow chart...
💾 Visualizations saved to: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human/output
  📊 flow: 2 formats
  📊 modification: 2 formats
  📊 word_count: 2 formats
  📊 similarity: 2 formats
💾 Interactive chart saved: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human/output/markup-languages_interactive_flow.html


📄 Generating footer graphics for: markup-languages
📄 Footer graphics saved: 3 formats
📊 Minimal chart saved: 2 formats
✅ Visualization suite complete!
📁 All files saved to: /content/drive/MyDrive/Google Drive/syntaxandempathy/30-articles/ai-vs-human/output


<Figure size 1600x1200 with 0 Axes>