In [85]:
import os
import re
# Read dataset_summary.csv into a pandas DataFrame
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns

import sys
import difflib
from typing import List, Tuple

In [86]:
def read_csv_file(file_name: str) -> pd.DataFrame:
    """
    Load a CSV file into a DataFrame, printing the resolved path and the shape.

    A relative ``file_name`` is resolved against the directory of the running
    script when ``__file__`` is defined; otherwise (e.g. in a notebook kernel)
    it is resolved against the current working directory.

    Args:
        file_name: File name or path of the CSV. It may contain directory
            parts such as ``'../mapping.csv'``.

    Returns:
        The DataFrame produced by ``pandas.read_csv``.
    """
    if '__file__' in globals():
        # Join onto the script directory. Path.with_name() only accepts a bare
        # file name and raises ValueError for values like '../mapping.csv'.
        csv_path = Path(__file__).parent / file_name
    else:
        csv_path = Path(file_name)
    print(f"Trying to read: {csv_path.resolve()}")
    df = pd.read_csv(csv_path)
    print('DataFrame shape:', df.shape)
    return df

In [87]:
# Mapping CSV; the processing loop reads its "file_hash" and "successful_patches" columns.
df_all_data_successful_patches = read_csv_file('../successfull_user_patches_mapping.csv')

Trying to read: /home/diogenes/pylingual_colaboration/pylingual_download/code/successfull_user_patches_mapping.csv
DataFrame shape: (2065, 2)


In [88]:
# Matches a line that consists only of a "# Line <digits>" annotation comment,
# allowing whitespace before the '#', after it, and at the end of the line.
ANN_LINE_RE = re.compile(r'^\s*#\s*Line\s+\d+\s*$')  # e.g., "# Line 123"

In [89]:
def code_portion(s: str) -> str:
    """
    Return the code portion of a line by stripping an inline Python comment.

    The comment starts at the first '#' that is not inside a single- or
    double-quoted string literal, so a '#' inside a string (e.g. "#fff") is
    kept as code. Leading and trailing whitespace is removed from the result.

    The scan looks at one line only: it does not know whether the line starts
    inside a multi-line string, and a quote left open at the end of the line
    makes the remainder of the line count as string content.

    Examples:
      "  x = 1  # note"       -> "x = 1"
      "  # only comment"      -> "" (empty)
      'c = "#fff"  # colour'  -> 'c = "#fff"'
    """
    quote = None      # quote character of the string literal we are inside, if any
    escaped = False   # True when the previous character was a backslash inside a string
    for pos, ch in enumerate(s):
        if quote is not None:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == quote:
                quote = None
        elif ch == '"' or ch == "'":
            quote = ch
        elif ch == '#':
            # First '#' outside any string literal: everything after it is comment.
            return s[:pos].strip()
    return s.strip()


In [90]:
def is_comment_only(s: str) -> bool:
    """
    True if the line is a pure comment, i.e. its first non-whitespace
    character is '#'. This includes annotations like '# Line 123'.

    Blank and whitespace-only lines return False; callers detect those through
    an empty code_portion instead.
    """
    # A separate regex test for '# Line <n>' annotations is unnecessary: any
    # such line already starts with '#' once surrounding whitespace is removed.
    return s.strip().startswith('#')

In [91]:
# Captures the optional line counts of a hunk header such as '@@ -12,3 +12,4 @@'.
# A count that is omitted (e.g. '@@ -12 +12 @@') means 1.
_HUNK_HEADER_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')


def parse_unified_hunks(lines: List[str]) -> List[List[str]]:
    """
    Split a unified diff (as produced by difflib.unified_diff) into hunks,
    keeping file headers outside.

    Each returned hunk is a list that starts with its '@@ ... @@' header and
    contains the lines that follow it up to the next hunk or file header.

    The line counts in the hunk header decide how many body lines belong to
    the hunk. This matters because a removed line whose text starts with '-- '
    appears in the diff as '--- ...' (and an added '++ ...' line as '+++ ...');
    such lines are hunk content, not file headers. If a header cannot be
    parsed, the hunk simply ends at the next '@@ ', '--- ' or '+++ ' line.

    Args:
        lines: Diff lines. Hunks are assumed to be complete, i.e. to contain
            as many body lines as their header announces.

    Returns:
        List of hunks; empty if the input contains no '@@ ' header.
    """
    hunks: List[List[str]] = []
    cur: List[str] = []
    old_left = 0  # old-file lines ('-' and context) still owed to the current hunk
    new_left = 0  # new-file lines ('+' and context) still owed to the current hunk

    for line in lines:
        if line.startswith('@@ '):
            # Body lines always carry a '-', '+' or ' ' prefix, so this is a header.
            if cur:
                hunks.append(cur)
            cur = [line]
            m = _HUNK_HEADER_RE.match(line)
            if m:
                old_left = int(m.group(1)) if m.group(1) is not None else 1
                new_left = int(m.group(2)) if m.group(2) is not None else 1
            else:
                old_left = new_left = 0
        elif old_left > 0 or new_left > 0:
            # Still inside the hunk body announced by the header.
            cur.append(line)
            if line.startswith('-'):
                old_left -= 1
            elif line.startswith('+'):
                new_left -= 1
            elif not line.startswith('\\'):
                # Context line: counts for both sides. Lines starting with a
                # backslash ("\ No newline at end of file") count for neither.
                old_left -= 1
                new_left -= 1
        elif line.startswith('--- ') or line.startswith('+++ '):
            # file headers handled by caller; end current hunk if any
            if cur:
                hunks.append(cur)
                cur = []
        else:
            if cur:
                cur.append(line)

    if cur:
        hunks.append(cur)
    return hunks


