# New Cleaned Code

In [None]:
import pandas as pd
import ast
import tokenize
from io import StringIO
import textwrap
import re
import autopep8

# --- Patch legacy Python 2 exceptions ---
def fix_legacy_exception_syntax(code: str) -> str:
    """Rewrite Python 2 ``except Exc, name:`` clauses as ``except Exc as name:``.

    Handles plain names (``ValueError``), dotted names (``socket.error``) and
    parenthesised tuples (``(IOError, OSError)``), with optional whitespace
    before the colon.  A parenthesised tuple that is *not* followed by
    ``, name`` is already valid Python 3 and is left untouched.

    The substitution is purely textual, so a matching sequence inside a string
    literal is rewritten as well.

    NOTE(review): the input is assumed to be Python 2-era code; newer Python
    versions may give the unparenthesised comma form a different meaning.

    Args:
        code: Source text to patch.

    Returns:
        The source text with legacy ``except`` clauses modernised.
    """
    return re.sub(
        r'\bexcept\s+(\([^()]*\)|[\w.]+)\s*,\s*(\w+)\s*:',
        r'except \1 as \2:',
        code,
    )

# --- Fallback: Regex-based docstring remover ---
def remove_docstrings_fallback(code: str) -> str:
    """Delete every triple-quoted string from ``code`` using a regex.

    The match is non-greedy and spans newlines.  It does not distinguish
    docstrings from other triple-quoted literals: all of them are removed.

    Args:
        code: Source text to process.

    Returns:
        The source text with each triple-quoted string replaced by nothing.
    """
    triple_quoted = re.compile(r'(""".*?"""|\'\'\'.*?\'\'\')', flags=re.DOTALL)
    return triple_quoted.sub('', code)

# --- Robust AST-based docstring remover with fallback ---
def remove_docstrings(source_code: str) -> str:
    """Strip module, class and function docstrings from ``source_code``.

    Legacy ``except X, e:`` clauses are patched first so that more snippets
    parse.  If the source still cannot be parsed, the regex-based
    ``remove_docstrings_fallback`` is applied instead.

    The number of lines is preserved: lines a docstring occupied are left
    empty, with two exceptions:

    * code that shares a line with the docstring (a one-line ``def`` whose
      body is a docstring, or a docstring followed by ``; stmt``) is kept;
    * a docstring that was the only statement of a function or class body is
      replaced by ``pass`` so the result remains valid Python.

    Args:
        source_code: Source text to process.

    Returns:
        The processed source, lines joined with a newline character (a
        trailing newline is not kept).
    """
    source_code = fix_legacy_exception_syntax(source_code)
    try:
        tree = ast.parse(source_code)
    except (SyntaxError, ValueError):
        # ValueError covers e.g. NUL bytes, which some interpreter versions
        # report that way rather than as a SyntaxError.
        return remove_docstrings_fallback(source_code)

    # Split on the same line terminators the parser counts.  str.splitlines()
    # also breaks on form feeds and Unicode separators, which would shift
    # every later index away from the AST line numbers.
    lines = re.split(r'\r\n|\r|\n', source_code)
    if len(lines) > 1 and lines[-1] == '':
        lines.pop()

    scopes = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)
    for node in ast.walk(tree):
        if not isinstance(node, scopes) or not node.body:
            continue
        first_stmt = node.body[0]
        is_docstring = (
            isinstance(first_stmt, ast.Expr)
            and isinstance(first_stmt.value, ast.Constant)
            and isinstance(first_stmt.value.value, str)
        )
        if not is_docstring:
            continue

        first_idx = first_stmt.lineno - 1
        last_idx = first_stmt.end_lineno - 1
        if not 0 <= first_idx <= last_idx < len(lines):
            continue

        # AST column offsets count UTF-8 bytes, so slice the encoded lines.
        head = lines[first_idx].encode('utf-8')[:first_stmt.col_offset].decode('utf-8')
        tail = lines[last_idx].encode('utf-8')[first_stmt.end_col_offset:].decode('utf-8')

        if len(node.body) == 1 and not isinstance(node, ast.Module):
            # Dropping the sole statement would leave an empty suite.
            replacement = head + 'pass' + tail
        else:
            tail = tail.lstrip()
            if tail.startswith(';'):
                # The docstring was followed by "; stmt" on the same line.
                tail = tail[1:].lstrip()
            replacement = head + tail

        lines[first_idx] = replacement.rstrip()
        for i in range(first_idx + 1, last_idx + 1):
            lines[i] = ''
    return '\n'.join(lines)

