# Clone Repos

In [1]:
import os
import re
import subprocess
from tqdm import tqdm
import xml.etree.ElementTree as ET
from typing import List, Dict
from collections import defaultdict
from git import Repo, BadName
from pathlib import Path
import json
import requests

In [2]:
repo_name = "zaproxy/zaproxy"

repo_url = "https://github.com/" + repo_name
destination_path = "./" + repo_name
repo_path = repo_name

In [3]:
Repo.clone_from(repo_url, destination_path)

GitCommandError: Cmd('git') failed due to: exit code(128)
  cmdline: git clone -v -- https://github.com/zaproxy/zaproxy ./zaproxy/zaproxy
  stderr: 'fatal: destination path './zaproxy/zaproxy' already exists and is not an empty directory.
'

In [4]:
repo = Repo(destination_path)

In [5]:
def find_all_files(repo_path):
    pom_files = []
    for root, dirs, files in os.walk(repo_path):
        for file in files:
            if file == "pom.xml" or file == "requirements.txt" or file =="package.json":
                full_path = os.path.join(root, file)
                pom_files.append(full_path)
    return pom_files

In [6]:
def find_commits_with_changes(repo_path, file_paths):
    repo = Repo(repo_path)
    commits_with_changes = {}

    for file_path in file_paths:
        rel_path = os.path.relpath(file_path, repo_path)
        commits = list(repo.iter_commits(paths=rel_path))
        
        commits_with_changes[rel_path] = [commit.hexsha for commit in commits]
    
    return commits_with_changes

In [7]:
# Example usage
repo_path = "./" + repo_name  # Replace with your cloned repo path
file_paths = find_all_files(repo_path)

# Display all found pom.xml paths
for path in file_paths:
    path = path.replace('\\', '/')
    print(path)

print(len(file_paths))

./zaproxy/zaproxy/docker/requirements.txt
1


In [8]:
def get_all_relevant_commits(repo_path: str, file_paths: List[str]) -> Dict[str, List[str]]:
    repo = Repo(repo_path)
    commits_with_changes = {}

    for file_path in file_paths:
        rel_path = os.path.relpath(file_path, repo_path)

        # Get all commits that modified the file
        result = subprocess.run(
            ["git", "-C", repo_path, "log", "--follow", "--pretty=format:%H", "--name-only", "--", rel_path],
            capture_output=True,
            text=True,
            check=True
        )

        lines = result.stdout.strip().split('\n')
        current_commit = None

        for line in lines:
            stripped_line = line.strip()
            
            if len(stripped_line) == 40 and all(c in '0123456789abcdef' for c in stripped_line):  # Check if it looks like a SHA-1 hash
                current_commit = stripped_line
                if current_commit not in commits_with_changes:
                    commits_with_changes[current_commit] = set()  # Using set to avoid duplicates
            elif current_commit:
                modified_file = stripped_line
                if modified_file:  # Ensure we don't add empty lines
                    commits_with_changes[current_commit].add(modified_file)

    # Convert commits to a sorted list based on commit date
    commit_objects = []
    for commit_hash in commits_with_changes.keys():
        try:
            commit_obj = repo.commit(commit_hash)
            commit_objects.append((commit_obj, commit_hash))
        except BadName:
            print(f"Skipping invalid commit hash: {commit_hash}")  # Warn if a bad commit hash is found

    sorted_commits = sorted(commit_objects, key=lambda x: x[0].committed_date)
    
    # Creating a sorted dictionary of commits with their modified files
    sorted_commits_with_changes = {commit_hash: list(commits_with_changes[commit_hash]) for _, commit_hash in sorted_commits}

    return sorted_commits_with_changes

In [9]:
# Example usage
sorted_commit_hashes = get_all_relevant_commits(repo_path, file_paths)

# Display results
print(f"Total number of commits: {len(sorted_commit_hashes)}")
print("Commits (earliest to latest):")
for commit_hash, files in list(sorted_commit_hashes.items())[:10]:  # Displaying the first 10 for brevity
    print(f"{commit_hash}: {files}")

Total number of commits: 4
Commits (earliest to latest):
8cc30d8daec20df2f5dc35563d5886e32b651b1c: ['build/docker/requirements.txt']
efb404d38280dc9ecf8f88c9b0c658385861bdcf: ['docker/requirements.txt']
2b103d0fd7127d0e89126ca2d041a8336700331c: ['docker/requirements.txt']
d68ef4868d1b0a06a166d9856f5852b63c59473f: ['docker/requirements.txt']


