<a href="https://colab.research.google.com/github/tanatet8/Colab_Script/blob/main/metadata/Metadata_Extractor_To_Excel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================
# METADATA EXTRACTOR TO EXCEL
# Extract metadata จากทุก prompt → Excel สำหรับ LLM & Analysis
# รองรับ 300-500K prompts
# ============================================

# ============================================
# Block 1: Setup & Import
# ============================================
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import numpy as np
from pathlib import Path
import re
import json
import hashlib
from datetime import datetime
from collections import Counter, defaultdict
import warnings
warnings.filterwarnings('ignore')

print("✅ Libraries loaded")

In [None]:
# ============================================
# Block 2: Configuration
# ============================================
class Config:
    # Paths
    DATASET_DIR = '/content/drive/MyDrive/Dataset_Curation'
    OUTPUT_EXCEL = '/content/drive/MyDrive/Dataset_Curation/metadata_master.xlsx'

    # File patterns
    FILE_PATTERN = '*_FIXED_COMPLETE.md'  # หรือ '*_batch_*.md'

    # Processing
    MAX_PROMPTS = None  # None = all, or set limit

    # Metadata fields to extract (20+ fields for scalability)
    METADATA_FIELDS = [
        'prompt_id', 'batch_id', 'reasoning_type', 'sub_type',
        'difficulty', 'tier', 'model_size', 'language', 'domain_context',
        'contains_statistics', 'has_numerical_estimate', 'requires_visualization',
        'symbolic_risk', 'contains_fallacy_risk', 'confidence_level_expected',
        'is_behavior_driven', 'concept_tags', 'fallacy', 'fallacy_type',
        'chain_depth', 'tone_style', 'self_critique', 'belief_tracking',
        'eval_standard', 'reasoning_path_trace'
    ]

    # Additional tracking fields
    TRACKING_FIELDS = [
        'prompt_hash', 'file_source', 'extraction_date',
        'prompt_length_th', 'prompt_length_en', 'has_reasoning',
        'has_rejected', 'has_explanation', 'quality_score',
        'combination_key', 'saturation_level'
    ]

print(f"📁 Dataset: {Config.DATASET_DIR}")
print(f"📊 Output: {Config.OUTPUT_EXCEL}")