# --- Remove all `#` comments with robust indentation fixing ---
def remove_all_comments(code: str) -> str:
    """Return ``code`` with every ``#`` comment token removed.

    The source is tokenized and rebuilt from the token positions, so ``#``
    characters inside string literals are untouched.  If tokenization fails,
    ``autopep8.fix_code`` is applied and tokenization is retried once; if it
    still fails, the autopep8-processed code is returned with its comments
    intact.

    Args:
        code: Source text to process.

    Returns:
        The rebuilt source without comments.  Lines that held only a comment
        become empty lines, and no padding is left where a trailing comment
        used to be.
    """
    tokens = None
    # First pass on the code as given; on failure repair it and try once more.
    for attempt in range(2):
        try:
            tokens = list(tokenize.generate_tokens(StringIO(code).readline))
            break
        except (SyntaxError, tokenize.TokenError):
            # IndentationError is a SyntaxError subclass; TokenError covers
            # e.g. an unterminated multi-line statement.
            if attempt == 0:
                code = autopep8.fix_code(code)
    if tokens is None:
        return code

    pieces = []
    prev_line = 0  # token rows are 1-based, so nothing precedes row 1
    prev_col = 0
    line_breaks = (tokenize.NEWLINE, tokenize.NL)
    for tok_type, tok_text, (start_line, start_col), (end_line, end_col), _ in tokens:
        if tok_type == tokenize.COMMENT:
            continue
        if start_line > prev_line:
            pieces.append("\n" * (start_line - prev_line - 1))
            prev_col = 0
        # Re-create horizontal spacing, but never pad in front of a line
        # break: that gap is where a removed comment used to sit.
        if start_col > prev_col and tok_type not in line_breaks:
            pieces.append(" " * (start_col - prev_col))
        pieces.append(tok_text)
        prev_line, prev_col = end_line, end_col
    return "".join(pieces)

# --- Remove extra blank lines ---
def remove_extra_blank_lines(code: str) -> str:
    """Collapse runs of blank lines to one and strip trailing whitespace.

    A line counts as blank when it is empty or contains only whitespace.

    Args:
        code: Source text to tidy.

    Returns:
        The tidied lines joined with a newline character.
    """
    kept = []
    for raw in code.splitlines():
        text = raw.rstrip()
        # Skip a blank line when the previously kept line is already blank.
        if not text and kept and kept[-1] == "":
            continue
        kept.append(text)
    return "\n".join(kept)

# --- Fix indentation using autopep8 ---
def fix_indentation(code: str) -> str:
    """Run ``code`` through ``autopep8.fix_code`` and return the result.

    Args:
        code: Source text to reformat.

    Returns:
        Whatever ``autopep8.fix_code`` produces for ``code``.
    """
    reformatted = autopep8.fix_code(code)
    return reformatted

# --- Full pipeline ---
def split_code_and_comments(source_code: str) -> str:
    """Run the full cleaning pipeline on one code snippet.

    Steps, in order: autopep8 reformatting, common-indent removal, docstring
    removal, comment removal, blank-line collapsing.

    Args:
        source_code: Raw source text of the snippet.

    Returns:
        The cleaned source text.
    """
    # Normalise the layout before any parsing is attempted.
    normalised = textwrap.dedent(fix_indentation(source_code))
    without_docstrings = remove_docstrings(normalised)
    without_comments = remove_all_comments(without_docstrings)
    return remove_extra_blank_lines(without_comments)

