In [1]:
from pathlib import Path 
import os, dotenv, yaml

with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)

dotenv.load_dotenv()
os.chdir(Path(config["pythonpath"]).expanduser())

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json

In [3]:
from pathlib import PurePosixPath

def prepend_parent_dir_to_filename(p: str) -> str:
    pp = PurePosixPath(p)
    if pp.parent == pp:  # defensive
        return p

    parent_name = pp.parent.name          # e.g., "HKDF256"
    grandparent = pp.parent.parent        # e.g., ".../KDF"
    new_name = f"{parent_name}_{pp.name}" # e.g., "HKDF256_002_HMAC.cry"
    return str(grandparent / new_name)

VERSION = config["version"]
sliced_df = pd.read_json(f"data/some_slices_raw_{VERSION}.jsonl", lines=True)
#sliced_df = sliced_df.iloc[ : 250].reset_index(drop=True)
#sliced_df["filename"] = sliced_df["filename"].str.replace(r"^cryptol_slices/", "", regex=True)
#sliced_df["filename"] = sliced_df["filename"].apply(lambda p: str(REPO_ROOT / p))
#sliced_df["filename"] = sliced_df["filename"].apply(prepend_parent_dir_to_filename)

In [4]:
import re
from typing import List, Tuple

_HEX_TOKEN_RE = re.compile(r"0x[0-9a-fA-F]+")
_BRACKET_BLOCK_RE = re.compile(r"\[[\s\S]*?\]")  # non-greedy across newlines


def _is_hex_only_list(inner: str) -> bool:
    """
    True iff inner content is only comma-separated hex tokens with whitespace/newlines.
    """
    s = inner.strip()
    if not s:
        return False
    parts = [p.strip() for p in s.split(",")]
    if parts and parts[-1] == "":  # allow trailing comma
        parts.pop()
    return bool(parts) and all(_HEX_TOKEN_RE.fullmatch(p) is not None for p in parts)


def format_hex_sequences_preserve_indent(content: str, width: int = 80, tab_width: int = 8) -> str:
    """
    Reformat ONLY bracketed lists whose elements are ONLY hex literals (0x..),
    wrapping to `width` while PRESERVING the indentation that already exists.

    Policy:
      - Try to append next hex token to current line (", <tok>") if line <= width.
      - If it would exceed width, start a new line using the original prefix
        (the characters before the first hex token on the next original token line).
      - Do not create new indentation styles.
    """
    out_parts: List[str] = []
    last = 0

    for m in _BRACKET_BLOCK_RE.finditer(content):
        block = m.group(0)
        start, end = m.span()

        out_parts.append(content[last:start])

        inner = block[1:-1]
        if not _is_hex_only_list(inner):
            out_parts.append(block)
            last = end
            continue

        # Work line-by-line so we can reuse *existing* indentation prefixes.
        orig_lines = block.splitlines()
        token_lines: List[Tuple[int, str]] = []  # (line_index, prefix_before_first_hex)
        close_line_prefix = None

        for i, ln in enumerate(orig_lines):
            mt = _HEX_TOKEN_RE.search(ln)
            if mt:
                token_lines.append((i, ln[:mt.start()]))
            if close_line_prefix is None and "]" in ln:
                # best-effort: indentation before the first ']' on that line
                close_line_prefix = ln[:ln.index("]")]

        if not token_lines:
            out_parts.append(block)
            last = end
            continue

        # All hex tokens in order
        hex_tokens = _HEX_TOKEN_RE.findall(block)

        first_line_idx, first_prefix = token_lines[0]

        # Keep any lines that come BEFORE the first token line (verbatim)
        new_lines: List[str] = orig_lines[:first_line_idx]

        # Build token content lines with greedy packing
        token_prefixes = [p for (_, p) in token_lines]
        prefix_idx = 0

        cur = first_prefix
        at_line_start = True

        for j, tok in enumerate(hex_tokens):
            piece = tok + ("," if j < len(hex_tokens) - 1 else "")
            sep = "" if at_line_start else " "
            candidate = cur + sep + piece

            # measure visual width with tabs expanded
            if len(candidate.expandtabs(tab_width)) <= width:
                cur = candidate
                at_line_start = False
            else:
                # flush current line and start a new one using the *next existing* prefix
                new_lines.append(cur)
                prefix_idx = min(prefix_idx + 1, len(token_prefixes) - 1)
                cur = token_prefixes[prefix_idx] + piece
                at_line_start = False

        # Try to keep closing bracket on same line if it fits
        close_prefix = close_line_prefix if close_line_prefix is not None else first_prefix
        inline_close = cur + "]"
        if len(inline_close.expandtabs(tab_width)) <= width:
            new_lines.append(inline_close)
        else:
            new_lines.append(cur)
            new_lines.append(close_prefix + "]")

        out_parts.append("\n".join(new_lines))
        last = end

    out_parts.append(content[last:])
    return "".join(out_parts)