## java pom.xml parsing 

In [10]:
# Initialize file cache
file_cache = {}
properties_cache = {}
NAMESPACE = {'mvn': 'http://maven.apache.org/POM/4.0.0'}
properties = {}

def parse_pom_xml(content: str, file_path: str, properties: Dict[str, str] = None) -> Dict[str, str]:
    if properties is None:
        properties = {}
    else:
        properties = properties.copy()  # Avoid side effects

    dependencies = {}

    try:
        root = ET.fromstring(content)

        # Update namespace if needed
        if 'xmlns' in root.attrib:
            NAMESPACE['mvn'] = root.attrib['xmlns']

        # Load project properties
        for prop in root.findall(".//mvn:properties/*", NAMESPACE):
            if prop.tag and prop.text:
                prop_name = prop.tag.split('}')[-1]
                properties[prop_name] = prop.text.strip()
                properties_cache[prop_name] = properties[prop_name]
        
        # Load project and parent versions as fallback properties
        project_version = root.find(".//mvn:version", NAMESPACE)
        if project_version is not None and project_version.text:
            properties["project.version"] = project_version.text.strip()

        parent_version = root.find(".//mvn:parent/mvn:version", NAMESPACE)
        if parent_version is not None and parent_version.text:
            properties["parent.version"] = parent_version.text.strip()

        parent_info = {}
        relative_path_elem = root.find(".//mvn:parent/mvn:relativePath", NAMESPACE)
        if relative_path_elem is not None and relative_path_elem.text:
            parent_info = {"parent_pom_path": relative_path_elem.text.strip()}

        # Read dependencies
        for dependency in root.findall(".//mvn:dependency", NAMESPACE):
            group_id = dependency.find("mvn:groupId", NAMESPACE)
            artifact_id = dependency.find("mvn:artifactId", NAMESPACE)
            version = dependency.find("mvn:version", NAMESPACE)

            if group_id is not None and artifact_id is not None:
                dep_key = f"{group_id.text.strip()}:{artifact_id.text.strip()}"
                if "${" in dep_key and "}" in dep_key:
                    dep_key = (resolve_cached(dep_key))

                if version is not None and version.text:
                    version_text = version.text.strip()
                    if version_text.startswith("${") and version_text.endswith("}"):
                        prop_name = version_text[2:-1]
                        resolved_version = properties.get(prop_name, "UNRESOLVED")
                        if resolved_version == "UNRESOLVED":
                            resolved_version = resolve_from_parent(prop_name, file_path, parent_info)
                            if (resolved_version =="UNRESOLVED"):
                                resolved_version = properties_cache.get(prop_name, "UNRESOLVED")
                        if resolved_version.startswith("${") and resolved_version.endswith("}"):
                            resolved_version = resolve_cached(resolved_version)
                        dependencies[dep_key] = resolved_version
                    else:
                        dependencies[dep_key] = version_text
                #else:
                    #dependencies[dep_key] = "UNSPECIFIED"

    except ET.ParseError as e:
        print(f"XML parsing error: {e}")

    return dependencies




def resolve_cached(prop_name, visited=None):
    matches = re.findall(r"\$\{([^}]+)\}", prop_name)
    if not matches:
        return properties_cache.get(prop_name)  # nothing to resolve, return as is

    resolved = prop_name
    for match in matches:
        inner_value = resolve_cached(match)
        if inner_value is None:
            return match
        resolved = resolved.replace(f"${{{match}}}", inner_value)

    return resolved



def resolve_from_parent(prop_name: str, file_path: str, properties: Dict[str, str]) -> str:
    parent_pom_path = properties.get("parent_pom_path")   
    if parent_pom_path is None:
        final_path = "pom.xml"
        parent_file = str(final_path).replace("\\", "/")
        if parent_file not in file_cache:
            return "UNRESOLVED"
        content = file_cache[parent_file]
        parent_prop_value = get_all_properties(content, prop_name, final_path, {})
        return parent_prop_value

    file_path = Path(file_path) if not isinstance(file_path, Path) else file_path
    parent_pom_path = Path(parent_pom_path) if not isinstance(parent_pom_path, Path) else parent_pom_path
    combined = file_path.parent / parent_pom_path

    stack = []
    for part in combined.parts:
        if part == "..":
            if stack and stack[-1] != "..":
                stack.pop()
            else:
                stack.append(part)
        elif part != ".":
            stack.append(part)
    final_path = Path(*stack)
    parent_file = str(final_path).replace("\\", "/")
    if parent_file not in file_cache:
        #print(f"[DEBUG] Skipping missing file: {parent_file}")
        return "UNRESOLVED"
    content = file_cache[parent_file]
    parent_prop_value = get_all_properties(content, prop_name, final_path, {})
    return parent_prop_value