def clean_code_column(input_csv: str, code_column: str = "code"):
    """Load a CSV and add a ``cleaned_code`` column.

    Args:
        input_csv: Path of the CSV file to read.
        code_column: Name of the column holding the source snippets.

    Returns:
        The loaded DataFrame with a new ``cleaned_code`` column holding the
        result of ``split_code_and_comments`` for each row.
    """
    frame = pd.read_csv(input_csv)
    snippets = frame[code_column]
    frame["cleaned_code"] = snippets.apply(split_code_and_comments)
    return frame

# --- Step 6: Run it ---
if __name__ == "__main__":
    # Clean the "code" column of the raw training CSV and write the result,
    # including the new "cleaned_code" column, to the pre-processing folder.
    input_path = "/work/pi_wenlongzhao_umass_edu/27/anamikaghosh/raw_data_CSN/CodeSearchNet_Python_train.csv"
    df = clean_code_column(input_path)
    output_path = "/work/pi_wenlongzhao_umass_edu/27/anamikaghosh/pre_processing_CSN/CodeSearchNet_Python_train_cleaned.csv"
    # index=False keeps the DataFrame index out of the output file.
    df.to_csv(output_path, index=False)
    print(f"Cleaned CSV written to: {output_path}")




IndentationError: unindent does not match any outer indentation level (<tokenize>, line 11)

# Flag for failed records

In [None]:
import pandas as pd
import ast
import tokenize
from io import StringIO
import textwrap
import re
import autopep8
import time
from tqdm import tqdm

# --- Patch legacy Python 2 exceptions ---
def fix_legacy_exception_syntax(code: str) -> str:
    """Rewrite Python 2 ``except Exc, name:`` clauses as ``except Exc as name:``.

    Handles plain names (``ValueError``), dotted names (``socket.error``) and
    parenthesised tuples (``(IOError, OSError)``), with optional whitespace
    before the colon.  A parenthesised tuple that is *not* followed by
    ``, name`` is already valid Python 3 and is left untouched.

    The substitution is purely textual, so a matching sequence inside a string
    literal is rewritten as well.

    NOTE(review): the input is assumed to be Python 2-era code; newer Python
    versions may give the unparenthesised comma form a different meaning.

    Args:
        code: Source text to patch.

    Returns:
        The source text with legacy ``except`` clauses modernised.
    """
    return re.sub(
        r'\bexcept\s+(\([^()]*\)|[\w.]+)\s*,\s*(\w+)\s*:',
        r'except \1 as \2:',
        code,
    )

# --- Fallback: Regex-based docstring remover ---
def remove_docstrings_fallback(code: str) -> str:
    """Regex fallback that drops all triple-quoted strings from ``code``.

    Both quote styles are matched non-greedily across newlines.  Every
    triple-quoted literal is removed, whether or not it is a docstring.

    Args:
        code: Source text to process.

    Returns:
        The source text with the matched strings deleted.
    """
    pattern = r'(""".*?"""|\'\'\'.*?\'\'\')'
    stripped, _count = re.subn(pattern, '', code, flags=re.DOTALL)
    return stripped