sliced_df["content"] = sliced_df["content"].apply(
    lambda c: format_hex_sequences_preserve_indent(c, width=80)
)


In [5]:
from src.preprocessing.interpreter_process import process_sliced_df_to_df 


MOUNT_DIR = Path(config["mount_dir"]).expanduser()
MOUNT_DIR.mkdir(parents=True, exist_ok=True)
SERVER_URL = config["cryptol_server_url"]

df = process_sliced_df_to_df(
        df=sliced_df,
        host_mount_dir=MOUNT_DIR,
        server_url=SERVER_URL,
    )

File failed to load: stored=cryptol_slices/cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/010_blockify.cry load_as=cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/010_blockify.cry
File failed to load: stored=cryptol_slices/cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/001_aes_gcm_siv.cry load_as=cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/001_aes_gcm_siv.cry
File failed to load: stored=cryptol_slices/cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/011_unblockify.cry load_as=cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/011_unblockify.cry
File failed to load: stored=cryptol_slices/cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/004_derive_key.cry load_as=cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/004_derive_key.cry
File failed to load: stored=cryptol_slices/cryptol-specs/Primitive/Symmetric/Cipher/Authenticated/AES_GCM_SIV/

In [6]:
filter = df[df["n_imports_original"] != df["n_imports_final"]]

filter.head()

Unnamed: 0,filename,filetype,content,n_imports_original,n_imports_final
35,cryptol_slices/cryptol-specs/Primitive/Symmetr...,cry,import Primitive::Symmetric::Cipher::Block::Si...,10,1
36,cryptol_slices/cryptol-specs/Primitive/Symmetr...,cry,import Primitive::Symmetric::Cipher::Block::Si...,10,1
37,cryptol_slices/cryptol-specs/Primitive/Symmetr...,cry,import Primitive::Symmetric::Cipher::Block::Si...,10,1
38,cryptol_slices/cryptol-specs/Primitive/Symmetr...,cry,import Primitive::Symmetric::Cipher::Block::Si...,10,1
39,cryptol_slices/cryptol-specs/Primitive/Symmetr...,cry,import Primitive::Symmetric::Cipher::Block::Si...,10,1


In [7]:

'''for idx, row in df.iterrows():
    
    print(f"--- ROW {idx} ---")
    print("Filename:", row['filename'])
    print(f"Number of Original Imports: {row['n_imports_original']}, Final Imports: {row['n_imports_final']}")
    print("Content:\n", row['content'], sep='')
    print("*" * 100)
'''


'for idx, row in df.iterrows():\n\n    print(f"--- ROW {idx} ---")\n    print("Filename:", row[\'filename\'])\n    print(f"Number of Original Imports: {row[\'n_imports_original\']}, Final Imports: {row[\'n_imports_final\']}")\n    print("Content:\n", row[\'content\'], sep=\'\')\n    print("*" * 100)\n'

In [8]:
df.to_json(f"data/some_slices_verified_{VERSION}.jsonl", lines=True, orient="records")