In [None]:
import os
import re
import json
from os.path import exists
import pandas as pd
import subprocess
import shutil
import pydriller
from collections import defaultdict

from numpy.f2py.crackfortran import verbose
from tqdm.notebook import tqdm
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')

#### Setup

In [None]:
react_repo = "https://github.com/facebook/react"
clone_dir = os.path.join(os.getcwd(), "react")

if not exists(clone_dir):
    with tqdm(total=100, desc="Cloning React repo", unit="chunk") as progress_bar:
        process = subprocess.Popen(
            ['git', 'clone', '--progress', react_repo, clone_dir],
            stderr=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            text=True
        )
        for line in process.stderr:
            if "Receiving objects" in line:
                percentage = int(line.split("%")[0].split()[-1])
                progress_bar.n = percentage
                progress_bar.refresh()
            elif "Resolving deltas" in line:
                progress_bar.set_description("Resolving deltas")
                progress_bar.refresh()
        process.wait()

    if process.returncode == 0:
        logging.info("cloning completed successfully")
    else:
        logging.error("error during cloning")
else:
    logging.warning("repo already cloned")

#### Task 1

In [None]:
# Patterns to identify component types
class_component_pattern = r"class\s+\w+\s+extends\s+(React\.Component|React\.PureComponent)"
functional_component_pattern = r"(function\s+\w+\s*\([^)]*\)\s*{[^}]*return\s*<[^>]+>)|(\w+\s*=\s*\([^)]*\)\s*=>\s*<[^>]+>)"

components = {
    "class_components": [],
    "functional_components": []
}


for root, _, files in os.walk(clone_dir):
    for file in files:
        if file.endswith(".js"):
            file_path = os.path.join(root, file)
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                
                # Class Components
                if re.search(class_component_pattern, content):
                    components["class_components"].append(file)
                
                # Functional Components
                elif re.search(functional_component_pattern, content):
                    components["functional_components"].append(file)

print("Class Components:")
print(json.dumps(components["class_components"], indent=4))

print("\nFunctional Components:")
print(json.dumps(components["functional_components"], indent=4))


In [None]:
components_output_path = os.path.join(os.getcwd(), "task1", "components.json")
os.makedirs(os.path.dirname(components_output_path), exist_ok=True)

with open(components_output_path, 'w', encoding='utf-8') as json_file:
    json.dump(components, json_file, indent=4)

In [None]:
#################################
# Detect Dependencies with Madge
#################################
# Executing the Madge command
logging.info("Running Madge to generate dependencies.json")
system("madge --json . > ./dependencies.json")

logging.info(f"Directory: {os.getcwd()}\\")

if exists("./dependencies.json"):
    logging.info("Dependencies data has been saved to 'dependencies.json'")
else:
    logging.error("Error generating the dependencies data")
    raise FileNotFoundError("dependencies.json file not found")

# Load dependencies JSON file 
with open("./dependencies.json", 'r', encoding='utf-8') as f:
    dependencies = json.load(f)

# Calculate the number of dependencies for each file
dependency_counts = {file: len(dependencies[file]) for file in dependencies}

# Sort files by the number of dependencies
top_3_files = sorted(dependency_counts.items(), key=lambda x: x[1], reverse=True)[:3]

# Prepare the top 3 files data
top_dependencies_data = {}
for file, _ in top_3_files:
    top_dependencies_data[file] = dependencies[file]

# Save only the top 3 files to a new JSON file
top_dependencies_file = "./top_dependencies.json"
with open(top_dependencies_file, 'w', encoding='utf-8') as f:
    json.dump(top_dependencies_data, f, indent=4)

# Check if the 'top_dependencies.json' file has been generated
if exists(top_dependencies_file):
    logging.info(f"Top 3 files with the highest number of dependencies have been saved to '{top_dependencies_file}'")
else:
    logging.error(f"Error generating the top dependencies data in '{top_dependencies_file}'")
    raise FileNotFoundError(f"{top_dependencies_file} file not found")

In [11]:
##########################
# Changes between Versions
##########################