# --- Robust AST-based docstring remover with fallback ---
def remove_docstrings(source_code: str) -> str:
    """Strip module, class and function docstrings from ``source_code``.

    Legacy ``except X, e:`` clauses are patched first so that more snippets
    parse.  If the source still cannot be parsed, the regex-based
    ``remove_docstrings_fallback`` is applied instead.

    The number of lines is preserved: lines a docstring occupied are left
    empty, with two exceptions:

    * code that shares a line with the docstring (a one-line ``def`` whose
      body is a docstring, or a docstring followed by ``; stmt``) is kept;
    * a docstring that was the only statement of a function or class body is
      replaced by ``pass`` so the result remains valid Python.

    Args:
        source_code: Source text to process.

    Returns:
        The processed source, lines joined with a newline character (a
        trailing newline is not kept).
    """
    source_code = fix_legacy_exception_syntax(source_code)
    try:
        tree = ast.parse(source_code)
    except (SyntaxError, ValueError):
        # ValueError covers e.g. NUL bytes, which some interpreter versions
        # report that way rather than as a SyntaxError.
        return remove_docstrings_fallback(source_code)

    # Split on the same line terminators the parser counts.  str.splitlines()
    # also breaks on form feeds and Unicode separators, which would shift
    # every later index away from the AST line numbers.
    lines = re.split(r'\r\n|\r|\n', source_code)
    if len(lines) > 1 and lines[-1] == '':
        lines.pop()

    scopes = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)
    for node in ast.walk(tree):
        if not isinstance(node, scopes) or not node.body:
            continue
        first_stmt = node.body[0]
        is_docstring = (
            isinstance(first_stmt, ast.Expr)
            and isinstance(first_stmt.value, ast.Constant)
            and isinstance(first_stmt.value.value, str)
        )
        if not is_docstring:
            continue

        first_idx = first_stmt.lineno - 1
        last_idx = first_stmt.end_lineno - 1
        if not 0 <= first_idx <= last_idx < len(lines):
            continue

        # AST column offsets count UTF-8 bytes, so slice the encoded lines.
        head = lines[first_idx].encode('utf-8')[:first_stmt.col_offset].decode('utf-8')
        tail = lines[last_idx].encode('utf-8')[first_stmt.end_col_offset:].decode('utf-8')

        if len(node.body) == 1 and not isinstance(node, ast.Module):
            # Dropping the sole statement would leave an empty suite.
            replacement = head + 'pass' + tail
        else:
            tail = tail.lstrip()
            if tail.startswith(';'):
                # The docstring was followed by "; stmt" on the same line.
                tail = tail[1:].lstrip()
            replacement = head + tail

        lines[first_idx] = replacement.rstrip()
        for i in range(first_idx + 1, last_idx + 1):
            lines[i] = ''
    return '\n'.join(lines)

# --- Remove all `#` comments with robust indentation fixing ---
def remove_all_comments(code: str) -> (str, bool):
    """Remove every ``#`` comment token from ``code`` and report failures.

    The source is tokenized and rebuilt from the token positions, so ``#``
    characters inside string literals are untouched.  If tokenization fails,
    ``autopep8.fix_code`` is applied and tokenization is retried once.

    Args:
        code: Source text to process.

    Returns:
        A ``(text, failed)`` pair.  ``failed`` is ``False`` and ``text`` is the
        rebuilt, comment-free source when tokenization succeeded.  ``failed``
        is ``True`` when the code could not be tokenized even after the
        autopep8 pass; ``text`` is then the autopep8-processed code with its
        comments intact.
    """
    tokens = None
    # First pass on the code as given; on failure repair it and try once more.
    for attempt in range(2):
        try:
            tokens = list(tokenize.generate_tokens(StringIO(code).readline))
            break
        except Exception:
            # Stay best-effort for any tokenizer error, but let
            # KeyboardInterrupt / SystemExit stop a long batch run.
            if attempt == 0:
                code = autopep8.fix_code(code)
    if tokens is None:
        return code, True

    # Build code output from tokens, skipping those with type COMMENT.
    pieces = []
    prev_line = 0  # token rows are 1-based, so nothing precedes row 1
    prev_col = 0
    line_breaks = (tokenize.NEWLINE, tokenize.NL)
    for tok_type, tok_text, (start_line, start_col), (end_line, end_col), _ in tokens:
        if tok_type == tokenize.COMMENT:
            continue
        if start_line > prev_line:
            pieces.append("\n" * (start_line - prev_line - 1))
            prev_col = 0
        # Re-create horizontal spacing, but never pad in front of a line
        # break: that gap is where a removed comment used to sit.
        if start_col > prev_col and tok_type not in line_breaks:
            pieces.append(" " * (start_col - prev_col))
        pieces.append(tok_text)
        prev_line, prev_col = end_line, end_col
    return "".join(pieces), False