In [None]:
# ============================================
# Block 3: Metadata Extractor
# ============================================
class MetadataExtractor:

    @staticmethod
    def generate_hash(text):
        """Generate unique hash for prompt"""
        return hashlib.md5(text.encode()).hexdigest()[:12]

    @staticmethod
    def parse_block(block_text, file_name, prompt_num):
        """Extract all metadata from a prompt block"""
        data = {
            'file_source': file_name,
            'extraction_date': datetime.now().strftime('%Y-%m-%d'),
            'prompt_num': prompt_num
        }

        # Extract Metadata section
        meta_match = re.search(r'###\s*Metadata\s*\n(.*?)(?=\n###|\n##|$)',
                              block_text, re.DOTALL)
        if meta_match:
            metadata_text = meta_match.group(1)
            # Parse each metadata field
            for line in metadata_text.split('\n'):
                if ':' in line:
                    key, value = line.split(':', 1)
                    key = key.strip()
                    value = value.strip()

                    # Special handling for different types
                    if key == 'concept_tags':
                        # Clean concept tags format
                        value = value.strip('[]')
                        data[key] = value
                        # Also create individual tag columns
                        tags = [t.strip(' "\'') for t in value.split(',')]
                        data['tag_count'] = len(tags)
                        data['tag_1'] = tags[0] if len(tags) > 0 else ''
                        data['tag_2'] = tags[1] if len(tags) > 1 else ''
                        data['tag_3'] = tags[2] if len(tags) > 2 else ''
                    else:
                        data[key] = value

        # Extract prompts content
        for lang in ['TH', 'EN', 'ZH']:
            pattern = rf'###?\s*Prompt\s*\({lang}\)\s*\n(.*?)(?=\n###|\n##|$)'
            match = re.search(pattern, block_text, re.DOTALL)
            if match:
                prompt_text = match.group(1).strip()
                data[f'prompt_{lang.lower()}'] = prompt_text[:100]  # First 100 chars for preview
                data[f'prompt_length_{lang.lower()}'] = len(prompt_text)

                # Generate hash from first available prompt
                if 'prompt_hash' not in data and prompt_text:
                    data['prompt_hash'] = MetadataExtractor.generate_hash(prompt_text)

        # Check for other sections
        data['has_reasoning'] = 'Y' if '### Reasoning' in block_text else 'N'
        data['has_rejected'] = 'Y' if '### Rejected Reasoning' in block_text else 'N'
        data['has_explanation'] = 'Y' if '### Explanation' in block_text else 'N'

        # Create combination key for tracking
        reasoning_type = data.get('reasoning_type', 'unknown')
        sub_type = data.get('sub_type', 'unknown')
        domain = data.get('domain_context', 'unknown')
        data['combination_key'] = f"{reasoning_type}|{sub_type}|{domain}"

        # Default quality score (can be updated manually later)
        data['quality_score'] = 'pending'

        return data

    @staticmethod
    def load_all_files(dataset_dir, file_pattern, max_prompts=None):
        """Load and extract metadata from all files"""
        all_metadata = []
        md_files = sorted(Path(dataset_dir).glob(file_pattern))

        print(f"📂 Found {len(md_files)} files matching pattern: {file_pattern}")

        total_prompts = 0
        for file_path in md_files:
            print(f"  📄 Processing: {file_path.name}")

            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Split by prompt blocks
            blocks = re.split(r'##\s*Prompt\s+(\d+)', content)[1:]

            # Process pairs of (number, content)
            for i in range(0, len(blocks), 2):
                if i+1 < len(blocks):
                    prompt_num = blocks[i]
                    block_content = blocks[i+1]

                    if max_prompts and total_prompts >= max_prompts:
                        break

                    metadata = MetadataExtractor.parse_block(
                        f"## Prompt {prompt_num}\n{block_content}",
                        file_path.name,
                        int(prompt_num)
                    )
                    all_metadata.append(metadata)
                    total_prompts += 1

            if max_prompts and total_prompts >= max_prompts:
                print(f"  ⚠️ Reached max prompts limit: {max_prompts}")
                break

        print(f"✅ Extracted metadata from {total_prompts} prompts")
        return pd.DataFrame(all_metadata)

