import io
import os
import shutil
import json
import pandas as pd
import base64

import kaggle_evaluation.konwinski_prize_inference_server

# Initialize global variables
instance_count = None
first_prediction = True
repo_dataframe = None  # Global variable to store the DataFrame

def get_number_of_instances(num_instances: int) -> None:
    """
    The very first message from the gateway will be the total number of instances to be served.
    You don't need to edit this function.
    """
    global instance_count
    instance_count = num_instances

def predict(problem_statement: str, repo_archive: io.BytesIO) -> str:
    """
    Inference function to read the repository archive into a pandas DataFrame.

    Args:
        problem_statement: The text of the git issue.
        repo_archive: A BytesIO buffer containing a .tar archive of the codebase.

    Returns:
        A JSON string representing the DataFrame with repository contents.
    """
    global first_prediction, repo_dataframe

    if not first_prediction:
        return None  # Skip processing if it's not the first prediction.

    try:
        # Step 1: Write the uploaded repository archive to a file
        archive_filename = 'repo_archive.tar'
        with open(archive_filename, 'wb') as f:
            f.write(repo_archive.read())
        print(f"Successfully wrote the archive to '{archive_filename}'.")

        # Step 2: Define the extraction directory
        repo_path = 'repo'

        # Step 3: Remove the extraction directory if it already exists to ensure a clean state
        if os.path.exists(repo_path):
            shutil.rmtree(repo_path)
            print(f"Removed existing directory '{repo_path}/'.")

        # Step 4: Extract the repository archive to the specified directory
        try:
            shutil.unpack_archive(archive_filename, extract_dir=repo_path)
            print(f"Successfully extracted the archive to '{repo_path}/'.")
        except shutil.ReadError as e:
            error_message = f"Error unpacking archive: {e}"
            print(error_message)
            return json.dumps({"error": error_message})

        # Step 5: Remove the archive file after extraction to save space
        os.remove(archive_filename)
        print(f"Removed the archive file '{archive_filename}'.")

        # Step 6: Initialize a list to hold repository data
        data = []

        # Step 7: Walk through the repository directory to read files
        for root, dirs, files in os.walk(repo_path):
            for file in files:
                file_full_path = os.path.join(root, file)  # Get the full file path
                relative_path = os.path.relpath(file_full_path, repo_path)
                try:
                    # Attempt to read the file content as text
                    with open(file_full_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                    # Append the file path, content, and is_binary flag to the data list
                    data.append({
                        'file_path': relative_path,
                        'content': content,
                        'is_binary': False
                    })
                except UnicodeDecodeError:
                    # If a UnicodeDecodeError occurs, treat the file as binary
                    try:
                        with open(file_full_path, 'rb') as f:
                            binary_content = f.read()
                        # Encode the binary content using Base64 to include in JSON
                        encoded_content = base64.b64encode(binary_content).decode('utf-8')
                        data.append({
                            'file_path': relative_path,
                            'content': encoded_content,
                            'is_binary': True
                        })
                        print(f"Encoded binary file '{file_full_path}'.")
                    except Exception as e:
                        # If reading as binary also fails, note the error
                        data.append({
                            'file_path': relative_path,
                            'content': f"Could not read file: {e}",
                            'is_binary': None
                        })
                        print(f"Could not read file '{file_full_path}': {e}")
                except Exception as e:
                    # Handle other exceptions
                    data.append({
                        'file_path': relative_path,
                        'content': f"Could not read file: {e}",
                        'is_binary': None
                    })
                    print(f"Could not read file '{file_full_path}': {e}")

        # Step 8: Create a pandas DataFrame from the collected data
        repo_df = pd.DataFrame(data)
        print("Successfully created the DataFrame from repository contents.")

        # Step 9: Convert the DataFrame to a JSON string
        repo_json = repo_df.to_json(orient='records', indent=2)
        print("Converted the DataFrame to JSON.")

        # Store the DataFrame in the global variable
        repo_dataframe = repo_df
        print("Stored the DataFrame in the global variable 'repo_dataframe'.")

        # Update the prediction flag
        first_prediction = False

        # Return the JSON string
        return repo_json

    except Exception as e:
        # Handle unexpected exceptions and return as JSON error
        error_response = {
            "error": str(e)
        }
        print(f"An unexpected error occurred: {e}")
        first_prediction = False
        return json.dumps(error_response)

# Initialize the inference server
inference_server = kaggle_evaluation.konwinski_prize_inference_server.KPrizeInferenceServer(
    get_number_of_instances,   
    predict
)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(
        data_paths=(
            '/kaggle/input/konwinski-prize/',  # Path to the entire competition dataset
            '/kaggle/tmp/konwinski-prize/',   # Path to a scratch directory for unpacking data.a_zip.
        )
    )

# After the inference server has processed the predictions
if repo_dataframe is not None:
    # Perform operations on the DataFrame
    print(repo_dataframe.head())
    # You can also save it to a file if needed
    repo_dataframe.to_csv('repository_contents.csv', index=False)
else:
    print("The DataFrame 'repo_dataframe' is not available.")


In [1]:
!pip install networkx plotly ray




In [2]:
import pandas as pd
import os 
import subprocess

splits = {'dev': 'data/dev-00000-of-00001.parquet', 'test': 'data/test-00000-of-00001.parquet', 'train': 'data/train-00000-of-00001.parquet'}
df = pd.read_parquet("hf://datasets/princeton-nlp/SWE-bench/" + splits["dev"])
df


Unnamed: 0,repo,instance_id,base_commit,patch,test_patch,problem_statement,hints_text,created_at,version,FAIL_TO_PASS,PASS_TO_PASS,environment_setup_commit
0,sqlfluff/sqlfluff,sqlfluff__sqlfluff-4764,a820c139ccbe6d1865d73c4a459945cd69899f8f,diff --git a/src/sqlfluff/cli/commands.py b/sr...,diff --git a/test/cli/commands_test.py b/test/...,Enable quiet mode/no-verbose in CLI for use in...,,2023-04-16T14:24:42Z,1.4,"[""test/cli/commands_test.py::test__cli__fix_mu...","[""test/cli/commands_test.py::test__cli__comman...",d19de0ecd16d298f9e3bfb91da122734c40c01e5
1,sqlfluff/sqlfluff,sqlfluff__sqlfluff-2862,447ecf862a4d2b977d0add9f444655357b9c4f1f,diff --git a/src/sqlfluff/core/linter/common.p...,diff --git a/test/api/simple_test.py b/test/ap...,fix keep adding new line on wrong place \n### ...,"> Version\r\n> sqlfluff, version 0.6.2\r\n\r\n...",2022-03-14T19:46:08Z,0.10,"[""test/api/simple_test.py::test__api__lint_str...","[""test/api/simple_test.py::test__api__lint_str...",3d52e8270d82aeccf4c516d059a80a6947919aea
2,sqlfluff/sqlfluff,sqlfluff__sqlfluff-2336,37a993f7ad841ab3035d1db5ce6525f2e5584fd5,diff --git a/src/sqlfluff/core/rules/analysis/...,diff --git a/test/core/rules/reference_test.py...,L026: Rule incorrectly flag column does not ex...,,2022-01-17T21:35:10Z,0.8,"[""test/core/rules/reference_test.py::test_obje...",[],a5c4eae4e3e419fe95460c9afd9cf39a35a470c4
3,sqlfluff/sqlfluff,sqlfluff__sqlfluff-5074,7b7fd603a19755a9f3707ebbf95d18ee635716d8,diff --git a/src/sqlfluff/core/errors.py b/src...,diff --git a/test/cli/commands_test.py b/test/...,Inconsistent output depending on --processes f...,This is _very_ interesting! I'll pick this one...,2023-08-08T23:31:59Z,2.1,"[""test/cli/commands_test.py::test__cli__comman...","[""test/cli/commands_test.py::test__cli__comman...",7b7fd603a19755a9f3707ebbf95d18ee635716d8
4,sqlfluff/sqlfluff,sqlfluff__sqlfluff-3436,23cd31e77a712a210c734e38488d7a34afd83a25,diff --git a/src/sqlfluff/core/templaters/slic...,diff --git a/test/core/templaters/jinja_test.p...,Fatal templating error with Jinja templater. T...,I'll take a look.\r\n\r\nAnd darn it -- first ...,2022-06-07T21:36:59Z,0.13,"[""test/core/templaters/jinja_test.py::test__te...","[""test/core/templaters/jinja_test.py::test__te...",6e8ce43a4958dbaa56256365c2a89d8db92e07d6
...,...,...,...,...,...,...,...,...,...,...,...,...
220,pydicom/pydicom,pydicom__pydicom-809,356a51ab4bc54fd18950041ebc44dbfa1a425a10,diff --git a/pydicom/dataset.py b/pydicom/data...,diff --git a/pydicom/tests/test_filewriter.py ...,"""Printing"" of certain dicom files fails once, ...",Occurs because Pixel Representation is in the ...,2019-03-04T20:14:54Z,1.2,"[""pydicom/tests/test_filewriter.py::TestCorrec...","[""pydicom/tests/test_filewriter.py::TestWriteF...",b4b44acbf1ddcaf03df16210aac46cb3a8acd6b9
221,pydicom/pydicom,pydicom__pydicom-933,38436b6824c079564b8760ea6acfa4c0fd3ee9c3,diff --git a/pydicom/dataset.py b/pydicom/data...,diff --git a/pydicom/tests/test_filereader.py ...,Deferred Read Fails For File-Like Objects\n###...,"This certainly makes sense, though deferred re...",2019-08-15T20:21:09Z,1.3,"[""pydicom/tests/test_filereader.py::TestDeferr...","[""pydicom/tests/test_filereader.py::TestReader...",7241f5d9db0de589b230bb84212fbb643a7c86c3
222,pydicom/pydicom,pydicom__pydicom-1633,98ac88706e7ab17cd279c94949ac6af4e87f341d,diff --git a/pydicom/valuerep.py b/pydicom/val...,diff --git a/pydicom/tests/test_valuerep.py b/...,"OverflowError ""VR of 'DS' must be <= 16 charac...","For reference, a possibly similar issue came u...",2022-04-14T18:26:56Z,2.3,"[""pydicom/tests/test_valuerep.py::TestDSfloat:...","[""pydicom/tests/test_valuerep.py::TestTM::test...",a8be738418dee0a2b93c241fbd5e0bc82f4b8680
223,pydicom/pydicom,pydicom__pydicom-1428,674da68db47a71ee6929288a047b56cf31cf8168,diff --git a/pydicom/fileset.py b/pydicom/file...,diff --git a/pydicom/tests/test_fileset.py b/p...,Allow to search a list of elements in a `FileS...,"Sounds good, do you want to do the PR? Just ch...",2021-06-28T08:57:19Z,2.1,"[""pydicom/tests/test_fileset.py::TestFileSet_L...","[""pydicom/tests/test_fileset.py::test_is_confo...",506ecea8f378dc687d5c504788fc78810a190b7a


In [3]:
import os
import subprocess
import pandas as pd
import tempfile


def is_binary_file(filepath):
    """Utility function to detect if a file is binary."""
    with open(filepath, 'rb') as file:
        chunk = file.read(1024)
        return b'\0' in chunk


def analyze_repo_contents(df, index):
    """
    Analyze repository contents before and after a patch and load them into separate DataFrames,
    including an 'is_binary' field.
    
    Args:
        df (pd.DataFrame): DataFrame containing SWE-bench data.
        index (int): Index of the row to analyze.

    Returns:
        pd.DataFrame, pd.DataFrame: DataFrames for repository contents before and after the patch.
    """
    if index >= len(df):
        raise IndexError("The provided index is out of range.")

    # Extract relevant row
    row = df.iloc[index]

    # Extract repository information
    repo_url = f"https://github.com/{row['repo']}.git"
    repo_name = row['repo'].split('/')[-1]
    repo_path = f"./{repo_name}"

    # Clone the repository if not already cloned
    if not os.path.exists(repo_path):
        print(f"Cloning repository {repo_url}...")
        subprocess.run(["git", "clone", repo_url, repo_path], check=True)
    else:
        print(f"Repository {repo_name} already cloned.")

    # Checkout the base commit (before the patch)
    base_commit = row['base_commit']
    subprocess.run(["git", "checkout", base_commit], cwd=repo_path, check=True)

    # Load repository contents before the patch
    print(f"Loading repository contents at base commit {base_commit}...")
    pre_patch_files = []
    for root, dirs, files in os.walk(repo_path):
        for file in files:
            file_path = os.path.join(root, file)
            is_binary = is_binary_file(file_path)
            if is_binary:
                with open(file_path, "rb") as f:  # Binary mode
                    content = f.read()
            else:
                with open(file_path, "r", encoding="utf-8", errors="ignore") as f:  # Text mode
                    content = f.read()
            pre_patch_files.append({"file_path": file_path, "content": content, "is_binary": is_binary})

    pre_patch_df = pd.DataFrame(pre_patch_files)

    # Save and attempt to apply the patch
    patch = row['patch']
    with tempfile.NamedTemporaryFile(delete=False, suffix=".diff") as temp_patch_file:
        patch_file_path = temp_patch_file.name
        temp_patch_file.write(patch.encode("utf-8"))

    try:
        subprocess.run(["git", "apply", patch_file_path], cwd=repo_path, check=True)
        print("Patch applied successfully.")
    except subprocess.CalledProcessError as e:
        print("Patch application failed.")
        print(f"Error: {e}")
        print("Proceeding without applying the patch.")
    finally:
        os.remove(patch_file_path)

    # Load repository contents after attempting to apply the patch
    print("Loading repository contents after patch attempt...")
    post_patch_files = []
    for root, dirs, files in os.walk(repo_path):
        for file in files:
            file_path = os.path.join(root, file)
            is_binary = is_binary_file(file_path)
            if is_binary:
                with open(file_path, "rb") as f:  # Binary mode
                    content = f.read()
            else:
                with open(file_path, "r", encoding="utf-8", errors="ignore") as f:  # Text mode
                    content = f.read()
            post_patch_files.append({"file_path": file_path, "content": content, "is_binary": is_binary})

    post_patch_df = pd.DataFrame(post_patch_files)

    return pre_patch_df, post_patch_df


# Analyze the first row
pre_patch_df, post_patch_df = analyze_repo_contents(df, index=0)

# View DataFrames
print(pre_patch_df.head())
print(post_patch_df.head())


Repository sqlfluff already cloned.
M	src/sqlfluff/cli/commands.py
M	src/sqlfluff/cli/formatters.py
M	src/sqlfluff/core/linter/linted_dir.py
Loading repository contents at base commit a820c139ccbe6d1865d73c4a459945cd69899f8f...


HEAD is now at a820c139c Use the new CollationReferenceSegment everywhere (#4770)
error: patch failed: src/sqlfluff/cli/commands.py:44
error: src/sqlfluff/cli/commands.py: patch does not apply
error: patch failed: src/sqlfluff/cli/formatters.py:94
error: src/sqlfluff/cli/formatters.py: patch does not apply
error: patch failed: src/sqlfluff/core/linter/linted_dir.py:117
error: src/sqlfluff/core/linter/linted_dir.py: patch does not apply


Patch application failed.
Error: Command '['git', 'apply', '/tmp/tmp6h5jn9bm.diff']' returned non-zero exit status 1.
Proceeding without applying the patch.
Loading repository contents after patch attempt...
                   file_path  \
0  ./sqlfluff/.gitattributes   
1   ./sqlfluff/.editorconfig   
2     ./sqlfluff/MANIFEST.in   
3         ./sqlfluff/tox.ini   
4   ./sqlfluff/.dockerignore   

                                             content  is_binary  
0  # We'll let Git's auto-detection algorithm inf...      False  
1  # editorconfig.org\nroot = true\n\n[*]\nindent...      False  
2        include README.md LICENSE.md CHANGELOG.md\n      False  
3  [tox]\nenvlist = generate-fixture-yml, linting...      False  
4  # Ignore IDE files\n.vscode\n.idea\n/.sqlfluff...      False  
                   file_path  \
0  ./sqlfluff/.gitattributes   
1   ./sqlfluff/.editorconfig   
2     ./sqlfluff/MANIFEST.in   
3         ./sqlfluff/tox.ini   
4   ./sqlfluff/.dockerignore   

          

In [4]:
# Define mapping dictionaries
extension_category_map = {
    # Source Code
    '.py': 'Source Code Files',
    '.js': 'Source Code Files',
    '.java': 'Source Code Files',
    '.c': 'Source Code Files',
    '.cpp': 'Source Code Files',
    '.cs': 'Source Code Files',
    '.rb': 'Source Code Files',
    '.go': 'Source Code Files',
    '.ts': 'Source Code Files',
    '.php': 'Source Code Files',
    '.swift': 'Source Code Files',
    '.kt': 'Source Code Files',

    # Configuration
    '.cfg': 'Configuration Files',
    '.toml': 'Configuration Files',
    '.yaml': 'Configuration Files',
    '.yml': 'Configuration Files',
    '.json': 'Configuration Files',
    '.ini': 'Configuration Files',
    '.env': 'Configuration Files',
    '.editorconfig': 'Configuration Files',
    '.git-blame-ignore-revs': 'Version Control Configuration Files',
    'pyproject.toml': 'Configuration Files',

    # Documentation
    '.md': 'Documentation Files',
    '.rst': 'Documentation Files',
    '.txt': 'Documentation Files',
    '.adoc': 'Documentation Files',

    # License and Legal
    'LICENSE': 'License and Legal Files',
    'LICENSE.txt': 'License and Legal Files',
    'LICENSE.md': 'License and Legal Files',
    'NOTICE': 'License and Legal Files',

    # Scripts
    '.sh': 'Scripts and Utilities',
    '.bat': 'Scripts and Utilities',
    '.ps1': 'Scripts and Utilities',
    '.pyw': 'Scripts and Utilities',

    # Testing
    '.test': 'Testing Files',
    '.spec': 'Testing Files',
    '.pytest': 'Testing Files',
    'pytest.ini': 'Testing Files',
    'tox.ini': 'Testing Files',

    # Build and Deployment
    'Dockerfile': 'Build and Deployment Files',
    '.dockerignore': 'Build and Deployment Files',
    'Makefile': 'Build and Deployment Files',
    'docker-compose.yml': 'Build and Deployment Files',
    'Jenkinsfile': 'Build and Deployment Files',
    'build.gradle': 'Build and Deployment Files',
    'pom.xml': 'Build and Deployment Files',

    # Version Control Configuration
    '.gitignore': 'Version Control Configuration Files',
    '.gitattributes': 'Version Control Configuration Files',
    '.gitmodules': 'Version Control Configuration Files',

    # Workflow and CI
    '.travis.yml': 'Workflow and CI Files',
    '.circleci/config.yml': 'Workflow and CI Files',
    '.github/workflows/ci.yaml': 'Workflow and CI Files',
    '.github/workflows/release-tests.yml': 'Workflow and CI Files',
    '.github/workflows/release.yml': 'Workflow and CI Files',
    '.github/workflows/codeql-analysis.yml': 'Workflow and CI Files',
    '.github/workflows/backport.yml': 'Workflow and CI Files',
    # Add more special files as needed
}

# Directory patterns mapped to categories
directory_category_map = {
    'docs/': 'Documentation Files',
    'docs': 'Documentation Files',
    'test/': 'Testing Files',
    'tests/': 'Testing Files',
    'assets/': 'Binary and Asset Files',
    'static/': 'Binary and Asset Files',
    'scripts/': 'Scripts and Utilities',
    'bin/': 'Scripts and Utilities',
    'config/': 'Configuration Files',
    'src/': 'Source Code Files',
    'lib/': 'Source Code Files',
    'include/': 'Source Code Files',
    'examples/': 'Examples and Demos',
    'public/': 'Public Assets',
    # Add more directory patterns as needed
}

# Binary file extensions
binary_extensions = {
    '.png', '.jpg', '.jpeg', '.gif', '.svg', '.exe', '.dll', '.so',
    '.bin', '.ico', '.pdf', '.zip', '.tar', '.gz', '.7z', '.rar',
    '.mp3', '.mp4', '.avi', '.mov', '.wmv', '.flv', '.mkv', '.bmp',
    '.tiff', '.woff', '.woff2', '.ttf', '.eot', '.otf', '.dmg',
    '.iso', '.jar', '.war', '.ear'
}

# Function to classify files
def classify_file(row):
    file_path = row['file_path']
    is_binary = row['is_binary']
    
    # Normalize file_path for consistent matching
    normalized_path = file_path.lower()
    
    # Check for exact matches first (e.g., Dockerfile, LICENSE)
    if file_path in extension_category_map:
        return extension_category_map[file_path]
    
    # Check directory patterns
    for dir_pattern, category in directory_category_map.items():
        if normalized_path.startswith(dir_pattern):
            return category
    
    # Extract the file extension or specific filename
    basename = os.path.basename(file_path)
    _, ext = os.path.splitext(basename)
    ext = ext.lower()
    
    # Check exact filename matches if not already matched
    if basename in extension_category_map:
        return extension_category_map[basename]
    
    # Check extension-based category
    if ext in extension_category_map:
        return extension_category_map[ext]
    
    # Check for binary files based on extension
    if is_binary or ext in binary_extensions:
        return 'Binary and Asset Files'
    
    # Default category
    return 'Miscellaneous Files'

# Apply classification
pre_patch_df['Category'] = pre_patch_df.apply(classify_file, axis=1)
post_patch_df['Category'] = post_patch_df.apply(classify_file, axis=1)


In [5]:
pre_patch_df.columns

Index(['file_path', 'content', 'is_binary', 'Category'], dtype='object')

In [6]:
pre_patch_df.iloc[:1, :]

Unnamed: 0,file_path,content,is_binary,Category
0,./sqlfluff/.gitattributes,# We'll let Git's auto-detection algorithm inf...,False,Version Control Configuration Files


In [7]:
pre_patch_df['Category'].value_counts()

Category
Miscellaneous Files                    1582
Configuration Files                    1428
Source Code Files                       320
Documentation Files                      42
Binary and Asset Files                   13
Version Control Configuration Files       6
Build and Deployment Files                4
Testing Files                             4
License and Legal Files                   2
Scripts and Utilities                     1
Name: count, dtype: int64

In [8]:
post_patch_df['Category'].value_counts()

Category
Miscellaneous Files                    1582
Configuration Files                    1428
Source Code Files                       320
Documentation Files                      42
Binary and Asset Files                   13
Version Control Configuration Files       6
Build and Deployment Files                4
Testing Files                             4
License and Legal Files                   2
Scripts and Utilities                     1
Name: count, dtype: int64

**Codebase Deep Dive**

In [9]:
import torch

print("GPUs available:", torch.cuda.device_count())
for i in range(torch.cuda.device_count()):
    print(f"GPU {i}: {torch.cuda.get_device_name(i)}")


GPUs available: 2
GPU 0: Tesla T4
GPU 1: Tesla T4


In [12]:
import ray

ray.init(num_gpus=torch.cuda.device_count())


2024-12-30 19:12:42,009	INFO worker.py:1753 -- Started a local Ray instance.


0,1
Python version:,3.10.14
Ray version:,2.24.0


[36m(DistributedModelWorker pid=1665)[0m Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::DistributedModelWorker.__init__()[39m (pid=1665, ip=172.19.2.2, actor_id=58d00361ea79d9ee11bf6b3401000000, repr=<__main__.DistributedModelWorker object at 0x7bc235f85900>)
[36m(DistributedModelWorker pid=1665)[0m   File "/tmp/ipykernel_1009/2694572308.py", line 476, in __init__
[36m(DistributedModelWorker pid=1665)[0m   File "/tmp/ipykernel_1009/2694572308.py", line 288, in __init__
[36m(DistributedModelWorker pid=1665)[0m ValueError: Invalid GPU ID: 1


In [None]:
import os
import logging
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import pandas as pd
from tqdm import tqdm
import ray
from typing import List, Dict
import numpy as np
import sys

# --------------------------- Configuration --------------------------- #

# Model and Authentication
MODEL_DIR = "/kaggle/input/qwen2.5-coder/transformers/1.5b-instruct/1"  # Update this path as needed
HF_TOKEN = "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"  # Replace with your Hugging Face token

# Output Files
OUTPUT_FILE = "complete_codebase_documentation.md"  # Output Markdown file
LOG_FILE = "repo_inferencer.log"  # Log file

# Model Parameters
MAX_TOKENS = 4096  # Maximum tokens for model generation
CHUNK_SIZE = 2000  # Token-based chunk size to break the input

# Prompt Templates for Different Categories
PROMPT_TEMPLATES = {
    "Source Code": {
        "summary": """You are an expert software developer tasked with analyzing and documenting source code. Follow these steps to provide a comprehensive summary:

Step 1: Code Understanding
- Carefully read through the provided source code
- Identify the main purpose and functionality
- Note any important dependencies or imports

Step 2: Architecture Analysis
- Identify key classes, functions, and modules
- Understand the code structure and organization
- Note any design patterns or architectural choices

Step 3: Generate Summary
Provide a clear summary addressing:
1. Main purpose and functionality
2. Key components and their roles
3. Important dependencies
4. Notable design patterns or architectural decisions
5. Any performance considerations

File: {file_path}

Code:
{content}

Summary:""",
        "key_points": """As an expert developer, extract key technical points from this code by following these steps:

Step 1: Technical Analysis
- Review the code's technical implementation
- Identify core algorithms and data structures
- Note any important configurations or settings

Step 2: Implementation Details
- List critical functions and their purposes
- Document important class relationships
- Note any complex logic or algorithms

Step 3: Identify Key Technical Points
Focus on:
1. Critical functions and their roles
2. Important class hierarchies
3. Key algorithms and data structures
4. Configuration parameters
5. Error handling mechanisms
6. Performance optimizations

File: {file_path}

Code:
{content}

Key Technical Points:"""
    },
    "Configuration": {
        "summary": """As a configuration specialist, analyze this configuration file following these steps:

Step 1: Configuration Analysis
- Identify the configuration type and format
- Understand the scope and purpose
- Note any environment-specific settings

Step 2: Settings Review
- Review all configuration parameters
- Identify critical settings
- Note any security-related configurations

Step 3: Generate Summary
Address:
1. Configuration file purpose
2. Scope and environment context
3. Critical settings and their impacts
4. Security considerations
5. Integration points

File: {file_path}

Configuration:
{content}

Summary:""",
        "key_points": """As a configuration expert, extract key configuration points following these steps:

Step 1: Parameter Analysis
- Identify all important parameters
- Understand their purposes and impacts
- Note any dependencies between settings

Step 2: Critical Settings
- List mission-critical configurations
- Document default values and their implications
- Note any security-sensitive settings

Step 3: Generate Key Points
Focus on:
1. Essential configuration parameters
2. Environment-specific settings
3. Security configurations
4. Integration parameters
5. Performance-related settings

File: {file_path}

Configuration:
{content}

Key Configuration Points:"""
    },
    "Documentation": {
        "summary": """As a technical documentation expert, analyze this documentation following these steps:

Step 1: Content Analysis
- Understand the documentation scope
- Identify main topics covered
- Note any important guidelines or requirements

Step 2: Documentation Review
- Evaluate completeness and clarity
- Identify key information sections
- Note any technical specifications

Step 3: Generate Summary
Address:
1. Documentation purpose and scope
2. Main topics covered
3. Key guidelines or requirements
4. Technical specifications
5. Important usage examples

File: {file_path}

Documentation:
{content}

Summary:""",
        "key_points": """As a documentation specialist, extract key documentation points following these steps:

Step 1: Content Review
- Identify critical information
- Note important guidelines
- List key examples or demonstrations

Step 2: Technical Details
- Extract technical specifications
- Note implementation requirements
- List important references

Step 3: Generate Key Points
Focus on:
1. Critical guidelines
2. Technical requirements
3. Important examples
4. Best practices
5. Key references

File: {file_path}

Documentation:
{content}

Key Documentation Points:"""
    },
    "Build": {
        "summary": """As a build system expert, analyze this build configuration following these steps:

Step 1: Build Process Analysis
- Understand build steps and dependencies
- Identify build targets and artifacts
- Note any special build requirements

Step 2: Configuration Review
- Review build settings and parameters
- Identify critical dependencies
- Note any platform-specific configurations

Step 3: Generate Summary
Address:
1. Build process overview
2. Key targets and artifacts
3. Critical dependencies
4. Platform requirements
5. Build optimization settings

File: {file_path}

Build Configuration:
{content}

Summary:""",
        "key_points": """As a build system specialist, extract key build points following these steps:

Step 1: Build Configuration Analysis
- Identify critical build steps
- List important dependencies
- Note build optimization settings

Step 2: Platform Requirements
- Document platform-specific settings
- Note compatibility requirements
- List required tools and versions

Step 3: Generate Key Points
Focus on:
1. Critical build steps
2. Essential dependencies
3. Platform requirements
4. Build optimizations
5. Tool requirements

File: {file_path}

Build Configuration:
{content}

Key Build Points:"""
    }
}

# --------------------------- Logging Setup --------------------------- #

logging.basicConfig(
    filename=LOG_FILE,
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

# --------------------------- Model Worker Classes --------------------------- #

class ModelWorker:
    """
    Handles model loading and text generation for file processing.
    Compatible with Ray for distributed processing.
    """

    def __init__(self, model_dir: str, hf_token: str):
        """
        Initialize the ModelWorker with the allocated GPU.

        Args:
            model_dir (str): Path to the pre-trained model.
            hf_token (str): Hugging Face authentication token.

        Raises:
            ValueError: If no GPUs are allocated or GPU ID is invalid.
            Exception: For any other initialization errors.
        """
        try:
            # Retrieve the GPU IDs allocated by Ray
            allocated_gpus = ray.get_gpu_ids()
            if not allocated_gpus:
                logging.error("No GPUs allocated to this worker. Exiting.")
                raise ValueError("No GPUs allocated to this worker.")

            gpu_id = allocated_gpus[0]

            # Validate GPU ID
            if gpu_id >= torch.cuda.device_count():
                logging.error(f"Allocated GPU ID {gpu_id} is invalid. Available GPUs: 0 to {torch.cuda.device_count()-1}")
                raise ValueError(f"Invalid GPU ID: {gpu_id}")

            self.device = f'cuda:{gpu_id}'
            logging.info(f"ModelWorker initialized on GPU: {gpu_id}")

            # Initialize the tokenizer with updated parameter
            self.tokenizer = AutoTokenizer.from_pretrained(
                model_dir,
                token=hf_token,  # Replaced 'use_auth_token' with 'token'
                trust_remote_code=True
            )
            self._init_model(model_dir, hf_token)
        except Exception as e:
            logging.error(f"Error initializing ModelWorker: {e}")
            raise

    def _init_model(self, model_dir: str, hf_token: str):
        """
        Load the model onto the specified GPU.

        Args:
            model_dir (str): Path to the pre-trained model.
            hf_token (str): Hugging Face authentication token.

        Raises:
            Exception: If model loading fails.
        """
        try:
            self.model = AutoModelForCausalLM.from_pretrained(
                model_dir,
                trust_remote_code=True,
                torch_dtype=torch.bfloat16 if self.device.startswith('cuda') else torch.float32,
                token=hf_token  # Replaced 'use_auth_token' with 'token'
            ).to(self.device)
            self.model.eval()
            logging.info("Model loaded successfully.")
        except Exception as e:
            logging.error(f"Error loading model: {e}")
            raise

    def generate_text(self, prompt: str, chunk_size: int = CHUNK_SIZE) -> str:
        """
        Generate text based on the provided prompt, handling chunking if necessary.

        Args:
            prompt (str): The input prompt for the model.
            chunk_size (int): The maximum number of tokens per chunk.

        Returns:
            str: The generated text or an error message.
        """
        try:
            # Tokenize the prompt to get the token count
            inputs = self.tokenizer(prompt, return_tensors="pt", truncation=False)
            token_count = len(inputs['input_ids'][0])

            if token_count <= MAX_TOKENS:
                return self._generate(prompt)

            # If the prompt exceeds the max token limit, chunk it
            return self._generate_chunked(prompt, chunk_size)
        except Exception as e:
            logging.error(f"Error generating text: {e}")
            return f"Error generating text: {str(e)}"

    def _generate_chunked(self, prompt: str, chunk_size: int) -> str:
        """
        Generate text for chunked prompts and combine the results.

        Args:
            prompt (str): The input prompt exceeding MAX_TOKENS.
            chunk_size (int): The maximum number of tokens per chunk.

        Returns:
            str: The combined and refined generated text.
        """
        try:
            chunks = []
            remaining_text = prompt
            while remaining_text:
                # Tokenize and check the size
                inputs = self.tokenizer(remaining_text, return_tensors="pt", truncation=False)
                token_count = len(inputs['input_ids'][0])

                if token_count > chunk_size:
                    # Find the approximate split point
                    split_point = chunk_size
                    chunk_ids = inputs['input_ids'][0][:split_point]
                    chunk = self.tokenizer.decode(chunk_ids, skip_special_tokens=True)
                    remaining_text = self.tokenizer.decode(inputs['input_ids'][0][split_point:], skip_special_tokens=True)
                else:
                    chunk = remaining_text
                    remaining_text = ""

                # Generate the output for each chunk
                chunk_output = self._generate(chunk)
                chunks.append(chunk_output)

            # Combine the chunked outputs and refine the final output
            combined = " ".join(chunks)
            refined_output = self._generate(f"Please provide a coherent final version of this analysis:\n{combined}")
            return refined_output
        except Exception as e:
            logging.error(f"Error during chunked text generation: {e}")
            return f"Error generating text: {str(e)}"

    def _generate(self, prompt: str) -> str:
        """
        Generate text using the model based on the prompt.

        Args:
            prompt (str): The input prompt for the model.

        Returns:
            str: The generated text or an error message.
        """
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(self.device)
            attention_mask = inputs['attention_mask']  # Explicitly retrieve attention mask

            with torch.no_grad():
                output_ids = self.model.generate(
                    inputs.input_ids,
                    attention_mask=attention_mask,  # Pass attention mask
                    max_length=MAX_TOKENS,
                    num_return_sequences=1,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id
                )

            return self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        except Exception as e:
            logging.error(f"Error during text generation: {e}")
            return f"Error generating text: {str(e)}"

    def process_file(self, file_data: Dict) -> Dict:
        """
        Process a single file to generate summary and key points.

        Args:
            file_data (Dict): A dictionary containing file information.

        Returns:
            Dict: A dictionary with the generated summary and key points.
        """
        try:
            file_path = file_data['file_path']
            content = file_data['content']
            category = file_data.get('Category', 'Unknown')

            template = PROMPT_TEMPLATES.get(category, PROMPT_TEMPLATES["Documentation"])

            summary_prompt = template["summary"].format(
                file_path=file_path,
                content=content
            )
            summary = self.generate_text(summary_prompt)

            key_points_prompt = template["key_points"].format(
                file_path=file_path,
                content=content
            )
            key_points = self.generate_text(key_points_prompt)

            return {
                'Category': category,
                'file_path': file_path,
                'summary': summary,
                'key_points': key_points
            }
        except Exception as e:
            logging.error(f"Error processing file {file_data.get('file_path', 'Unknown')}: {e}")
            return {
                'Category': file_data.get('Category', 'Unknown'),
                'file_path': file_data.get('file_path', 'Unknown'),
                'summary': f"Error during processing: {str(e)}",
                'key_points': "Processing failed"
            }

# Ray-Compatible Distributed Model Worker
@ray.remote(num_gpus=1)
class DistributedModelWorker(ModelWorker):
    """
    Ray remote actor for distributed model processing.
    Inherits from ModelWorker and initializes the model within Ray's context.
    """
    def __init__(self, model_dir: str, hf_token: str):
        super().__init__(model_dir, hf_token)

# --------------------------- Ray Initialization --------------------------- #

def init_ray_cluster(required_gpus: int):
    """
    Initialize Ray cluster with the required number of GPUs.

    Args:
        required_gpus (int): The number of GPUs needed.

    Raises:
        RuntimeError: If the available GPUs are less than required.
    """
    if not ray.is_initialized():
        try:
            # Check available GPUs on the system
            total_gpus = torch.cuda.device_count()
            if total_gpus < required_gpus:
                logging.error(f"Requested {required_gpus} GPUs, but only {total_gpus} available.")
                raise RuntimeError(f"Insufficient GPUs: Requested {required_gpus}, Available {total_gpus}")

            # Initialize Ray with 0 GPUs for the driver to reserve all GPUs for actors
            ray.init(ignore_reinit_error=True, num_gpus=0)
            logging.info(f"Ray cluster initialized with {required_gpus} GPUs reserved for actors.")
        except Exception as e:
            logging.error(f"Error initializing Ray cluster: {e}")
            raise
    else:
        logging.info("Ray cluster is already initialized.")

# --------------------------- Documentation Generation --------------------------- #

def generate_documentation(results_df: pd.DataFrame) -> str:
    """
    Generate comprehensive Markdown documentation from the results DataFrame.

    Args:
        results_df (pd.DataFrame): DataFrame containing processing results.

    Returns:
        str: The generated Markdown documentation.
    """
    doc = "# Comprehensive Codebase Documentation\n\n"

    for category in sorted(results_df['Category'].unique()):
        doc += f"## {category}\n\n"
        category_files = results_df[results_df['Category'] == category]

        for _, row in category_files.iterrows():
            doc += f"### {row['file_path']}\n\n"
            doc += "#### Summary\n\n"
            doc += f"{row['summary']}\n\n"
            doc += "#### Key Technical Points\n\n"
            doc += f"{row['key_points']}\n\n"
            doc += "---\n\n"

    return doc

# --------------------------- Codebase Analysis --------------------------- #

def analyze_codebase(repo_df: pd.DataFrame) -> pd.DataFrame:
    """
    Analyze the codebase by processing each file and generating summaries.
    Utilizes Ray for distributed GPU processing.

    Args:
        repo_df (pd.DataFrame): DataFrame containing files to process.

    Returns:
        pd.DataFrame: DataFrame with processing results.
    """
    total_gpus = torch.cuda.device_count()
    if total_gpus == 0:
        logging.error("No GPUs available for processing. Exiting.")
        raise RuntimeError("No GPUs available for processing.")

    required_gpus = total_gpus
    init_ray_cluster(required_gpus)

    try:
        # Initialize Ray workers based on available GPUs
        workers = [
            DistributedModelWorker.remote(MODEL_DIR, HF_TOKEN)
            for _ in range(required_gpus)
        ]
        logging.info(f"Initialized {len(workers)} Ray workers.")

        files = repo_df.to_dict('records')
        num_files = len(files)
        logging.info(f"Total files to process: {num_files}")

        # Distribute files to workers in a round-robin fashion
        futures = []
        for idx, file_data in enumerate(files):
            worker = workers[idx % len(workers)]
            futures.append(worker.process_file.remote(file_data))

        logging.info("Dispatching file processing tasks to Ray workers.")
        results = ray.get(futures)
        logging.info("Completed file processing with Ray workers.")

        return pd.DataFrame(results)

    except Exception as e:
        logging.error(f"Error during GPU processing with Ray: {e}")
        raise

# --------------------------- Main Function --------------------------- #

def main(repo_df: pd.DataFrame) -> str:
    """
    Main function to orchestrate codebase analysis and documentation generation.

    Args:
        repo_df (pd.DataFrame): DataFrame containing files to process.

    Returns:
        str: The generated Markdown documentation.
    """
    try:
        # Preprocessing the DataFrame: Filter out binary files
        repo_df = repo_df[repo_df['is_binary'] == False].reset_index(drop=True)

        if repo_df.empty:
            logging.info("No valid files to process.")
            return "No valid files to process."

        # Analyze the codebase
        results_df = analyze_codebase(repo_df)

        # Validate 'Category' column
        if 'Category' not in results_df.columns:
            logging.error("'Category' column missing in results DataFrame.")
            raise KeyError("'Category' column is missing in the results.")

        # Generate documentation
        documentation = generate_documentation(results_df)

        # Write to output file
        with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
            f.write(documentation)

        logging.info(f"Documentation generated successfully: {OUTPUT_FILE}")
        return documentation

    except KeyError as ke:
        logging.error(f"DataFrame KeyError: {ke}")
        sys.exit(f"Error: {str(ke)}")
    except Exception as e:
        logging.error(f"Error in documentation generation: {e}")
        sys.exit(f"An error occurred: {str(e)}")
    finally:
        if ray.is_initialized():
            ray.shutdown()
            logging.info("Ray cluster shutdown.")

# --------------------------- Execution Entry Point --------------------------- #

if __name__ == "__main__":
    

    # --------------------------- Execute Main Function --------------------------- #
    try:
        documentation = main(pre_patch_df.iloc[:20, :])
        print("Documentation Generation Completed Successfully.")
        print(f"Documentation saved to: {OUTPUT_FILE}")
    except Exception as e:
        logging.error(f"Unhandled exception: {e}")
        print(f"An error occurred: {str(e)}")


2024-12-30 19:16:36,799	INFO worker.py:1753 -- Started a local Ray instance.


[36m(autoscaler +6m7s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
[33m(autoscaler +6m7s)[0m Error: No available node types can fulfill resource request {'CPU': 1.0, 'GPU': 1.0}. Add suitable node types to this cluster to resolve this issue.


In [None]:
pre_patch_df

In [None]:
# Check for source code files in the dataframe
source_code_extensions = ['.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.c', '.cpp', '.h', '.hpp', '.rb', '.go', '.sh']
source_code_df = pre_patch_df[pre_patch_df['file_path'].str.endswith(tuple(source_code_extensions))]
print(f"Number of source code files: {len(source_code_df)}")
print(source_code_df[['file_path', 'Category']])


In [None]:
repo_dataframe['file_path'] = repo_dataframe['file_path'].apply(os.path.normpath)


Knowledge Graph

In [None]:
import pandas as pd
import networkx as nx
import os
import re
import logging
from functools import lru_cache
from typing import List
import json
import ast
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.patches import Patch
import numpy as np

# Configure logging to DEBUG for detailed output
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:%(message)s')

class KnowledgeGraph:
    def __init__(self, dataframe: pd.DataFrame):
        self.df = dataframe.copy()
        # Normalize all file paths to ensure consistency
        self.df['file_path'] = self.df['file_path'].apply(os.path.normpath)
        self.graph = nx.DiGraph()
        self.required_columns = {'file_path', 'content', 'is_binary', 'Category'}
        self.validate_dataframe()
        self.add_nodes()
        self.cache_file_content()
    
    def validate_dataframe(self):
        if not self.required_columns.issubset(self.df.columns):
            missing = self.required_columns - set(self.df.columns)
            logging.error(f"Dataframe is missing required columns: {missing}")
            raise ValueError(f"Dataframe must contain columns: {self.required_columns}")
        logging.info("Dataframe loaded successfully with all required columns.")
    
    def add_nodes(self):
        for _, row in self.df.iterrows():
            try:
                self.graph.add_node(row['file_path'], category=row['Category'])
                logging.debug(f"Added node: {row['file_path']} with category: {row['Category']}")
            except Exception as e:
                logging.error(f"Error adding node for file {row['file_path']}: {e}")
    
    @lru_cache(maxsize=None)
    def get_file_content(self, file_path: str) -> str:
        try:
            content = self.df.loc[self.df['file_path'] == file_path, 'content'].values[0]
            return content
        except IndexError:
            logging.warning(f"File content not found for {file_path}.")
            return ""
    
    def extract_dependencies(self, file_path: str, content: str, category: str) -> List[str]:
        dependencies = []
        try:
            if category == 'Source Code Files':
                if file_path.endswith('.py'):
                    dependencies.extend(self.extract_python_dependencies(file_path, content))
                elif file_path.endswith('.js') or file_path.endswith('.jsx'):
                    dependencies.extend(self.extract_javascript_dependencies(file_path, content))
                elif file_path.endswith('.ts') or file_path.endswith('.tsx'):
                    dependencies.extend(self.extract_typescript_dependencies(file_path, content))
                elif file_path.endswith('.java'):
                    dependencies.extend(self.extract_java_dependencies(file_path, content))
                elif file_path.endswith(('.cpp', '.c', '.hpp', '.h')):
                    dependencies.extend(self.extract_cpp_dependencies(file_path, content))
                elif file_path.endswith('.rb'):
                    dependencies.extend(self.extract_ruby_dependencies(file_path, content))
                elif file_path.endswith('.go'):
                    dependencies.extend(self.extract_go_dependencies(file_path, content))
                # Add more languages as needed
            elif category == 'Testing Files':
                if file_path.endswith('.py'):
                    dependencies.extend(self.extract_python_dependencies(file_path, content))
                elif file_path.endswith('.js') or file_path.endswith('.jsx'):
                    dependencies.extend(self.extract_javascript_dependencies(file_path, content))
                # Add more languages as needed
            elif category == 'Scripts and Utilities':
                dependencies.extend(self.extract_shell_dependencies(file_path, content))
            elif category == 'Documentation Files':
                dependencies.extend(self.extract_markdown_assets(file_path, content))
            elif category == 'Configuration Files':
                dependencies.extend(self.extract_config_dependencies(file_path, content))
            elif category == 'Workflow and CI Files':
                dependencies.extend(self.extract_workflow_dependencies(file_path, content))
            # Add more categories and their extraction functions as needed
        except Exception as e:
            logging.error(f"Error extracting dependencies from {file_path}: {e}")
        return dependencies
    
    # Dependency extraction methods for various languages and file types
    
    def extract_python_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            tree = ast.parse(content)
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        module = alias.name.split('.')[0]
                        dep_file = os.path.normpath(os.path.join(os.path.dirname(file_path), f"{module}.py"))
                        if dep_file in self.df['file_path'].values:
                            dependencies.append(dep_file)
                            logging.debug(f"Python dependency found: {dep_file} imported in {file_path}")
                elif isinstance(node, ast.ImportFrom):
                    module = node.module.split('.')[0] if node.module else ''
                    if module:
                        dep_file = os.path.normpath(os.path.join(os.path.dirname(file_path), f"{module}.py"))
                        if dep_file in self.df['file_path'].values:
                            dependencies.append(dep_file)
                            logging.debug(f"Python dependency found: {dep_file} imported in {file_path}")
        except SyntaxError as se:
            logging.warning(f"Syntax error while parsing {file_path}: {se}")
        except Exception as e:
            logging.error(f"Unexpected error while parsing {file_path}: {e}")
        return dependencies
    
    def extract_javascript_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # ES6 import statements: import something from './module.js'
            pattern = r'import\s+.*\s+from\s+[\'"](.+?\.js)[\'"]'
            matches = re.findall(pattern, content)
            for match in matches:
                dep_path = os.path.normpath(os.path.join(os.path.dirname(file_path), match))
                if dep_path in self.df['file_path'].values:
                    dependencies.append(dep_path)
                    logging.debug(f"JavaScript dependency found: {dep_path} imported in {file_path}")
            # CommonJS require statements: const module = require('./module.js')
            pattern_cjs = r'require\([\'"](.+?\.js)[\'"]\)'
            matches_cjs = re.findall(pattern_cjs, content)
            for match in matches_cjs:
                dep_path = os.path.normpath(os.path.join(os.path.dirname(file_path), match))
                if dep_path in self.df['file_path'].values:
                    dependencies.append(dep_path)
                    logging.debug(f"JavaScript dependency found: {dep_path} required in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting JavaScript dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_typescript_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # ES6 import statements: import something from './module.ts'
            pattern = r'import\s+.*\s+from\s+[\'"](.+?\.ts)[\'"]'
            matches = re.findall(pattern, content)
            for match in matches:
                dep_path = os.path.normpath(os.path.join(os.path.dirname(file_path), match))
                if dep_path in self.df['file_path'].values:
                    dependencies.append(dep_path)
                    logging.debug(f"TypeScript dependency found: {dep_path} imported in {file_path}")
            # CommonJS require statements: const module = require('./module.ts')
            pattern_cjs = r'require\([\'"](.+?\.ts)[\'"]\)'
            matches_cjs = re.findall(pattern_cjs, content)
            for match in matches_cjs:
                dep_path = os.path.normpath(os.path.join(os.path.dirname(file_path), match))
                if dep_path in self.df['file_path'].values:
                    dependencies.append(dep_path)
                    logging.debug(f"TypeScript dependency found: {dep_path} required in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting TypeScript dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_java_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # Java import statements: import com.example.Module;
            pattern = r'import\s+([a-zA-Z0-9_.]+);'
            matches = re.findall(pattern, content)
            for match in matches:
                module = match.split('.')[-1]
                dep_file = os.path.normpath(os.path.join(os.path.dirname(file_path), f"{module}.java"))
                if dep_file in self.df['file_path'].values:
                    dependencies.append(dep_file)
                    logging.debug(f"Java dependency found: {dep_file} imported in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting Java dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_cpp_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # C/C++ include statements: #include "module.h"
            pattern = r'#include\s+[<"](.+?\.h)[>"]'
            matches = re.findall(pattern, content)
            for match in matches:
                dep_path = os.path.normpath(os.path.join(os.path.dirname(file_path), match))
                if dep_path in self.df['file_path'].values:
                    dependencies.append(dep_path)
                    logging.debug(f"C/C++ dependency found: {dep_path} included in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting C/C++ dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_ruby_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # Ruby require statements: require 'module'
            pattern = r'require\s+[\'"](.+?)[\'"]'
            matches = re.findall(pattern, content)
            for match in matches:
                dep_file = os.path.normpath(os.path.join(os.path.dirname(file_path), f"{match}.rb"))
                if dep_file in self.df['file_path'].values:
                    dependencies.append(dep_file)
                    logging.debug(f"Ruby dependency found: {dep_file} required in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting Ruby dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_go_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # Go import statements: import "github.com/user/module"
            pattern = r'import\s+"([^"]+)"'
            matches = re.findall(pattern, content)
            for match in matches:
                module = match.split('/')[-1]
                dep_file = os.path.normpath(os.path.join(os.path.dirname(file_path), f"{module}.go"))
                if dep_file in self.df['file_path'].values:
                    dependencies.append(dep_file)
                    logging.debug(f"Go dependency found: {dep_file} imported in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting Go dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_shell_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # Look for 'docker build' commands referencing Dockerfile
            if 'docker build' in content:
                dockerfile = os.path.normpath(os.path.join(os.path.dirname(file_path), 'Dockerfile'))
                if dockerfile in self.df['file_path'].values:
                    dependencies.append(dockerfile)
                    logging.debug(f"Shell dependency found: {dockerfile} referenced in {file_path}")
            # Sourcing other scripts: source scripts/helper.sh
            sourced_scripts = re.findall(r'source\s+(.+?\.sh)', content)
            for script in sourced_scripts:
                script_path = os.path.normpath(os.path.join(os.path.dirname(file_path), script))
                if script_path in self.df['file_path'].values:
                    dependencies.append(script_path)
                    logging.debug(f"Shell dependency found: {script_path} sourced in {file_path}")
            # Executing other scripts: ./scripts/setup.sh
            executed_scripts = re.findall(r'\./(.+?\.sh)', content)
            for script in executed_scripts:
                script_path = os.path.normpath(os.path.join(os.path.dirname(file_path), script))
                if script_path in self.df['file_path'].values:
                    dependencies.append(script_path)
                    logging.debug(f"Shell dependency found: {script_path} executed in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting shell dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_markdown_assets(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # Regex to find image links: ![Alt Text](assets/image.png)
            pattern = r'!\[.*?\]\((assets/[^)]+\.(png|jpg|jpeg|gif|svg))\)'
            matches = re.findall(pattern, content, re.IGNORECASE)
            for match in matches:
                asset_path = os.path.normpath(match[0])
                if asset_path in self.df['file_path'].values:
                    dependencies.append(asset_path)
                    logging.debug(f"Markdown asset found: {asset_path} referenced in {file_path}")
            # Regex to find other asset references, e.g., scripts or styles
            pattern_assets = r'\((assets/[^)]+)\)'
            matches_assets = re.findall(pattern_assets, content, re.IGNORECASE)
            for asset in matches_assets:
                asset_path = os.path.normpath(asset)
                if asset_path in self.df['file_path'].values:
                    dependencies.append(asset_path)
                    logging.debug(f"Markdown asset found: {asset_path} referenced in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting Markdown assets from {file_path}: {e}")
        return dependencies
    
    def extract_config_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # Parse JSON configuration files
            if file_path.endswith('.json'):
                try:
                    config = json.loads(content)
                    # Example: Look for file references in specific keys
                    # Modify based on actual config structure
                    # For demonstration, assume 'scripts' key contains script paths
                    scripts = config.get('scripts', {})
                    for script_path in scripts.values():
                        script_path = os.path.normpath(script_path)
                        if script_path in self.df['file_path'].values:
                            dependencies.append(script_path)
                            logging.debug(f"Config dependency found: {script_path} referenced in {file_path}")
                except json.JSONDecodeError:
                    logging.warning(f"JSON decode error in {file_path}")
            # Parse YAML configuration files
            elif file_path.endswith(('.yml', '.yaml')):
                try:
                    import yaml
                    config = yaml.safe_load(content)
                    # Example: Look for file references in specific keys
                    scripts = config.get('scripts', {})
                    for script_path in scripts.values():
                        script_path = os.path.normpath(script_path)
                        if script_path in self.df['file_path'].values:
                            dependencies.append(script_path)
                            logging.debug(f"Config dependency found: {script_path} referenced in {file_path}")
                except ImportError:
                    logging.error("PyYAML is not installed. Install it using 'pip install pyyaml'")
                except yaml.YAMLError:
                    logging.warning(f"YAML parse error in {file_path}")
            # Add more configuration file types as needed
        except Exception as e:
            logging.error(f"Error extracting configuration dependencies from {file_path}: {e}")
        return dependencies
    
    def extract_workflow_dependencies(self, file_path: str, content: str) -> List[str]:
        dependencies = []
        try:
            # Look for 'run: scripts/deploy.sh' or similar
            run_scripts = re.findall(r'run:\s*(?:bash\s+)?(.+?\.sh)', content)
            for script in run_scripts:
                script_path = os.path.normpath(os.path.join(os.path.dirname(file_path), script))
                if script_path in self.df['file_path'].values:
                    dependencies.append(script_path)
                    logging.debug(f"Workflow dependency found: {script_path} run in {file_path}")
            
            # Look for test scripts (e.g., pytest)
            test_scripts = re.findall(r'run:\s*pytest\s+(.+)', content)
            for test_script in test_scripts:
                # Assuming tests are in 'tests/' directory or similar
                test_file = os.path.normpath(os.path.join(os.path.dirname(file_path), test_script.strip()))
                if test_file in self.df['file_path'].values:
                    dependencies.append(test_file)
                    logging.debug(f"Workflow dependency found: {test_file} tested in {file_path}")
        except Exception as e:
            logging.error(f"Error extracting workflow dependencies from {file_path}: {e}")
        return dependencies
    
    def build_edges_sequential(self):
        """
        Builds edges in the graph based on dependencies sequentially.
        This method replaces the multiprocessing approach for easier debugging.
        """
        logging.info("Starting sequential edge building.")
        for idx, row in self.df.iterrows():
            file_path = row['file_path']
            content = row['content']
            category = row['Category']
            if row['is_binary']:
                logging.debug(f"Skipping binary file: {file_path}")
                continue  # Skip binary files
            dependencies = self.extract_dependencies(file_path, content, category)
            logging.debug(f"Dependencies for {file_path}: {dependencies}")
            for dep in dependencies:
                if dep in self.df['file_path'].values:
                    self.graph.add_edge(file_path, dep, relationship='DEPENDS_ON')
                    logging.debug(f"Added edge: {file_path} DEPENDS_ON {dep}")
                else:
                    logging.warning(f"Dependency {dep} for file {file_path} not found in dataframe.")
        logging.info("Completed sequential edge building.")
    
    def cache_file_content(self):
        """
        Caches file content to optimize repeated access.
        Currently implemented using lru_cache decorator on get_file_content.
        """
        # This method can be expanded if needed
        pass
    
    def get_graph_dataframes(self):
        """
        Converts the NetworkX graph into pandas DataFrames for nodes and edges.
        
        Returns:
            nodes_df (pd.DataFrame): DataFrame containing node information.
            edges_df (pd.DataFrame): DataFrame containing edge information.
        """
        # Extract nodes with attributes
        nodes_data = []
        for node, attrs in self.graph.nodes(data=True):
            node_entry = {'file_path': node}
            node_entry.update(attrs)
            nodes_data.append(node_entry)
        nodes_df = pd.DataFrame(nodes_data)
        
        # Extract edges with attributes
        edges_data = []
        for source, target, attrs in self.graph.edges(data=True):
            edge_entry = {
                'source': source,
                'target': target,
                'relationship': attrs.get('relationship', '')
            }
            edges_data.append(edge_entry)
        edges_df = pd.DataFrame(edges_data)
        
        return nodes_df, edges_df


# Initialize KnowledgeGraph with sample data
kg = KnowledgeGraph(repo_dataframe)

# Build edges sequentially
kg.build_edges_sequential()

# Convert graph to DataFrames
nodes_df, edges_df = kg.get_graph_dataframes()

# Display the DataFrames
print("Nodes DataFrame:")
print(nodes_df)
print("\nEdges DataFrame:")
print(edges_df)


In [None]:


def create_knowledge_graph(nodes_df, edges_df, figsize=(20, 16)):
    # Create a NetworkX graph
    G = nx.DiGraph()
    
    # Add nodes with category as an attribute
    for _, row in nodes_df.iterrows():
        G.add_node(row['file_path'], category=row['category'])
    
    # Add edges with relationship as an attribute
    for _, row in edges_df.iterrows():
        G.add_edge(row['source'], row['target'], relationship=row['relationship'])
    
    # Define a visually distinct color map for different categories
    categories = sorted(nodes_df['category'].unique())
    color_map = plt.cm.Set3(np.linspace(0, 1, len(categories)))
    category_colors = {category: color_map[i] for i, category in enumerate(categories)}
    
    # Assign colors to nodes based on their category
    node_colors = [category_colors[G.nodes[node]['category']] for node in G.nodes()]
    
    # Create figure and axis
    fig, ax = plt.subplots(figsize=figsize)
    
    # Use a force-directed layout with optimized parameters for spacing
    pos = nx.spring_layout(
        G,
        k=1.5/np.sqrt(len(G.nodes())),  # Optimal distance between nodes
        iterations=50,  # More iterations for better convergence
        seed=42  # For reproducibility
    )
    
    # Draw nodes with enhanced visibility
    nodes = nx.draw_networkx_nodes(
        G, pos,
        node_color=node_colors,
        node_size=2000,  # Larger nodes
        alpha=0.7,
        edgecolors='white',  # White border for better contrast
        linewidths=2
    )
    
    # Draw edges with improved styling
    edges = nx.draw_networkx_edges(
        G, pos,
        edge_color='gray',
        arrowsize=20,
        arrowstyle='->',
        width=2,
        alpha=0.6,
        connectionstyle='arc3,rad=0.2'  # Curved edges for better visibility
    )
    
    # Add labels with improved readability
    labels = nx.draw_networkx_labels(
        G, pos,
        font_size=10,
        font_weight='bold',
        font_family='sans-serif',
        bbox=dict(facecolor='white', edgecolor='none', alpha=0.7, pad=4.0)
    )
    
    # Create a custom legend
    legend_elements = [Patch(facecolor=color, label=cat, alpha=0.7)
                      for cat, color in category_colors.items()]
    ax.legend(
        handles=legend_elements,
        title='Categories',
        title_fontsize=12,
        fontsize=10,
        loc='center left',
        bbox_to_anchor=(1, 0.5),
        frameon=True,
        facecolor='white',
        edgecolor='gray'
    )
    
    # Add title and styling
    plt.title(
        "Knowledge Graph Visualization",
        pad=20,
        fontsize=16,
        fontweight='bold'
    )
    
    # Remove axes and add padding
    plt.axis('off')
    plt.tight_layout(pad=2.0)
    
    return fig, ax

# Example usage:
fig, ax = create_knowledge_graph(nodes_df, edges_df, figsize=(34, 30))
plt.show()


In [None]:
nodes_df

In [None]:
edges_df