# --- Remove extra blank lines ---
def remove_extra_blank_lines(code: str) -> str:
    """Squeeze consecutive blank lines into one and right-strip every line.

    Whitespace-only lines are treated as blank.

    Args:
        code: Source text to tidy.

    Returns:
        The tidied lines joined with a newline character.
    """
    result = []
    in_blank_run = False
    for text in map(str.rstrip, code.splitlines()):
        if text:
            result.append(text)
            in_blank_run = False
        elif not in_blank_run:
            # Only the first line of a blank run is kept.
            result.append("")
            in_blank_run = True
    return "\n".join(result)

# --- Fix indentation using autopep8 ---
def fix_indentation(code: str) -> str:
    """Return ``code`` after processing it with ``autopep8.fix_code``.

    Args:
        code: Source text to reformat.

    Returns:
        The string returned by ``autopep8.fix_code``.
    """
    fixed_code = autopep8.fix_code(code)
    return fixed_code

# --- Full pipeline ---
def split_code_and_comments(source_code: str) -> (str, bool):
    """Run the full cleaning pipeline on one snippet and report failures.

    Steps, in order: autopep8 reformatting, common-indent removal, docstring
    removal, comment removal, blank-line collapsing.

    Args:
        source_code: Raw source text of the snippet.

    Returns:
        A ``(cleaned_code, flag)`` pair, where ``flag`` is the boolean
        returned by ``remove_all_comments``.
    """
    # Normalise the layout before any parsing is attempted.
    normalised = textwrap.dedent(fix_indentation(source_code))
    without_docstrings = remove_docstrings(normalised)
    # Comment removal also reports whether it had to give up.
    without_comments, comment_flag = remove_all_comments(without_docstrings)
    return remove_extra_blank_lines(without_comments), comment_flag

def clean_code_column(input_csv: str, code_column: str = "code"):
    """Load a CSV, clean its code column and record per-row failures.

    Args:
        input_csv: Path of the CSV file to read.
        code_column: Name of the column holding the source snippets.

    Returns:
        The loaded DataFrame with two new columns: ``cleaned_code`` (the
        cleaned snippet) and ``remove_all_comments_issue`` (the flag returned
        by ``split_code_and_comments`` for that row).
    """
    df = pd.read_csv(input_csv)
    cleaned_codes = []
    flags = []
    # Iterate only the column that is needed: iterrows() would build a Series
    # for every row, which is slow and can upcast values across columns.
    for raw_code in tqdm(df[code_column], total=len(df)):
        final_code, flag = split_code_and_comments(str(raw_code))
        cleaned_codes.append(final_code)
        flags.append(flag)
    df["cleaned_code"] = cleaned_codes
    # True marks rows where remove_all_comments reported a problem.
    df["remove_all_comments_issue"] = flags
    return df

# --- Step 6: Run it ---
if __name__ == "__main__":
    # Clean the "code" column of the raw training CSV, write the result to the
    # pre-processing folder, and report how long the whole run took.
    input_path = "/work/pi_wenlongzhao_umass_edu/27/anamikaghosh/raw_data_CSN/CodeSearchNet_Python_train.csv"
    start = time.time()
    df = clean_code_column(input_path)
    output_path = "/work/pi_wenlongzhao_umass_edu/27/anamikaghosh/pre_processing_CSN/CodeSearchNet_Python_train_cleaned.csv"
    # index=False keeps the DataFrame index out of the output file.
    df.to_csv(output_path, index=False)
    end = time.time()
    # The elapsed time covers reading, cleaning and writing.
    print(f"Cleaned CSV written to: {output_path} in {end-start} seconds")


  1%|          | 1923/251820 [03:37<7:50:06,  8.86it/s] 


KeyboardInterrupt: 

In [None]:
import pandas as pd
import ast
import tokenize
from io import StringIO