In [None]:
# ============================================
# Block 4: Analysis & Statistics
# ============================================
class MetadataAnalyzer:

    @staticmethod
    def calculate_saturation(df):
        """Calculate saturation level for each combination"""
        # Count combinations
        combo_counts = df['combination_key'].value_counts()

        # Define saturation levels
        def get_saturation(count):
            if count >= 10:
                return 'HIGH'
            elif count >= 5:
                return 'MEDIUM'
            else:
                return 'LOW'

        # Apply saturation level
        df['saturation_level'] = df['combination_key'].apply(
            lambda x: get_saturation(combo_counts.get(x, 0))
        )

        return df

    @staticmethod
    def generate_statistics(df):
        """Generate comprehensive statistics"""
        stats = {
            'Total Prompts': len(df),
            'Unique Files': df['file_source'].nunique() if 'file_source' in df.columns else 0,
            'Reasoning Types': df['reasoning_type'].nunique() if 'reasoning_type' in df.columns else 0,
            'Sub Types': df['sub_type'].nunique() if 'sub_type' in df.columns else 0,
            'Domains': df['domain_context'].nunique() if 'domain_context' in df.columns else 0,
            'Languages': df['language'].nunique() if 'language' in df.columns else 0,
            'Avg Chain Depth': df['chain_depth'].apply(lambda x: int(x) if pd.notna(x) and str(x).isdigit() else 0).mean()
        }

        # Difficulty distribution
        if 'difficulty' in df.columns:
            diff_dist = df['difficulty'].value_counts()
            stats['Easy %'] = diff_dist.get('easy', 0) / len(df) * 100
            stats['Medium %'] = diff_dist.get('medium', 0) / len(df) * 100
            stats['Hard %'] = diff_dist.get('hard', 0) / len(df) * 100

        # Tier distribution
        if 'tier' in df.columns:
            tier_dist = df['tier'].value_counts()
            for tier in range(1, 7):
                stats[f'Tier {tier} Count'] = tier_dist.get(str(tier), 0)

        return stats

    @staticmethod
    def find_gaps(df):
        """Find gaps in coverage"""
        gaps = []

        # Check reasoning_type × difficulty
        if 'reasoning_type' in df.columns and 'difficulty' in df.columns:
            for rtype in df['reasoning_type'].unique():
                for diff in ['easy', 'medium', 'hard']:
                    count = len(df[(df['reasoning_type'] == rtype) &
                                  (df['difficulty'] == diff)])
                    if count < 5:  # Threshold
                        gaps.append({
                            'Gap Type': 'Type×Difficulty',
                            'Combination': f"{rtype} + {diff}",
                            'Current Count': count,
                            'Recommended': 5,
                            'Priority': 'High' if count == 0 else 'Medium'
                        })

        # Check domain × sub_type
        if 'domain_context' in df.columns and 'sub_type' in df.columns:
            for domain in df['domain_context'].unique():
                for subtype in df['sub_type'].unique():
                    count = len(df[(df['domain_context'] == domain) &
                                  (df['sub_type'] == subtype)])
                    if count == 0:  # Only show complete gaps
                        gaps.append({
                            'Gap Type': 'Domain×SubType',
                            'Combination': f"{domain} + {subtype}",
                            'Current Count': 0,
                            'Recommended': 3,
                            'Priority': 'Low'
                        })

        return pd.DataFrame(gaps[:50])  # Top 50 gaps

In [None]:
# ============================================
# Block 5: LLM Instructions Generator
# ============================================
class LLMInstructions:

    @staticmethod
    def generate_instructions(df, stats_dict):
        """Generate instructions for LLM to avoid duplicates"""

        instructions = []

        # Header
        instructions.append("="*60)
        instructions.append("LLM DATASET CREATION GUIDELINES")
        instructions.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        instructions.append("="*60)

        # Current Status
        instructions.append("\n📊 CURRENT DATASET STATUS:")
        instructions.append(f"• Total Prompts: {stats_dict['Total Prompts']}")
        instructions.append(f"• Reasoning Types: {stats_dict['Reasoning Types']}")
        instructions.append(f"• Domains: {stats_dict['Domains']}")
        instructions.append(f"• Difficulty: Easy {stats_dict.get('Easy %', 0):.1f}%, "
                          f"Medium {stats_dict.get('Medium %', 0):.1f}%, "
                          f"Hard {stats_dict.get('Hard %', 0):.1f}%")

        # Oversaturated Combinations
        instructions.append("\n🚫 AVOID CREATING (Oversaturated):")
        combo_counts = df['combination_key'].value_counts()
        for combo, count in combo_counts.head(10).items():
            if count >= 10:
                parts = combo.split('|')
                instructions.append(f"  ❌ {parts[0]} + {parts[1]} + {parts[2]} (has {count} already)")

        # Recommended to Create
        instructions.append("\n✅ PRIORITIZE CREATING (Gaps):")

        # Find missing combinations
        if 'reasoning_type' in df.columns and 'domain_context' in df.columns:
            existing_combos = set(df['combination_key'].unique())

            # Sample of what's missing
            priority_domains = ['technology', 'economics', 'social_science', 'philosophy']
            priority_subtypes = ['reverse_causality', 'feedback_loop', 'hidden_variable', 'temporal_lag']

            for domain in priority_domains:
                for subtype in priority_subtypes:
                    test_combo = f"causal_reasoning|{subtype}|{domain}"
                    if test_combo not in existing_combos:
                        instructions.append(f"  ✅ causal_reasoning + {subtype} + {domain}")

        # Balance Recommendations
        instructions.append("\n⚖️ BALANCE RECOMMENDATIONS:")
        instructions.append("• Target Difficulty: Easy 25%, Medium 50%, Hard 25%")
        instructions.append("• Target Tiers: Tier 1-2 (40%), Tier 3-4 (35%), Tier 5-6 (25%)")
        instructions.append("• Each sub_type should have 5-10 prompts")
        instructions.append("• Each domain should have 10-20 prompts")

        # Concept Tags to Explore
        instructions.append("\n🏷️ UNDERUSED CONCEPT TAGS TO EXPLORE:")
        if 'concept_tags' in df.columns:
            all_tags = []
            for tags in df['concept_tags'].dropna():
                all_tags.extend([t.strip(' "\'[]') for t in str(tags).split(',')])
            tag_counts = Counter(all_tags)

            # Find underused tags
            underused = [tag for tag, count in tag_counts.items() if count < 3]
            if underused:
                instructions.append(f"  • Consider using: {', '.join(underused[:10])}")

        # Quality Guidelines
        instructions.append("\n📝 QUALITY GUIDELINES:")
        instructions.append("• Include (TH), (EN), (ZH) versions for all prompts")
        instructions.append("• Provide reasoning with clear logical steps")
        instructions.append("• Include rejected reasoning with fallacy identification")
        instructions.append("• Add explanation comparing correct vs incorrect")
        instructions.append("• Maintain chain_depth of 2-4 for complexity")

        return '\n'.join(instructions)