In [92]:
def filter_hunk_only_code_changes(hunk: List[str]) -> List[str]:
    """
    Given a single hunk (starting with '@@ ... @@'), return a filtered hunk where
    only code changes remain.

    Processing:
      - The hunk header is kept unchanged.
      - The body is split into runs of consecutive '-' / '+' lines; any other
        line (context or meta) ends the current run and is not emitted.
      - Within a run, lines whose code portion is empty (comment-only or blank
        lines) are dropped first, so they can neither appear in the output nor
        shift the pairing of the remaining lines.
      - The remaining '-' and '+' lines are paired by position; a pair whose
        code portions are equal (only a comment or surrounding whitespace
        changed) is dropped, any other pair is kept as '-' line then '+' line.
      - Unpaired leftover '-' lines, then '+' lines, are kept.

    Args:
        hunk: Lines of one hunk, header first.

    Returns:
        [header] + kept lines, or [] if nothing but comments changed. Input
        that does not start with a hunk header is returned as a copy.
    """
    if not hunk or not hunk[0].startswith('@@ '):
        return hunk[:]  # safety: not a hunk, return unchanged

    header, body = hunk[0], hunk[1:]

    filtered_body: List[str] = []
    dels: List[str] = []
    adds: List[str] = []

    def code_of(diff_line: str) -> str:
        # Skip the one-character diff prefix ('-' or '+') before extracting code.
        return code_portion(diff_line[1:])

    def flush_run() -> None:
        # Keep only lines that actually carry code.
        code_dels = [ln for ln in dels if code_of(ln)]
        code_adds = [ln for ln in adds if code_of(ln)]

        # Positional pairs: emit only those whose code really differs.
        for dline, aline in zip(code_dels, code_adds):
            if code_of(dline) != code_of(aline):
                filtered_body.append(dline)
                filtered_body.append(aline)

        # Whatever has no partner is a pure deletion or insertion of code.
        paired = min(len(code_dels), len(code_adds))
        filtered_body.extend(code_dels[paired:])
        filtered_body.extend(code_adds[paired:])

        dels.clear()
        adds.clear()

    # Walk through hunk body, grouping '-' and '+' runs, flushing when we hit anything else
    for ln in body:
        if ln.startswith('-'):
            dels.append(ln)
        elif ln.startswith('+'):
            adds.append(ln)
        else:
            # Context or meta line: ends the run and is deliberately not emitted,
            # so that only differing lines are displayed.
            flush_run()
    flush_run()

    if not filtered_body:
        return []  # drop entire hunk if nothing but comments changed

    return [header] + filtered_body

In [93]:
# Base directory where all these hash-named folders live.
# The processing loop looks beneath it for
#   BASE_DIR/<file_hash>/decompiler_output/indented_*.py  and
#   BASE_DIR/<file_hash>/user_patches/<patch_hash>/source.py
BASE_DIR = Path("../../decompiler_workspace")   # change to your root folder

# OUT_DIR = Path("../diffs")        # where to store diffs
# OUT_DIR.mkdir(parents=True, exist_ok=True)

In [94]:
def read_file(path: Path):
    """
    Read a text file and return its lines with line endings kept.

    The file is decoded as UTF-8; if that fails with UnicodeDecodeError it is
    read again as latin-1.
    """
    try:
        content = path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: decode as latin-1 instead.
        content = path.read_text(encoding="latin-1", errors="replace")
    return content.splitlines(keepends=True)


In [95]:
def s(p):
    """Return the file-system representation of ``p`` as given by ``os.fspath``."""
    fs_path = os.fspath(p)
    return fs_path

In [96]:
from typing import Optional, Tuple
INDENTED_RE = re.compile(r"^indented_(\d+)(?:\.[^.]+)?$")  # matches indented_12 or indented_12.py