def validate_comments(code: str):
    """Check a snippet for leftover comments and docstrings.

    Comments are detected with ``tokenize``; docstrings with
    ``ast.get_docstring`` on the module and on every function and class.  A
    tokenization or parse failure is reported as a finding: the matching flag
    is set and an error message is stored in place of the findings.

    Args:
        code: Source text to inspect.

    Returns:
        A tuple ``(has_comment, comment_tokens, has_docstring, docstrings)``.
    """
    # --- Comment tokens ---
    try:
        token_stream = list(tokenize.generate_tokens(StringIO(code).readline))
    except Exception as e:
        comment_tokens = [f"Tokenization error: {e}"]
        has_comment = True
    else:
        comment_tokens = [
            tok.string for tok in token_stream if tok.type == tokenize.COMMENT
        ]
        has_comment = bool(comment_tokens)

    # --- Docstrings via the AST ---
    docstring_owners = (ast.Module, ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    docstrings = []
    try:
        for node in ast.walk(ast.parse(code)):
            if not isinstance(node, docstring_owners):
                continue
            found = ast.get_docstring(node, clean=False)
            if found is not None:
                docstrings.append(found)
        has_docstring = bool(docstrings)
    except Exception as e:
        docstrings.append(f"AST parse error: {e}")
        has_docstring = True

    return has_comment, comment_tokens, has_docstring, docstrings

def validate_cleaned_code(csv_file: str, code_column: str = "cleaned_code"):
    """Scan a CSV for rows whose code still has comments or docstrings.

    Args:
        csv_file: Path of the CSV file to read.
        code_column: Name of the column holding the code to inspect.

    Returns:
        A list with one dict per offending row, holding the row index and the
        four values returned by ``validate_comments``.  Rows with no findings
        are omitted.
    """
    frame = pd.read_csv(csv_file)
    flagged = []
    for row_label, record in frame.iterrows():
        report = validate_comments(str(record[code_column]))
        has_comment, comment_tokens, has_docstring, docstrings = report
        if not (has_comment or has_docstring):
            continue
        flagged.append({
            "row": row_label,
            "has_comment": has_comment,
            "comment_tokens": comment_tokens,
            "has_docstring": has_docstring,
            "docstrings": docstrings
        })
    return flagged

if __name__ == "__main__":
    # Path to the cleaned CSV file to be validated.
    csv_file = "/work/pi_wenlongzhao_umass_edu/27/anamikaghosh/pre_processing_CSN/CodeSearchNet_Python_train_cleaned.csv"

    
    # Validate the cleaned code column; only rows with findings are returned.
    problems = validate_cleaned_code(csv_file, code_column="cleaned_code")
    
    if not problems:
        print("All rows have been successfully cleaned: no inline comments or docstrings were found!")
    else:
        # Print one entry per offending row. Tokenization and parse errors
        # are reported through the same two lists as real findings.
        print("Some rows still contain comments or docstrings:")
        for p in problems:
            print(f"Row {p['row']}:")
            if p["has_comment"]:
                print(f"  Comments found: {p['comment_tokens']}")
            if p["has_docstring"]:
                print(f"  Docstrings found: {p['docstrings']}")


⚠️ Some rows still contain comments or docstrings:
Row 1318:
  Docstrings found: ['AST parse error: invalid syntax (<unknown>, line 14)']
Row 1322:
  Docstrings found: ['AST parse error: invalid syntax (<unknown>, line 37)']
Row 1507:
  Docstrings found: ['AST parse error: Missing parentheses in call to \'print\'. Did you mean print("Illegal URL resource address.\\n")? (<unknown>, line 11)']
Row 1519:
  Docstrings found: ['AST parse error: Missing parentheses in call to \'print\'. Did you mean print("Bad choice for interpolate:", interpolate)? (<unknown>, line 6)']
Row 1547:
Row 2011:
  Comments found: ['Tokenization error: unindent does not match any outer indentation level (<tokenize>, line 5)']
  Docstrings found: ['AST parse error: unindent does not match any outer indentation level (<unknown>, line 5)']
Row 2016:
  Comments found: ['Tokenization error: unindent does not match any outer indentation level (<tokenize>, line 5)']
  Docstrings found: ['AST parse error: unindent does no