logging.info("Getting the list of commits between v17.0.1 and v17.0.2")
commit_hashes = subprocess.check_output(['git', 'log', 'v17.0.1..v17.0.2', '--pretty=format:%H'], text=True).splitlines()

if not commit_hashes:
    logging.error("No commits found between v17.0.1 and v17.0.2")
    raise ValueError("No commits found between v17.0.1 and v17.0.2")

# Data structure to hold commit information
commit_info_list = []

# Regex patterns for changes, insertions, and deletions
files_changed_pattern = re.compile(r'(\d+) file[s]? changed')
insertions_pattern = re.compile(r'(\d+) insertion[s]?\(\+\)')
deletions_pattern = re.compile(r'(\d+) deletion[s]?\(\-\)')

# Iterate through each commit hash to extract inf
for commit_hash in commit_hashes:
    logging.info(f"Processing commit {commit_hash}")
    commit_details = subprocess.check_output(['git', 'show', '--stat', '--pretty=format:', commit_hash], text=True)

    # Extract information 
    files_changed_match = files_changed_pattern.search(commit_details)
    insertions_match = insertions_pattern.search(commit_details)
    deletions_match = deletions_pattern.search(commit_details)

    # Extract values or default to 0 
    files_changed = int(files_changed_match.group(1)) if files_changed_match else 0
    insertions = int(insertions_match.group(1)) if insertions_match else 0
    deletions = int(deletions_match.group(1)) if deletions_match else 0


    commit_info_list.append({
        "commit_hash": commit_hash,
        "files_changed": files_changed,
        "insertions": insertions,
        "deletions": deletions
    })

# Most substantial change 
max_commit = max(commit_info_list, key=lambda x: x['files_changed'])
    
commit_hash_task3 = max_commit['commit_hash'] 

# Documentation 
logging.info("Commit with the most substantial change:")
logging.info(f"Commit Hash: {max_commit['commit_hash']}")
logging.info(f"Files Changed: {max_commit['files_changed']}")
logging.info(f"Insertions: {max_commit['insertions']}")
logging.info(f"Deletions: {max_commit['deletions']}")

# Save the commit information 
commit_info_path = "./commit_info.json"
with open(commit_info_path, 'w', encoding='utf-8') as f:
    json.dump(max_commit, f, indent=4)

if exists(commit_info_path):
    logging.info(f"Commit information saved to '{commit_info_path}'")
else:
    logging.error(f"Failed to save commit information to '{commit_info_path}'")


2024-11-21 22:20:41,255 | INFO | Getting the list of commits between v17.0.1 and v17.0.2
2024-11-21 22:20:41,271 | INFO | Processing commit 12adaffef7105e2714f82651ea51936c563fe15c
2024-11-21 22:20:41,286 | INFO | Processing commit b2bbee7ba31bb7d212a9ff2e682a695a32f8a87f
2024-11-21 22:20:41,295 | INFO | Processing commit 8cc6ff24880ac00fdb9d11bce480a0433456e82d
2024-11-21 22:20:41,304 | INFO | Commit with the most substantial change:
2024-11-21 22:20:41,304 | INFO | Commit Hash: 12adaffef7105e2714f82651ea51936c563fe15c
2024-11-21 22:20:41,304 | INFO | Files Changed: 4
2024-11-21 22:20:41,305 | INFO | Insertions: 15
2024-11-21 22:20:41,305 | INFO | Deletions: 123
2024-11-21 22:20:41,305 | INFO | Commit information saved to './commit_info.json'


In [None]:
##########################
# Dependency Change
##########################

try:
    subprocess.run(["git", "checkout", commit_hash], check=True, text=True)
    print(f"Checked out to commit {commit_hash} successfully.")
except subprocess.CalledProcessError as e:
    print(f"Error checking out to commit {commit_hash}: {e}")

In [12]:
result_commit = subprocess.run(f"madge --json ./ > 'dependencies_commit.json'", shell=True)

# Step 3: Load dependencies from dependencies.json
with open("./task1/dependencies.json", 'r', encoding='utf-8') as f:
    try:
        dependencies = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        dependencies = {}

/bin/sh: madge: command not found


FileNotFoundError: [Errno 2] No such file or directory: './task1/dependencies.json'