def highest_indented_file(
    directory: Path | str,
    pattern: str = "indented_*"  # change to "indented_*.py" to require .py
) -> Optional[Tuple[Path, int]]:
    """
    Return (path, idx) for the highest-numbered file named like 'indented_{idx}[.ext]'
    in the given directory. Returns None if none found.
    """
    directory = Path(directory)
    best: Optional[Tuple[Path, int]] = None

    for p in directory.glob(pattern):
        if not p.is_file():
            continue
        m = INDENTED_RE.match(p.name)
        if not m:
            continue
        n = int(m.group(1))
        if best is None or n > best[1]:
            best = (p, n)
    return best

In [97]:
def generate_diff(old_file: str, new_file: str) -> Tuple[List[str], str]:
    out_file =  BASE_DIR / file_hash / "user_patches" / "source_diffs.txt"

    with open(old_file, 'r', encoding='utf-8') as f:
        old_lines = f.readlines()
    with open(new_file, 'r', encoding='utf-8') as f:
        new_lines = f.readlines()

    # Make a unified diff over originals (keeps true line numbers in hunk headers)
    raw_diff = list(difflib.unified_diff(
        old_lines, new_lines,
        fromfile=old_file, tofile=new_file,
        lineterm="",n=5  # n=0 to minimize context 
    ))

    # Separate file headers and hunks
    out_lines: List[str] = []
    # Always keep file headers if present
    for ln in raw_diff:
        if ln.startswith('--- ') or ln.startswith('+++ '):
            out_lines.append(ln)

    # Parse hunks and filter
    hunks = parse_unified_hunks(raw_diff)
    for h in hunks:
        filtered = filter_hunk_only_code_changes(h)
        if filtered:
            out_lines.extend(filtered)

    text = "\n".join(out_lines)
    if out_file:
        with open(out_file, "w", encoding="utf-8") as f:
            f.write(text + ("\n" if text and not text.endswith("\n") else ""))
        print(f"Wrote filtered diff to {out_file}")
    else:
        print(text)

In [98]:
from pathlib import Path

def norm_str(x: str | None) -> str | None:
    if x is None:
        return None
    s = str(x).strip()
    return None if s == "" or s.lower() == "nan" else s

# Walk the mapping rows and write one filtered diff per (file_hash, patch_hash) pair.
for idx, row in df_all_data_successful_patches.iterrows():
    file_hash = norm_str(row.get("file_hash"))
    patch_hash = norm_str(row.get("successful_patches"))

    # A row lacking either hash cannot be mapped to files on disk.
    if not (file_hash and patch_hash):
        print(f"⚠️  Skip row {idx}: missing file_hash or successful_patches")
        continue

    # "Old" side: the highest-numbered indented_<n>.py in the decompiler output.
    old_dir = BASE_DIR / file_hash / "decompiler_output"
    res = highest_indented_file(old_dir, pattern="indented_*.py")
    if res is None:
        print(f"⚠️  No indented_*.py in {old_dir}")
        continue
    old_file_path, max_idx = res

    # "New" side: the patched source stored under the patch hash.
    new_file_path = BASE_DIR / file_hash / "user_patches" / patch_hash / "source.py"

    # Pairs with a missing file are skipped without a message.
    if not old_file_path.exists() or not new_file_path.exists():
        continue

    print(f"Processing file_hash={file_hash}, patch_hash={patch_hash}")
    generate_diff(str(old_file_path), str(new_file_path))


Processing file_hash=1b071182ed192b46c3663a602507bca58c08890a1a6a0fffd52f3787123d49f8, patch_hash=090083fc164802921b7a3f2f831630a7688419d21500e1a05283b1861178c1ee
Wrote filtered diff to ../../decompiler_workspace/1b071182ed192b46c3663a602507bca58c08890a1a6a0fffd52f3787123d49f8/user_patches/source_diffs.txt
Processing file_hash=11b1becb1c340a39803513371a438fe2568976dbaa1364f52d20225b987ef5d8, patch_hash=0c6c22354361d136b981a91f3b4fead472dd05781e5857d14ce5324e390b96d3
Wrote filtered diff to ../../decompiler_workspace/11b1becb1c340a39803513371a438fe2568976dbaa1364f52d20225b987ef5d8/user_patches/source_diffs.txt
Processing file_hash=f6a25baadda3fa3fdaae3edc080251abeb540e10b9752483617a1e28bd81d8c0, patch_hash=9f5ef2e81fbb379fea38d9caea6dc197a249449a5ffb75853c6caff41c561b99
Wrote filtered diff to ../../decompiler_workspace/f6a25baadda3fa3fdaae3edc080251abeb540e10b9752483617a1e28bd81d8c0/user_patches/source_diffs.txt
Processing file_hash=e25c38320dba6e5c7979d9818b3d82ccc7f0c0938f5b1b8cae4576c