def get_all_properties(content: str, target_prop: str, file_path: str, properties: Dict[str, str] = None):
    if properties is None:
        properties = {}
    else:
        properties = properties.copy()  # Avoid side effects

    try:
        root = ET.fromstring(content)

        if 'xmlns' in root.attrib:
            NAMESPACE['mvn'] = root.attrib['xmlns']

        # Properly assign keys without shadowing target_prop
        for prop in root.findall(".//mvn:properties/*", NAMESPACE):
            if prop.tag and prop.text:
                key = prop.tag.split('}')[-1]
                properties[key] = prop.text.strip()

        return properties.get(target_prop, "UNRESOLVED")
    except Exception as e:
        return "UNRESOLVED"    

def load_file_at_commit(repo, commit_hash, file_path):
    try:
        commit = repo.commit(commit_hash)
        blob = commit.tree / file_path
        content = blob.data_stream.read().decode('utf-8')
        file_cache[file_path] = content
        return content
    except Exception as e:
        print(f"[DEBUG]Error loading file '{file_path}' at commit '{commit_hash}': {e}")
        return None


def process_commit(repo, commit_hash, changed_files, dependencies_snapshot):
    for file_path in changed_files:
        if file_path.endswith("pom.xml"):
            content = load_file_at_commit(repo, commit_hash, file_path)
            if content:
                new_dependencies = parse_pom_xml(content, file_path, properties)
                dependencies_snapshot[file_path] = new_dependencies
    return dependencies_snapshot


def process_commits(repo, commits_with_changes):
    dependencies_over_time = {}
    dependencies_snapshot = {}

    for commit_hash, changed_files in tqdm(commits_with_changes.items(), desc="Processing commits"):
        dependencies_snapshot = process_commit(repo, commit_hash, changed_files, dependencies_snapshot)
        dependencies_over_time[commit_hash] = dependencies_snapshot.copy()

    return dependencies_over_time


dependencies_over_time = process_commits(repo, sorted_commit_hashes)

import json

with open("dependencies_over_time.json", "w") as f:
    json.dump(dependencies_over_time, f, indent=4)


Processing commits: 100%|████████████████████████████████████████████████████████████████████████| 4/4 [00:00<?, ?it/s]


## python and javascript parsing: 

In [11]:
# Initialize cache and data structures
file_cache = {}
unique_entries = set()
dependency_changes = defaultdict(list)

# Supported version specifiers in requirements.txt
VERSION_SPECIFIERS = [
    '==', '>=', '<=', '~=', '!=', '>', '<'
]