In [None]:
# Execute Madge command for v17.0.1
subprocess.run(["git", "checkout", "v17.0.1"], shell=True)
result_v17_0_1 = subprocess.run(f"madge --json ./ > 'dependencies_v17_0_1.json'", shell=True)

with open("dependencies_v17_0_1.json", 'r', encoding='utf-8') as f:
    try:
        dependencies = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        dependencies = {}


# Execute Madge command for v17.0.2
subprocess.run(["git", "checkout", "v17.0.2"], shell=True)
result_v17_0_2 = subprocess.run(f"madge --json ./ > 'dependencies_v17_0_2.json'", shell=True)

with open("dependencies_v17_0_2.json", 'r', encoding='utf-8') as f:
    try:
        dependencies = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        dependencies = {}


In [None]:
# Compare dependency changes in v17_0_1 and v17_0_2

if os.path.exists("dependencies_v17_0_1.json") and os.path.exists("dependencies_v17_0_2.json"):
    with open("dependencies_v17_0_1.json", 'r', encoding='utf-8') as f:
        try:
            dependencies_v17_0_1 = json.load(f)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON for v17.0.1: {e}")
            dependencies_v17_0_1 = {}

    with open("dependencies_v17_0_2.json", 'r', encoding='utf-8') as f:
        try:
            dependencies_v17_0_2 = json.load(f)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON for v17.0.2: {e}")
            dependencies_v17_0_2 = {}
else:
    dependencies_v17_0_1 = {}
    dependencies_v17_0_2 = {}

changes = {
    "new_dependencies": {},
    "removed_dependencies": {}
}

# Find new dependencies introduced in v17.0.2
for file, deps in dependencies_v17_0_2.items():
    if file not in dependencies_v17_0_1:
        changes["new_dependencies"][file] = deps
    else:
        new_deps = set(deps) - set(dependencies_v17_0_1[file])
        if new_deps:
            changes["new_dependencies"][file] = list(new_deps)

# Find dependencies that were removed in v17.0.2
for file, deps in dependencies_v17_0_1.items():
    if file not in dependencies_v17_0_2:
        changes["removed_dependencies"][file] = deps
    else:
        removed_deps = set(deps) - set(dependencies_v17_0_2[file])
        if removed_deps:
            changes["removed_dependencies"][file] = list(removed_deps)

# Save dependency changes to a new JSON file
if changes["new_dependencies"] or changes["removed_dependencies"]:
    with open("dependency_changes", 'w', encoding='utf-8') as f:
        json.dump(changes, f, indent=4)
    print(f"Dependency changes between v17.0.1 and v17.0.2 have been documented in dependency_changes")
else:
    print("No changes in dependencies detected between v17.0.1 and v17.0.2.")

#### Task 2

In [None]:
# Mining Commit History
logging.getLogger().setLevel(logging.CRITICAL)
try:
    commits_data = []
    for commit in pydriller.Repository(clone_dir).traverse_commits():
        commit_time = commit.committer_date
        file_names = [mod.filename for mod in commit.modified_files]
        
        commits_data.append({
            "commit_time": commit_time,
            "files": file_names
        })

finally:
    logging.getLogger().setLevel(logging.INFO)

##### Temporal Coupling

In [None]:
# Analysis of Temporal Coupling
def divide_into_year_intervals(commits_data):
    """
    Helper function to dissect commits_data into yearly intervals
    """
    intervals = defaultdict(list)
    for commit in commits_data:
        year = commit["commit_time"].year
        intervals[year].append(commit)
    return intervals

# Yearly Temporal Couplings Analysis
def analyze_temporal_coupling(commits_data, time_windows):
    """
    Creates a dictionary for each year, containing another dictionary with the time_windows as keys and the corresponding coupling history.
    """
    intervals = divide_into_year_intervals(commits_data)
    yearly_temporal_couplings = defaultdict(list)

    for year, commits in intervals.items():
        temporal_couplings = {window: defaultdict(int) for window in time_windows}

        for i, commit_i in enumerate(commits):
            for j, commit_j in enumerate(commits):
                if i >= j:  # Avoid duplicated entries
                    continue

                time_diff = abs((commit_j["commit_time"] - commit_i["commit_time"]).total_seconds() / 3600)

                for window in time_windows:
                    if time_diff <= window:
                        for file1 in commit_i["files"]:
                            for file2 in commit_j["files"]:
                                if file1 != file2 and not (file1.endswith(".md") or file2.endswith(".md")): # Exclusion of .md files and self-pairs -> see pdf report for further details
                                    pair = tuple(sorted([file1, file2]))
                                    temporal_couplings[window][pair] += 1

        yearly_temporal_couplings[year] = temporal_couplings

    return yearly_temporal_couplings