In [None]:
# ============================================
# Block 6: Excel Export
# ============================================
class ExcelExporter:

    @staticmethod
    def export_to_excel(df, stats_dict, gaps_df, llm_instructions, output_path):
        """Export everything to Excel with multiple sheets"""

        # Calculate saturation levels
        df = MetadataAnalyzer.calculate_saturation(df)

        # Create Excel writer
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:

            # Sheet 1: All Metadata
            df.to_excel(writer, sheet_name='All_Metadata', index=False)

            # Sheet 2: Summary Statistics
            stats_df = pd.DataFrame([stats_dict]).T.reset_index()
            stats_df.columns = ['Metric', 'Value']
            stats_df.to_excel(writer, sheet_name='Summary_Stats', index=False)

            # Sheet 3: Distribution Analysis
            dist_data = []
            for col in ['reasoning_type', 'sub_type', 'domain_context', 'difficulty', 'tier']:
                if col in df.columns:
                    value_counts = df[col].value_counts()
                    for val, count in value_counts.items():
                        dist_data.append({
                            'Category': col,
                            'Value': val,
                            'Count': count,
                            'Percentage': count / len(df) * 100,
                            'Status': 'High' if count > 20 else 'OK' if count > 5 else 'Low'
                        })
            pd.DataFrame(dist_data).to_excel(writer, sheet_name='Distribution', index=False)

            # Sheet 4: Combination Matrix
            if 'reasoning_type' in df.columns and 'sub_type' in df.columns:
                pivot_table = pd.crosstab(df['reasoning_type'], df['sub_type'])
                pivot_table.to_excel(writer, sheet_name='Type_x_SubType_Matrix')

            # Sheet 5: Domain Coverage
            if 'domain_context' in df.columns and 'sub_type' in df.columns:
                domain_pivot = pd.crosstab(df['domain_context'], df['sub_type'])
                domain_pivot.to_excel(writer, sheet_name='Domain_x_SubType_Matrix')

            # Sheet 6: Gaps Analysis
            if not gaps_df.empty:
                gaps_df.to_excel(writer, sheet_name='Coverage_Gaps', index=False)

            # Sheet 7: LLM Instructions
            instructions_df = pd.DataFrame({'Instructions': llm_instructions.split('\n')})
            instructions_df.to_excel(writer, sheet_name='LLM_Instructions', index=False)

            # Sheet 8: High Saturation List
            high_sat = df[df['saturation_level'] == 'HIGH'][
                ['prompt_id', 'combination_key', 'saturation_level']
            ].drop_duplicates()
            high_sat.to_excel(writer, sheet_name='High_Saturation', index=False)

            # Sheet 9: Tag Analysis
            if 'concept_tags' in df.columns:
                all_tags = []
                for tags in df['concept_tags'].dropna():
                    for tag in str(tags).split(','):
                        all_tags.append(tag.strip(' "\'[]'))

                tag_counts = Counter(all_tags)
                tag_df = pd.DataFrame(tag_counts.items(), columns=['Tag', 'Count'])
                tag_df = tag_df.sort_values('Count', ascending=False)
                tag_df['Status'] = tag_df['Count'].apply(
                    lambda x: 'Overused' if x > 10 else 'OK' if x > 3 else 'Underused'
                )
                tag_df.to_excel(writer, sheet_name='Tag_Analysis', index=False)

        print(f"✅ Excel exported to: {output_path}")

