<a href="https://colab.research.google.com/github/purusottamjena/CPPUtest/blob/main/Dataset_cleaning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from huggingface_hub import notebook_login
%pip install --upgrade datasets pandas numpy


# Authenticate with Hugging Face Hub
notebook_login()

In [None]:
from IPython import get_ipython
from IPython.display import display
# %%
from huggingface_hub import notebook_login
# %pip install --upgrade datasets pandas numpy


# Authenticate with Hugging Face Hub
notebook_login()
# %%


# Embedded Unit Test Dataset Cleaning Script for Google Colab
# This script cleans a Hugging Face dataset of embedded unit tests.
# NOTE: the loader below requests "athrv/Embedded3", not "athrv/Embedded_unit_test".

# Install required packages
# !pip install --upgrade datasets pandas numpy  # Upgrade datasets to the latest version
%pip install --upgrade datasets==2.14.6 pandas numpy scipy
from IPython import get_ipython
from IPython.display import display
# Embedded Unit Test Dataset Cleaning Script for Google Colab
# This script cleans a Hugging Face dataset of embedded unit tests.
# NOTE: the loader below requests "athrv/Embedded3", not "athrv/Embedded_unit_test".

# # Install required packages
# !pip install datasets pandas numpy

import pandas as pd
import numpy as np
import re
from datasets import load_dataset
from typing import Optional, List, Dict
import matplotlib.pyplot as plt
import seaborn as sns