# Extract top 3 temporal coupling entries to JSON
def save_results_and_collect_dataframe(yearly_temporal_couplings, time_windows, output_dir="./task2/tc"):
    """
    Saves top 3 temporal coupling entries for each year into a single JSON file per year,
    including all time windows. Also returns a dataframe for analysis and pdf reporting.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    all_results = []

    for year, temporal_couplings in yearly_temporal_couplings.items():
        year_results = []
        
        for window in time_windows:
            top_coupled = sorted(
                temporal_couplings[window].items(), key=lambda x: x[1], reverse=True
            )[:3]  # Top 3 file pairs for each time window

            for pair, count in top_coupled:
                entry = {
                    "file_pair": list(pair),
                    "coupled_commits": {
                        "time_window": window,
                        "commit_count": count
                    }
                }
                year_results.append(entry)

                all_results.append({
                    "Year": year,
                    "Time Window (h)": window,
                    "File Pair": f"{pair[0]} & {pair[1]}",
                    "Commit Count": count
                })

        year_file_path = os.path.join(output_dir, f"{year}.json")
        with open(year_file_path, "w") as f:
            json.dump(year_results, f, indent=4)

    df = pd.DataFrame(all_results)
    return df


In [None]:
######################
# TEMPORAL COUPLING
######################

time_windows = [24, 48, 72]
yearly_temporal_couplings = analyze_temporal_coupling(commits_data, time_windows)
df_temporal_coupling = save_results_and_collect_dataframe(yearly_temporal_couplings, time_windows)

In [None]:
# Yearly Logical Couplings Analysis
def analyze_logical_coupling(commits_data):
    """
    Analyzes logical coupling by finding files frequently committed together,
    excluding self-pairs.
    """
    intervals = divide_into_year_intervals(commits_data)
    yearly_logical_couplings = defaultdict(lambda: defaultdict(int))

    for year, commits in intervals.items():
        for commit in commits:
            files = [file for file in commit["files"] if not file.endswith(".md")]  # Exclude .md files -> see pdf report
            for i, file1 in enumerate(files):
                for j, file2 in enumerate(files):
                    if i < j and file1 != file2:  # Avoid duplicate pairs and self-pairs
                        pair = tuple(sorted([file1, file2]))
                        yearly_logical_couplings[year][pair] += 1

    return yearly_logical_couplings

# Extract top 3 logical coupling entries to JSON
def save_results_and_collect_dataframe(yearly_logical_couplings, output_dir="./task2/lc"):
    """
    Saves top 3 logical coupling entries for each year into a single JSON file per year.
    Also returns a dataframe for analysis and pdf reporting.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    all_results = []

    for year, logical_couplings in yearly_logical_couplings.items():
        year_results = []

        top_coupled = sorted(
            logical_couplings.items(), key=lambda x: x[1], reverse=True
        )[:3]

        for pair, count in top_coupled:
            entry = {
                "file_pair": list(pair),
                "commit_count": count
            }
            year_results.append(entry)

            all_results.append({
                "Year": year,
                "File Pair": f"{pair[0]} & {pair[1]}",
                "Commit Count": count
            })

        year_file_path = os.path.join(output_dir, f"{year}.json")
        with open(year_file_path, "w") as f:
            json.dump(year_results, f, indent=4)

    df = pd.DataFrame(all_results)
    return df

In [None]:
######################
# LOGICAL COUPLING
######################

yearly_logical_couplings = analyze_logical_coupling(commits_data)
df_logical_coupling = save_results_and_collect_dataframe(yearly_logical_couplings)

#### Task 3