In [None]:
# ============================================
# Block 7: Main Pipeline
# ============================================
def main():
    """Main extraction and analysis pipeline"""

    print("="*60)
    print("📊 METADATA EXTRACTOR TO EXCEL")
    print("="*60)

    # Step 1: Extract metadata
    print("\n📂 Step 1: Extracting metadata from files...")
    df = MetadataExtractor.load_all_files(
        Config.DATASET_DIR,
        Config.FILE_PATTERN,
        Config.MAX_PROMPTS
    )

    if df.empty:
        print("❌ No data extracted!")
        return None

    print(f"✅ Extracted {len(df)} prompts with metadata")

    # Step 2: Generate statistics
    print("\n📈 Step 2: Generating statistics...")
    stats_dict = MetadataAnalyzer.generate_statistics(df)

    # Print summary
    print("\n📊 Summary:")
    for key, value in stats_dict.items():
        if isinstance(value, float):
            print(f"  • {key}: {value:.2f}")
        else:
            print(f"  • {key}: {value}")

    # Step 3: Find gaps
    print("\n🔍 Step 3: Finding coverage gaps...")
    gaps_df = MetadataAnalyzer.find_gaps(df)
    print(f"  Found {len(gaps_df)} gaps in coverage")

    # Step 4: Generate LLM instructions
    print("\n📝 Step 4: Generating LLM instructions...")
    llm_instructions = LLMInstructions.generate_instructions(df, stats_dict)

    # Step 5: Export to Excel
    print("\n💾 Step 5: Exporting to Excel...")
    ExcelExporter.export_to_excel(
        df, stats_dict, gaps_df, llm_instructions,
        Config.OUTPUT_EXCEL
    )

    # Summary
    print("\n" + "="*60)
    print("✅ EXTRACTION COMPLETE!")
    print("="*60)
    print(f"\n📁 Output file: {Config.OUTPUT_EXCEL}")
    print("\n📊 Excel contains:")
    print("  1. All_Metadata - Full dataset with 20+ fields")
    print("  2. Summary_Stats - Key metrics")
    print("  3. Distribution - Count analysis")
    print("  4. Type_x_SubType_Matrix - Coverage matrix")
    print("  5. Domain_x_SubType_Matrix - Domain coverage")
    print("  6. Coverage_Gaps - Missing combinations")
    print("  7. LLM_Instructions - Guidelines for new prompts")
    print("  8. High_Saturation - Oversaturated combinations")
    print("  9. Tag_Analysis - Concept tag usage")

    print("\n💡 Next steps:")
    print("  1. Open Excel file")
    print("  2. Copy data to your template")
    print("  3. Share with LLM for prompt generation")
    print("  4. Use pivot tables for deeper analysis")

    return df

In [None]:
# ============================================
# Block 8: Run Pipeline
# ============================================
if __name__ == "__main__":
    df_metadata = main()

    # Optional: Quick preview
    if df_metadata is not None and not df_metadata.empty:
        print("\n👀 Preview first 5 rows:")
        print(df_metadata[['prompt_id', 'reasoning_type', 'sub_type',
                          'domain_context', 'difficulty']].head())