class EmbeddedDatasetCleaner:
    """
    A comprehensive cleaner for the Embedded Unit Test dataset.

    Typical use is ``load_dataset()`` followed by ``clean_dataset()``
    (aggressive pipeline) or ``clean_dataset_gentle()`` (keeps more rows).
    Row counts after each step are recorded in ``cleaning_stats`` and the
    result of the last pipeline run is kept in ``cleaned_data``.
    """

    # Cell values that stand for "no content", in addition to real nulls.
    PLACEHOLDER_VALUES = ('Not Found', 'null', '')

    # Columns holding production code and unit-test code.
    CODE_COLUMNS = ('Code1', 'Unit Test (.cpp file)')

    # Columns used together for empty-row and duplicate detection.
    IDENTITY_COLUMNS = ('Code1', 'Unit Test (.cpp file)', 'Base File Name')

    # Replacement text written over overly long CMakeLists entries.
    SIMPLIFIED_CMAKE = (
        "cmake_minimum_required(VERSION 3.16)\n"
        "project(embedded_project)\n"
        "enable_testing()\n"
        "add_executable(tests test_sources)\n"
        "target_link_libraries(tests)"
    )

    # Single-pass scanner for C-like source. String/char literals and
    # "//" comments are matched (and later kept verbatim) so that a "/*"
    # appearing inside them is never mistaken for the start of a block comment.
    _SOURCE_TOKEN = re.compile(
        r'"(?:\\.|[^"\\\n])*"'      # string literal
        r"|'(?:\\.|[^'\\\n])*'"     # character literal
        r'|//[^\n]*'                # line comment (kept as-is by the scanner)
        r'|/\*.*?\*/',              # block comment (removed)
        re.DOTALL,
    )

    def __init__(self):
        self.original_data = None   # DataFrame as loaded from the hub
        self.cleaned_data = None    # DataFrame produced by the last pipeline run
        self.cleaning_stats = {
            'original_rows': 0,
            'rows_after_null_removal': 0,
            'rows_after_minimal_content_removal': 0,
            'rows_after_duplicate_removal': 0,
            'comments_removed': 0,
            'final_rows': 0
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _has_content(self, column: pd.Series, min_length: int = 0) -> pd.Series:
        """
        Return a boolean Series that is True where ``column`` holds a real value.

        A cell counts as content when it is not null, is not one of
        ``PLACEHOLDER_VALUES`` and, if ``min_length`` is positive, its
        stripped string form is longer than ``min_length`` characters.
        """
        mask = column.notna() & ~column.isin(list(self.PLACEHOLDER_VALUES))
        if min_length > 0:
            mask = mask & (column.astype(str).str.strip().str.len() > min_length)
        return mask

    def _keep_rows_with_any(self, df: pd.DataFrame, columns, cell_mask) -> pd.DataFrame:
        """
        Return a copy of ``df`` with only the rows where at least one of
        ``columns`` satisfies ``cell_mask`` (a function Series -> bool Series).

        If none of ``columns`` exist in ``df`` nothing can be judged, so all
        rows are kept and a warning is printed instead of wiping the frame.
        """
        present = [col for col in columns if col in df.columns]
        if not present:
            print(f"   Warning: none of the columns {list(columns)} found; no rows removed.")
            return df.copy()

        # Positional mask, so duplicate index labels cannot drop extra rows.
        keep = np.zeros(len(df), dtype=bool)
        for col in present:
            keep |= cell_mask(df[col]).to_numpy(dtype=bool)
        return df.loc[keep].copy()

    def _content_lengths(self, column: pd.Series) -> pd.Series:
        """Return the character length of every real (non-placeholder) cell."""
        values = column[self._has_content(column)]
        return values.astype(str).str.len()

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_dataset(self, dataset_name: str = "athrv/Embedded3", split: str = "train"):
        """
        Load the dataset from Hugging Face.

        Args:
            dataset_name: Hub identifier passed to ``datasets.load_dataset``.
            split: Name of the split converted to a DataFrame.

        Returns:
            A copy of the loaded DataFrame; the original is kept in
            ``self.original_data`` and its row count in ``cleaning_stats``.
        """
        print("Loading dataset from Hugging Face...")
        dataset = load_dataset(dataset_name)
        self.original_data = dataset[split].to_pandas()
        self.cleaning_stats['original_rows'] = len(self.original_data)
        print(f"✅ Dataset loaded: {len(self.original_data)} rows")
        return self.original_data.copy()

    # ------------------------------------------------------------------
    # Cell-level cleaning
    # ------------------------------------------------------------------
    def remove_comments_from_code(self, code_text: str) -> str:
        """
        Remove comments from C-like code while preserving string literals.

        Removed: whole-line ``//`` comments, ``/* ... */`` blocks that are not
        inside a string/char literal, trailing ``//NOSONAR`` markers, and
        blank lines. Trailing ``//`` comments after code are left alone (they
        may be part of URLs in non-C content such as CMake files).

        Non-string values and the placeholders "Not Found"/"null" are
        returned unchanged.
        """
        if not isinstance(code_text, str) or code_text in ("Not Found", "null"):
            return code_text

        # Whole-line // comments.
        code_text = re.sub(r'^\s*//.*$', '', code_text, flags=re.MULTILINE)

        # Block comments; literals and line comments pass through untouched.
        def _drop_block_comments(match):
            token = match.group(0)
            return '' if token.startswith('/*') else token

        code_text = self._SOURCE_TOKEN.sub(_drop_block_comments, code_text)

        kept_lines = []
        for line in code_text.split('\n'):
            # Strip the marker only; any code in front of it is real code.
            marker_at = line.find('//NOSONAR')
            if marker_at != -1:
                line = line[:marker_at].rstrip()

            stripped = line.strip()
            # Blank lines and leftover comment-only lines are dropped. Lines
            # starting with '*' are NOT dropped: after block-comment removal
            # they are pointer dereferences, not comment continuations.
            if not stripped or stripped.startswith(('//', '/*')):
                continue
            kept_lines.append(line)

        return '\n'.join(kept_lines).strip()

    def is_minimal_content(self, code_text: str, min_code_chars: int = 20) -> bool:
        """
        Check if code content is minimal/empty and should be removed.

        Content is minimal when, after removing comments and preprocessor
        lines (``#include``, ``#pragma once``, ...), fewer than
        ``min_code_chars`` non-whitespace characters remain. Nulls,
        non-strings and placeholder values are always minimal.
        """
        if not isinstance(code_text, str) or code_text in ("Not Found", "null"):
            return True

        without_comments = self.remove_comments_from_code(code_text)
        if not without_comments:
            return True

        # Preprocessor directives are dropped line by line, *before* any
        # whitespace collapsing, so that code following them is still counted.
        code_lines = [
            line for line in without_comments.split('\n')
            if not line.lstrip().startswith('#')
        ]
        code_chars = re.sub(r'\s+', '', ''.join(code_lines))

        return len(code_chars) < min_code_chars

    # ------------------------------------------------------------------
    # Row-level filters
    # ------------------------------------------------------------------
    def remove_null_and_not_found(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Remove rows where ALL key code columns are null, 'Not Found', 'null'
        or empty. Returns a new DataFrame.
        """
        print("🧹 Removing rows with null/not found content...")

        initial_count = len(df)
        df = self._keep_rows_with_any(df, self.CODE_COLUMNS, self._has_content)

        final_count = len(df)
        removed = initial_count - final_count
        self.cleaning_stats['rows_after_null_removal'] = final_count

        print(f"   Removed {removed} rows with no meaningful content")
        print(f"   Remaining: {final_count} rows")
        return df

    def clean_dataset_gentle(self) -> pd.DataFrame:
        """
        Gentle cleaning pipeline that preserves more data.

        Starts from ``cleaned_data`` when a previous run exists, otherwise
        from a copy of ``original_data`` (loading it first if needed).
        """
        print("\n🚀 Starting GENTLE dataset cleaning pipeline...")
        print("=" * 50)

        if self.cleaned_data is not None:
            df = self.cleaned_data
        else:
            if self.original_data is None:
                self.load_dataset()
            df = self.original_data.copy()

        # Data assigned directly (without load_dataset) has no recorded size.
        if self.cleaning_stats['original_rows'] == 0:
            baseline = self.original_data if self.original_data is not None else df
            self.cleaning_stats['original_rows'] = len(baseline)

        # Step 1: Only remove completely empty rows
        df = self.remove_completely_empty_rows(df)

        # Step 2: Clean comments but preserve structure
        df = self.clean_code_content(df)

        # Step 3: Only remove exact duplicates
        df = self.remove_exact_duplicates(df)

        self.cleaned_data = df
        self.cleaning_stats['final_rows'] = len(df)

        print("\n✅ Gentle cleaning completed!")
        self.print_cleaning_summary()

        return df

    def remove_completely_empty_rows(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Only remove rows that are completely empty in all important columns.

        A cell counts as non-empty when it is a real value whose stripped
        text is longer than 5 characters. Returns a new DataFrame.
        """
        print("🧹 Removing completely empty rows...")

        initial_count = len(df)
        df = self._keep_rows_with_any(
            df,
            self.IDENTITY_COLUMNS,
            lambda column: self._has_content(column, min_length=5),
        )

        final_count = len(df)
        removed = initial_count - final_count
        # The gentle pipeline has no separate minimal-content step.
        self.cleaning_stats['rows_after_null_removal'] = final_count
        self.cleaning_stats['rows_after_minimal_content_removal'] = final_count

        print(f"   Removed {removed} completely empty rows")
        print(f"   Remaining: {final_count} rows")

        return df

    def remove_exact_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Remove only exact duplicates (same raw values in the identity
        columns), keeping the first occurrence.
        """
        print("🧹 Removing exact duplicates...")

        initial_count = len(df)

        key_cols = [col for col in self.IDENTITY_COLUMNS if col in df.columns]
        if key_cols:
            df = df.drop_duplicates(subset=key_cols, keep='first')

        final_count = len(df)
        removed = initial_count - final_count
        self.cleaning_stats['rows_after_duplicate_removal'] = final_count

        print(f"   Removed {removed} exact duplicate rows")
        print(f"   Remaining: {final_count} rows")

        return df

    def remove_minimal_content_files(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Remove rows whose code columns all hold only minimal content
        (like just #pragma once). Returns a new DataFrame.
        """
        print("🧹 Removing files with minimal content...")

        initial_count = len(df)
        # is_minimal_content already treats nulls and placeholders as minimal.
        df = self._keep_rows_with_any(
            df,
            self.CODE_COLUMNS,
            lambda column: ~column.map(self.is_minimal_content).astype(bool),
        )

        final_count = len(df)
        removed = initial_count - final_count
        self.cleaning_stats['rows_after_minimal_content_removal'] = final_count

        print(f"   Removed {removed} rows with minimal content")
        print(f"   Remaining: {final_count} rows")

        return df

    def clean_code_content(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean comments from all code columns.

        The columns of ``df`` are overwritten in place and ``df`` is returned.
        Only C-style comments are recognised, so ``#`` comments in the
        CMakeLists column are left as they are.
        """
        print("🧹 Removing comments from code content...")

        code_columns = ['Code1', 'Unit Test (.cpp file)', 'CMakeLists']
        comment_count = 0

        for col in code_columns:
            if col not in df.columns:
                print(f"   Warning: Code column '{col}' not found for cleaning.")
                continue

            print(f"   Cleaning comments from '{col}'...")
            original_lengths = df[col].astype(str).str.len().sum()
            df[col] = df[col].apply(self.remove_comments_from_code)
            cleaned_lengths = df[col].astype(str).str.len().sum()
            removed = original_lengths - cleaned_lengths
            comment_count += removed
            print(f"   Removed {removed:,} characters from '{col}'")

        self.cleaning_stats['comments_removed'] = comment_count
        print(f"   Total removed: {comment_count:,} characters of comments")

        return df

    def remove_duplicate_cmake_content(self, df: pd.DataFrame, max_length: int = 500) -> pd.DataFrame:
        """
        Replace overly long CMakeLists content with a simplified version.

        Every 'CMakeLists' string longer than ``max_length`` characters is
        overwritten (in place) with ``SIMPLIFIED_CMAKE``; ``df`` is returned.
        """
        print("🧹 Cleaning duplicate CMakeLists content...")

        if 'CMakeLists' not in df.columns:
            return df

        cmake_counts = df['CMakeLists'].value_counts()
        if len(cmake_counts) > 0:
            count = cmake_counts.iloc[0]
            print(f"   Most common CMakeLists content appears {count} times")

            # Non-string cells (nulls, numbers) are never "too long".
            lengths = df['CMakeLists'].map(
                lambda value: len(value) if isinstance(value, str) else 0
            )
            df.loc[lengths > max_length, 'CMakeLists'] = self.SIMPLIFIED_CMAKE

        return df

    def remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Remove duplicate rows based on code content.

        Rows are compared on the string form of the identity columns (so a
        null and the text "nan" compare equal); the first occurrence is kept.
        The input frame is not modified.
        """
        print("🧹 Removing duplicate rows...")

        initial_count = len(df)

        key_cols = [col for col in self.IDENTITY_COLUMNS if col in df.columns]
        if key_cols:
            # Compare the values themselves rather than hash(): hashes can
            # collide and would silently drop distinct rows.
            is_duplicate = df[key_cols].astype(str).duplicated(keep='first')
            df = df.loc[~is_duplicate.to_numpy()].copy()

        final_count = len(df)
        removed = initial_count - final_count
        self.cleaning_stats['rows_after_duplicate_removal'] = final_count

        print(f"   Removed {removed} duplicate rows")
        print(f"   Remaining: {final_count} rows")

        return df

    # ------------------------------------------------------------------
    # Pipelines and reporting
    # ------------------------------------------------------------------
    def clean_dataset(self) -> pd.DataFrame:
        """
        Main (aggressive) cleaning pipeline. Always reloads the dataset.
        """
        print("🚀 Starting dataset cleaning pipeline...")
        print("=" * 50)

        # Load dataset
        df = self.load_dataset()

        # Step 1: Remove null/not found entries
        df = self.remove_null_and_not_found(df)

        # Step 2: Remove minimal content files
        df = self.remove_minimal_content_files(df)

        # Step 3: Clean code content (remove comments)
        df = self.clean_code_content(df)

        # Step 4: Clean CMakeLists duplicates
        df = self.remove_duplicate_cmake_content(df)

        # Step 5: Remove duplicates
        df = self.remove_duplicates(df)

        self.cleaned_data = df
        self.cleaning_stats['final_rows'] = len(df)

        print("\n✅ Cleaning completed!")
        self.print_cleaning_summary()

        return df

    def print_cleaning_summary(self):
        """
        Print a summary of the cleaning process.
        """
        stats = self.cleaning_stats
        original_rows = stats['original_rows']
        rows_removed = original_rows - stats['final_rows']
        # An empty or never-loaded dataset has nothing to reduce.
        reduction_pct = (rows_removed / original_rows) * 100 if original_rows else 0.0

        print("\n📊 CLEANING SUMMARY")
        print("=" * 50)
        print(f"Original rows:              {original_rows:,}")
        print(f"After null removal:         {stats['rows_after_null_removal']:,}")
        print(f"After minimal content:      {stats['rows_after_minimal_content_removal']:,}")
        print(f"After duplicate removal:    {stats['rows_after_duplicate_removal']:,}")
        print(f"Final rows:                 {stats['final_rows']:,}")
        print(f"Comments removed (chars):   {stats['comments_removed']:,}")
        print(f"Total reduction:            {rows_removed:,} rows")
        print(f"Reduction percentage:       {reduction_pct:.1f}%")

    def analyze_cleaned_data(self):
        """
        Print basic statistics about the cleaned dataset.
        """
        if self.cleaned_data is None:
            print("❌ No cleaned data available. Run clean_dataset() first.")
            return

        print("\n📈 CLEANED DATASET ANALYSIS")
        print("=" * 50)

        df = self.cleaned_data

        # Basic statistics
        print(f"Total cleaned rows: {len(df)}")
        print(f"Columns: {list(df.columns)}")

        # Category distribution
        if 'Category' in df.columns:
            print("\n📂 Category Distribution:")
            for cat, count in df['Category'].value_counts().items():
                print(f"   {cat}: {count}")

        # Language distribution
        if 'Language' in df.columns:
            print(f"\n💻 Language: {df['Language'].value_counts().to_dict()}")

        # Content length statistics
        for col in self.CODE_COLUMNS:
            if col not in df.columns:
                continue
            lengths = self._content_lengths(df[col])
            if len(lengths) > 0:
                print(f"\n📏 {col} Length Stats:")
                print(f"   Count: {len(lengths)}")
                print(f"   Mean length: {lengths.mean():.0f} chars")
                print(f"   Median length: {lengths.median():.0f} chars")
                print(f"   Max length: {lengths.max():,} chars")

    def save_cleaned_data(self, filename: str = "cleaned_embedded_dataset.csv"):
        """
        Save the cleaned dataset to CSV, plus a 10-row sample written next
        to it as ``sample_<name>``.
        """
        import os

        if self.cleaned_data is None:
            print("❌ No cleaned data available. Run clean_dataset() first.")
            return

        self.cleaned_data.to_csv(filename, index=False)
        print(f"💾 Cleaned dataset saved to: {filename}")

        # Prefix only the file name so paths with a directory part stay valid.
        directory, base_name = os.path.split(filename)
        sample_file = os.path.join(directory, f"sample_{base_name}")
        self.cleaned_data.head(10).to_csv(sample_file, index=False)
        print(f"📄 Sample (10 rows) saved to: {sample_file}")

    def create_visualizations(self):
        """
        Create visualizations of the cleaning results (row counts per step,
        category shares, code length histogram).
        """
        if self.cleaned_data is None:
            print("❌ No cleaned data available. Run clean_dataset() first.")
            return

        plt.figure(figsize=(15, 10))

        # Subplot 1: Cleaning pipeline results
        plt.subplot(2, 3, 1)
        steps = ['Original', 'After Null\nRemoval', 'After Minimal\nContent', 'After\nDuplicates', 'Final']
        counts = [
            self.cleaning_stats['original_rows'],
            self.cleaning_stats['rows_after_null_removal'],
            self.cleaning_stats['rows_after_minimal_content_removal'],
            self.cleaning_stats['rows_after_duplicate_removal'],
            self.cleaning_stats['final_rows']
        ]

        plt.bar(steps, counts, color=['red', 'orange', 'yellow', 'lightgreen', 'green'])
        plt.title('Dataset Cleaning Pipeline')
        plt.ylabel('Number of Rows')
        plt.xticks(rotation=45)

        # Subplot 2: Category distribution
        plt.subplot(2, 3, 2)
        if 'Category' in self.cleaned_data.columns:
            category_counts = self.cleaned_data['Category'].value_counts()
            plt.pie(category_counts.values, labels=category_counts.index, autopct='%1.1f%%')
            plt.title('File Categories')

        # Subplot 3: Code length distribution (all code columns pooled)
        plt.subplot(2, 3, 3)
        lengths = []
        for col in self.CODE_COLUMNS:
            if col in self.cleaned_data.columns:
                lengths.extend(self._content_lengths(self.cleaned_data[col]).tolist())

        if lengths:
            plt.hist(lengths, bins=30, alpha=0.7)
            plt.title('Code Length Distribution')
            plt.xlabel('Characters')
            plt.ylabel('Frequency')
            plt.yscale('log')

        plt.tight_layout()
        plt.show()


# Additional utility functions for further analysis
# Moved these functions before the main() function call
def examine_dataset_structure(df):
    """
    Print a detailed look at the dataset: the populated cells of the first
    three rows, then the five most frequent values of the key columns.
    """
    def _clip(value, limit):
        # Shorten long values so one cell never floods the output.
        text = str(value)
        return text[:limit] + "..." if len(text) > limit else text

    print("\n🔬 DETAILED DATASET EXAMINATION")
    print("=" * 50)

    # Row preview: skip nulls and placeholder cells.
    print("📋 First 3 rows:")
    preview_count = min(3, len(df))
    for row_number in range(1, preview_count + 1):
        print(f"\nRow {row_number}:")
        record = df.iloc[row_number - 1]
        for column in df.columns:
            cell = record[column]
            if pd.isna(cell) or cell in ['Not Found', 'null']:
                continue
            print(f"   {column}: {_clip(cell, 100)}")

    # Most frequent values, in the frame's own column order.
    print("\n📊 Column Value Distributions:")
    tracked_columns = ('Code1', 'Unit Test (.cpp file)', 'Category')
    for column in df.columns:
        if column not in tracked_columns:
            continue
        print(f"\n{column}:")
        top_values = df[column].value_counts().head(5)
        for value, occurrences in top_values.items():
            print(f"   '{_clip(value, 50)}': {occurrences} times")

def inspect_sample_entries(df, n=3):
    """
    Inspect a few sample entries to see the cleaning results.

    Prints the file name, category and a 200-character preview of the code
    and unit-test content for the first ``n`` rows of ``df``. Columns that
    are absent from ``df`` are skipped instead of raising ``KeyError``.
    """
    placeholders = ('Not Found', 'null', '')
    header_fields = (
        ("File", 'Base File Name'),
        ("Category", 'Category'),
    )
    preview_fields = (
        ("Code1 Content", 'Code1'),
        ("Unit Test Content", 'Unit Test (.cpp file)'),
    )

    print(f"\n🔍 SAMPLE ENTRIES (showing {n} examples)")
    print("=" * 80)

    for position in range(min(n, len(df))):
        print(f"\n--- ENTRY {position + 1} ---")
        row = df.iloc[position]

        for label, column in header_fields:
            if column in df.columns:
                print(f"{label}: {row[column]}")

        # Content previews: only real values, truncated to 200 characters.
        for label, column in preview_fields:
            if column not in df.columns:
                continue
            value = row[column]
            if pd.notna(value) and value not in placeholders:
                text = str(value)
                if len(text) > 200:
                    text = text[:200] + "..."
                print(f"{label}: {text}")

        print("-" * 40)

def export_by_category(df, output_dir="cleaned_by_category"):
    """
    Export cleaned data separated by category.

    Writes one CSV per distinct non-null value of the 'Category' column
    into ``output_dir`` (created if missing). File names are the category
    text with '/' and spaces replaced by '_'. Does nothing beyond creating
    the directory when ``df`` has no 'Category' column.
    """
    import os

    # exist_ok avoids the check-then-create race and handles nested paths.
    os.makedirs(output_dir, exist_ok=True)

    if 'Category' not in df.columns:
        return

    for category in df['Category'].unique():
        if pd.isna(category):
            continue
        category_df = df[df['Category'] == category]
        # str() so numeric or other non-string categories can be exported too.
        safe_name = str(category).replace('/', '_').replace(' ', '_')
        filename = os.path.join(output_dir, f"{safe_name}.csv")
        category_df.to_csv(filename, index=False)
        print(f"📁 Exported {len(category_df)} rows to {filename}")


# Usage Example
def main():
    """
    Run the whole workflow: load the dataset, print an overview of the key
    columns, clean it with the gentle pipeline, then analyse, plot and save
    the result. Returns the cleaned DataFrame.
    """
    print("🔧 EMBEDDED UNIT TEST DATASET CLEANER")
    print("=" * 50)

    cleaner = EmbeddedDatasetCleaner()
    raw_df = cleaner.load_dataset()

    # Overview of what was loaded.
    print("\n🔍 DATASET EXPLORATION")
    print("=" * 30)
    print(f"Columns: {list(raw_df.columns)}")
    print(f"Shape: {raw_df.shape}")

    # Per-column tallies of real values versus each placeholder.
    for column in ('Code1', 'Unit Test (.cpp file)', 'Category'):
        if column not in raw_df.columns:
            continue
        series = raw_df[column]

        print(f"\n📊 {column}:")
        print(f"   Non-null: {series.notna().sum()}")
        print(f"   'Not Found': {(series == 'Not Found').sum()}")
        print(f"   'null': {(series == 'null').sum()}")
        print(f"   Empty string: {(series == '').sum()}")
        print(f"   Sample values: {series.dropna().head(3).tolist()}")

    # Both options are listed, but only the gentle one is wired up here.
    print("\n🎯 CLEANING OPTIONS:")
    print("1. Gentle cleaning (preserve more data)")
    print("2. Aggressive cleaning (remove more noise)")
    print("Using gentle cleaning...")

    examine_dataset_structure(raw_df)

    cleaned_df = cleaner.clean_dataset_gentle()

    if len(cleaned_df) == 0:
        print("\n❌ No data remaining after cleaning. Try gentler settings.")
        # Fall back to showing the raw rows so the cause can be inspected.
        inspect_sample_entries(raw_df, n=5)
        return cleaned_df

    cleaner.analyze_cleaned_data()
    cleaner.create_visualizations()
    cleaner.save_cleaned_data()

    print("\n🎉 All done! Your dataset has been cleaned and saved.")
    return cleaned_df

# Entry point: run the full workflow when executed as a script/notebook cell
if __name__ == "__main__":
    cleaned_dataset = main()

# Follow-up hints, printed whether or not the guard above ran
print("\n".join((
    "",
    "🎯 NEXT STEPS:",
    "",
    "1. Inspect sample entries:",
    "   inspect_sample_entries(cleaned_dataset, n=5)",
    "",
    "2. Export by category:",
    "   export_by_category(cleaned_dataset)",
    "",
    "3. Further analysis:",
    "   # Count non-null entries per column",
    "   cleaned_dataset.count()",
    "",
    "   # Check specific categories",
    "   cleaned_dataset[cleaned_dataset['Category'] == 'Test Module']",
    "",
    "4. Custom filtering:",
    "   # Only entries with unit tests",
    "   with_tests = cleaned_dataset[cleaned_dataset['Unit Test (.cpp file)'].notna()]",
    "",
    "   # Only C/H file pairs",
    "   pairs = cleaned_dataset[cleaned_dataset['Category'] == 'Source/Header Pair']",
    "",
    "📝 The cleaned dataset is now ready for machine learning, analysis, or further processing!",
    "",
)))