def load_file_at_commit(repo_path, commit_hash, file_path):
    try:
        result = subprocess.run(
            ["git", "-C", repo_path, "show", f"{commit_hash}:{file_path}"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True
        )
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error loading file '{file_path}' at commit '{commit_hash}': {e.stderr}")
        return None


def process_commit_file_content(repo_path, sha, file_path, type):
    content = load_file_at_commit(repo_path, sha, file_path)
        
    if content:
        parse_file_content(content, sha, file_path, type)


def parse_file_content(content, sha, filename, type):
    dependencies = {}
    hash_pattern = re.compile(r"--hash=sha256:[a-fA-F0-9]{64}")

    if type == "py" and "requirements.txt" in filename:
        for line in content.splitlines():
            line = line.strip()

            if not line or line.startswith('#') or line.startswith("--"):  # Ignore comments and empty lines
                continue

            # Check if line is a hash
            if hash_pattern.match(line):
                continue

            # Remove inline comments
            if '#' in line:
                line = line.split('#', 1)[0].strip()

            # Find the version specifier used
            version = "UNSPECIFIED"
            for specifier in VERSION_SPECIFIERS:
                if specifier in line:
                    package, version = line.split(specifier, 1)
                    package = package.strip()
                    version = version.strip()
                    dependencies[package] = version
                    break
            else:  # No version specifier found
                package = line
                dependencies[package] = "latest-version-available"

    elif type == "js" and "package.json" in filename:
        import json
        try:
            parsed_json = json.loads(content)
            if "dependencies" in parsed_json:
                dependencies.update(parsed_json["dependencies"])
            if "devDependencies" in parsed_json:
                dependencies.update(parsed_json["devDependencies"])
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON file '{filename}': {e}")

    # Store the parsed dependencies for this commit
    dependency_changes[sha] = {
        "filename": filename,
        "dependencies": dependencies,
        "ecosystem": type
    }


def process_commits(repo_path, commits_with_files):
    for sha, files in commits_with_files.items():
        for file_path in files:
            if file_path.endswith("requirements.txt"):
                process_commit_file_content(repo_path, sha, file_path, "py")
            elif file_path.endswith("package.json"):
                process_commit_file_content(repo_path, sha, file_path, "js")


def save_results(output_file="dependencies_over_time_py.json"):
    import json
    with open(output_file, "w") as f:
        json.dump(dependency_changes, f, indent=4)


process_commits(repo_path, sorted_commit_hashes)
save_results()

In [5]:
import subprocess
import re
from pathlib import Path
import os
import platform

def run_gradle_dependencies(project_dir: str, configuration: str = "runtimeClasspath"):
    """
    Runs 'gradlew dependencies' for the given configuration and parses resolved dependencies.
    Returns a list of (group, artifact, version).
    """
    project_path = Path(project_dir)

    gradlew = project_path / ("gradlew.bat" if platform.system() == "Windows" else "gradlew")
    if not gradlew.exists():
        raise FileNotFoundError(f"No gradle wrapper found in {project_dir} (looked for {gradlew.name})")

    try:
        result = subprocess.run(
            [str(gradlew), "dependencies", f"--configuration={configuration}"],
            cwd=project_dir,
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        print("Gradle failed:", e.stderr)
        return []

    return parse_gradle_output(result.stdout)

def parse_gradle_output(output: str):
    dep_pattern = re.compile(r"(?P<prefix>[+\\]---|\|)?\s*(?P<ga>([\w\.-]+):([\w\.-]+)):(?P<version>[\w\.-]+)(\s->\s(?P<resolved>[\w\.-]+))?")

    seen = set()
    dependencies = []

    for line in output.splitlines():
        match = dep_pattern.search(line)
        if match:
            group, artifact = match.group(3), match.group(4)
            version = match.group("resolved") or match.group("version")

            key = (group, artifact, version)
            if key not in seen:
                seen.add(key)
                dependencies.append(key)

    return dependencies


In [6]:

deps = run_gradle_dependencies(repo_path)
for group, artifact, version in deps:
    print(f"{group}:{artifact}:{version}")


Gradle failed: The system cannot find the path specified.



In [15]:
def merge_dependencies(python_and_javascript: Dict, java: Dict) -> Dict:
    merged_data = {}

    # Combine all unique commit hashes from both datasets
    all_commits = set(python_and_javascript.keys()).union(java.keys())

    for commit in all_commits:
        merged_data[commit] = {
            "java": {},
            "python": {},
            "javascript": {}
        }

        # Add Java dependencies if present
        if commit in java:
            for file, content in java[commit].items():
                merged_data[commit]["java"][file] = content  # No renaming!

        # Add Python and JavaScript dependencies if present
        if commit in python_and_javascript:
            content = python_and_javascript[commit]

            # Check if it's a Python or JavaScript file based on 'ecosystem'
            if content.get("ecosystem") == "py":
                merged_data[commit]["python"][content["filename"]] = content["dependencies"]
            elif content.get("ecosystem") == "js":
                merged_data[commit]["javascript"][content["filename"]] = content["dependencies"]


    return merged_data

def remove_empty_objects(data):
    if isinstance(data, dict):
        return {
            k: remove_empty_objects(v)
            for k, v in data.items()
            if not (isinstance(v, dict) and not v)  # remove empty dicts
        }
    elif isinstance(data, list):
        return [remove_empty_objects(item) for item in data]
    else:
        return data

def save_merged_data(merged_data: Dict, output_file: str):
    with open(output_file, 'w') as f:
        json.dump(merged_data, f, indent=4)


with open('dependencies_over_time_py.json', 'r') as f:
    python_and_javascript = json.load(f)

with open('dependencies_over_time.json', 'r') as f:
    java = json.load(f)

merged_data = merge_dependencies(python_and_javascript, java)
new_merged_data = remove_empty_objects(merged_data)
save_merged_data(new_merged_data, 'merged_dependencies.json')
