In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

##### Note: Code split between two parts for me. First part is gathering data to put in a way to use, second part is transforming it. Also, headers are below code. ##### 

## Methods ##


(Garble Methods)



add — returns a stable token for an original and increments its count.

original_from_token — reverse-maps a token back to the original string.

record_from_token — fetches the UserRecord associated with a token.

export_to_json — saves the mapper’s current state to disk.

load_from_json — restores the mapper’s state from a saved JSON file.




(Helper Methods)
is_valid_user — checks that a user value is non-null and non-empty.

is_valid_ipv4 — validates IPv4 dotted-quad format and range.

to_jagged_array — builds [[original, token, count, valid], ...] for non-anonymous use.

dump_json — writes a Python object to a pretty-printed JSON file.




(Data Methods)
load_dataframe — selects required columns from all Parquet files in DATA_DIR.

build_obfuscations — iterates rows to create user/IP token maps with counts/validity.

make_summary_payload — returns anonymized users/IPs jagged arrays plus meta.

failed_users_payload — returns anonymized records for users with failed jobs plus meta.

In [2]:
import os
import re
import json
import string
import secrets
from dataclasses import dataclass, asdict #asdict for json
from typing import Dict, Optional, List, Tuple #tuple for serializing
#For transforming data into what we want.
import duckdb
import pandas as pd
#For reading data and putting it into something usable.

In [3]:
DATA_DIR = "../data"
OUTPUT_DIR = "./Output"

### Import stuff ###

In [4]:
USER_COL   = "User"
IP_COL     = "JobsubClientIpAddress"
FAILED_COL = "DAG_NodesFailed"  # “boolean-ish”
NUM_STARTS_COL     = "NumJobStarts"
NUM_COMPLETIONS_COL= "NumJobCompletions"

### Config / Column Names ###

In [5]:
DIGITS = string.digits
LOWER = string.ascii_lowercase
UPPER = string.ascii_uppercase
DEFAULT_PUNCT = "!#$%&()*+,-.:;<=>?@[]^_{|}~"
CHAR_TYPE_CHOICES = ["digit", "lower", "upper", "punct"]
#defines UserRecord and GarbleTokenMapper for obfuscation.

@dataclass #shortcut class go brrr
class UserRecord:
    token: str
    count: int
    valid: bool

In [6]:
class GarbleTokenMapper:
    """
    Sequential (non-random) token mapper.

    - Users:  tokens like "UR1", "UR2", ...
    - IPs:    tokens like "IP1", "IP2", ...

    Keeps the same public API as the previous GarbleTokenMapper:
    """

    def __init__(
        self,
        prefix: str = "",
        start: int = 1,
        # legacy args kept for drop-in compatibility; ignored
        token_len: int = 8,
        allow_punctuation: bool = False,
        punct_chars: Optional[str] = None,
    ):
        self.prefix = str(prefix or "")
        self.start = int(start)
        # original -> UserRecord(token, count, valid)
        self._by_orig: Dict[str, UserRecord] = {}
        # token   -> original
        self._token_to_orig: Dict[str, str] = {}
        # issued tokens (not strictly needed for sequential, kept for parity)
        self._seen_tokens = set()
        # counter points to the LAST issued number (so next is _counter + 1)
        self._counter = self.start - 1

    @staticmethod
    def _extract_trailing_int(s: str) -> Optional[int]:
        m = re.search(r"(\d+)$", str(s))
        return int(m.group(1)) if m else None

    def _next_token(self) -> str:
        self._counter += 1
        return f"{self.prefix}{self._counter}"

    def add(self, original: str, valid: bool = True) -> str:
        key = str(original)
        if key in self._by_orig:
            rec = self._by_orig[key]
            rec.count += 1
            return rec.token

        token = self._next_token()
        self._seen_tokens.add(token)
        rec = UserRecord(token=token, count=1, valid=bool(valid))
        self._by_orig[key] = rec
        self._token_to_orig[token] = key
        return token

    def original_from_token(self, token: str) -> Optional[str]:
        return self._token_to_orig.get(str(token))

    def record_from_token(self, token: str) -> Optional[UserRecord]:
        orig = self._token_to_orig.get(str(token))
        return self._by_orig.get(orig) if orig is not None else None

    def export_to_json(self, filepath: str) -> None:
        entries = []
        for orig, rec in self._by_orig.items():
            e = asdict(rec)
            e["original"] = orig
            entries.append(e)
        state = {
            "entries": entries,
            "config": {"prefix": self.prefix, "start": self.start, "counter": self._counter},
        }
        os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)

    def load_from_json(self, filepath: str) -> None:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)

        entries = data.get("entries", [])
        cfg = data.get("config", {})

        # reset
        self._by_orig.clear()
        self._token_to_orig.clear()
        self._seen_tokens.clear()

        # keep existing prefix/start unless provided in file
        self.prefix = str(cfg.get("prefix", self.prefix))
        self.start = int(cfg.get("start", self.start))

        max_num = self.start - 1
        for e in entries:
            orig = str(e["original"])
            token = str(e["token"])
            count = int(e.get("count", 0))
            valid = bool(e.get("valid", True))
            rec = UserRecord(token=token, count=count, valid=valid)
            self._by_orig[orig] = rec
            self._token_to_orig[token] = orig
            self._seen_tokens.add(token)
            n = self._extract_trailing_int(token)
            if n is not None:
                max_num = max(max_num, n)

        # resume counting AFTER the largest seen number
        self._counter = int(cfg.get("counter", max_num))


### Token Mapper (Part 2) ###

In [7]:
def export_to_json(self, filepath: str):
        entries = []
        for orig, rec in self._by_orig.items():
            e = asdict(rec)
            e["original"] = orig
            entries.append(e)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump({"entries": entries}, f, indent=2)

In [8]:
def load_from_json(self, filepath: str):
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        entries = data.get("entries", [])

        # clear current
        self._by_orig.clear()
        self._token_to_orig.clear()
        self._seen_tokens.clear()

        for e in entries:
            orig = e["original"]
            token = e["token"]
            count = int(e.get("count", 0))
            valid = bool(e.get("valid", True))
            rec = UserRecord(token=token, count=count, valid=valid)
            self._by_orig[orig] = rec
            self._token_to_orig[token] = orig
            self._seen_tokens.add(token)

#### Json "export" ####

In [9]:
_ipv4_re = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")

def is_valid_user(u) -> bool:
    if pd.isna(u):
        return False
    s = str(u).strip()
    return len(s) > 0

In [10]:
def is_valid_ipv4(ip) -> bool:
    if pd.isna(ip):
        return False
    s = str(ip).strip()
    if not _ipv4_re.match(s):
        return False
    try:
        parts = [int(p) for p in s.split(".")]
    except ValueError:
        return False
    return all(0 <= p <= 255 for p in parts)

In [11]:
def to_jagged_array(ob_dict: Dict[str, Dict[str, object]]) -> List[List[object]]:
    """
    Anonymized jagged array builder:
      input: { original: {"id": token, "count": int, "valid": bool}, ... }
      output: [[token, count, valid], ...]   # NO original included
    """
    return [[data["id"], data["count"], data["valid"]]
            for _, data in ob_dict.items()] #throwaway with keys to get values in tuple.

In [12]:
def dump_json(obj, path: str):
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)

#### Helpers/Secondary ####

In [13]:
def load_dataframe(data_dir: str) -> pd.DataFrame:
    """
    Uses DuckDB to select required columns from all parquet files in data_dir.
    """
    if not os.path.isdir(data_dir):
        raise FileNotFoundError(f"DATA_DIR does not exist or is not a directory: {data_dir}")

    pattern = f"{data_dir}/*.parquet"
    query = (
        "SELECT User, RequestMemory, CumulativeSlotTime, JobsubClientIpAddress, "
        "MATCH_EXP_JOB_Site, DAG_NodesFailed, NumJobCompletions, NumJobStarts "
        f"FROM '{pattern}'"
    )
    rel_obj = duckdb.sql(query)
    return rel_obj.df()

#### Loading Data ####

In [14]:
def build_obfuscations(
    df: pd.DataFrame,
    user_col: str = USER_COL,
    ip_col: str = IP_COL,
) -> Tuple[Dict[str, Dict[str, object]], Dict[str, Dict[str, object]], GarbleTokenMapper, GarbleTokenMapper]:
    """
    Build two dicts:
      users_dict = { original_user: {"id": token, "count": n, "valid": bool}, ... }
      ips_dict   = { original_ip:   {"id": token, "count": n, "valid": bool}, ... }
    Returns also the underlying mappers (useful if you want to export the maps).

    Tokens are sequential:
      - Users: UR1, UR2, ...
      - IPs:   IP1, IP2, ...
    """
    user_mapper = GarbleTokenMapper(prefix="UR", start=1)
    ip_mapper   = GarbleTokenMapper(prefix="IP", start=1)

    # iterate rows to add and count
    for _, row in df.iterrows():
        u = row.get(user_col)
        ip = row.get(ip_col)

        user_mapper.add(str(u), valid=is_valid_user(u))
        ip_mapper.add(str(ip), valid=is_valid_ipv4(ip))

    users_dict = {
        orig: {"id": rec.token, "count": rec.count, "valid": rec.valid}
        for orig, rec in user_mapper._by_orig.items()
    }
    ips_dict = {
        orig: {"id": rec.token, "count": rec.count, "valid": rec.valid}
        for orig, rec in ip_mapper._by_orig.items()
    }
    return users_dict, ips_dict, user_mapper, ip_mapper


#### Transform ####

In [15]:
def make_output_json(
    df: pd.DataFrame,
    users_dict: Dict[str, Dict[str, object]],
    ips_dict: Dict[str, Dict[str, object]],
) -> str:
    """
    Build the final JSON payload that includes jagged arrays and minimal metadata.
    """
    users_jagged = to_jagged_array(users_dict)
    ips_jagged   = to_jagged_array(ips_dict)

    payload = {
        "users": users_jagged,   # [[original, token, count, valid], ...]
        "ips":   ips_jagged,     # [[original, token, count, valid], ...]
        "meta": {
            "total_rows": int(len(df)),
            "distinct_users": int(len(users_dict)),
            "distinct_ips": int(len(ips_dict)),
        },
    }
    return json.dumps(payload, indent=2)


#### Generic User json (Below) ####

In [16]:
def make_summary_payload(
    df: pd.DataFrame,
    users_dict: Dict[str, Dict[str, object]],
    ips_dict: Dict[str, Dict[str, object]],
    user_col: str = USER_COL,
    ip_col: str = IP_COL,
) -> Dict[str, object]:
    """
    Build the summary payload and include per-user/IP correlation records:
      - users: [[token, count, valid], ...]               (no originals)
      - ips:   [[token, count, valid], ...]               (no originals)
      - user_ip_correlations: [
            [original_user, user_token, original_ip, ip_token, frequency, user_valid],
            ...
        ]
    """

    # Anonymized jagged arrays (no originals)
    users_jagged_anon = [[d["id"], d["count"], d["valid"]] for d in users_dict.values()]
    ips_jagged_anon   = [[d["id"], d["count"], d["valid"]] for d in ips_dict.values()]

    # Compute, for each user, the most frequent (mode) IP they used
    def _pick_mode_ip(series: pd.Series) -> Optional[str]:
        ser = series.dropna().astype(str)
        if ser.empty:
            return None
        return ser.value_counts().idxmax()

    tmp = df[[user_col, ip_col]].copy()
    tmp[user_col] = tmp[user_col].astype(str)
    top_ip_for_user = tmp.groupby(user_col)[ip_col].apply(_pick_mode_ip)

    # Build correlated records
    user_ip_correlations = []
    for orig_user, udata in users_dict.items():
        key_user = str(orig_user)
        ip_orig = top_ip_for_user.get(key_user, None)
        ip_token = ips_dict.get(ip_orig, {}).get("id") if ip_orig is not None else None
        user_ip_correlations.append([
            key_user,                 # original user
            udata["id"],              # garbled user
            ip_orig,                  # user's (mode) IP original
            ip_token,                 # garbled IP
            int(udata["count"]),      # frequency (user count)
            bool(udata["valid"]),     # user validity
        ])

    return {
        "users": users_jagged_anon,
        "ips": ips_jagged_anon,
        "user_ip_correlations": user_ip_correlations,
        "meta": {
            "total_rows": int(len(df)),
            "distinct_users": int(len(users_dict)),
            "distinct_ips": int(len(ips_dict)),
        },
    }


#### Failed User json (Below) ####

In [17]:
def failed_users_payload(
    df: pd.DataFrame,
    user_mapper: GarbleTokenMapper,
    user_col: str = USER_COL,
    starts_col: str = NUM_STARTS_COL,
    completions_col: str = NUM_COMPLETIONS_COL,
) -> Dict[str, object]:
    """
    Create a payload listing JUST users with failed jobs, WITHOUT originals.
    Each record: { "token": <str>, "failure_count": <int>, "valid": <bool> }
    """
    if not {user_col, starts_col, completions_col} <= set(df.columns):
        return {
            "failed_users": [],
            "meta": {
                "distinct_failed_users": 0,
                "total_failure_rows": 0,
                "note": "Required columns missing; cannot compute failed users.",
            },
        }

    mask_fail = (df[completions_col].astype("int") == 0) & (df[starts_col] > 0)
    failed_df = df.loc[mask_fail, [user_col]]

    # count failure rows per user
    fail_counts = failed_df.groupby(user_col)[user_col].count().rename("failure_count")

    records = []
    total_failure_rows = int(fail_counts.sum()) if not fail_counts.empty else 0

    for orig_user, fcount in fail_counts.items():
        token = user_mapper.add(str(orig_user), valid=is_valid_user(orig_user))
        records.append({
            "token": token,
            "failure_count": int(fcount),
            "valid": is_valid_user(orig_user),
        })

    payload = {
        "failed_users": records,
        "meta": {
            "distinct_failed_users": int(len(records)),
            "total_failure_rows": total_failure_rows,
        },
    }
    return payload

#### Full output ####

In [18]:
if __name__ == "__main__":
    # Load data
    df = load_dataframe(DATA_DIR)

    # Build obfuscations
    users_dict, ips_dict, user_mapper, ip_mapper = build_obfuscations(
        df, user_col=USER_COL, ip_col=IP_COL
    )

    # --- Create payloads / JSON strings ---
    summary_obj = make_summary_payload(df, users_dict, ips_dict)
    summary_json = json.dumps(summary_obj, indent=2)

    # Explicit JSON for the jagged arrays (as requested)
    users_jagged = to_jagged_array(users_dict)
    ips_jagged   = to_jagged_array(ips_dict)
    users_jagged_obj = {
        "users": users_jagged,
        "meta": {
            "distinct_users": int(len(users_dict)),
            "total_rows": int(len(df)),
        },
    }
    ips_jagged_obj = {
        "ips": ips_jagged,
        "meta": {
            "distinct_ips": int(len(ips_dict)),
            "total_rows": int(len(df)),
        },
    }

    # Failed users payload (JUST the users with failed jobs)
    failed_obj = failed_users_payload(
        df,
        user_mapper=user_mapper,
        user_col=USER_COL,
        starts_col=NUM_STARTS_COL,
        completions_col=NUM_COMPLETIONS_COL,
    )

    # --- Write files ---
    summary_path = os.path.join(OUTPUT_DIR, "summary.json")
    users_jagged_path = os.path.join(OUTPUT_DIR, "users_jagged.json")
    ips_jagged_path = os.path.join(OUTPUT_DIR, "ips_jagged.json")
    failed_users_path = os.path.join(OUTPUT_DIR, "failed_users.json")

    
    dump_json(summary_obj, summary_path)
    dump_json(users_jagged_obj, users_jagged_path)
    dump_json(ips_jagged_obj, ips_jagged_path)
    dump_json(failed_obj, failed_users_path)
    
    # --- Console peeks ---
    #print("\n=== Wrote JSON files ===")
    #print("summary         :", summary_path)
    #print("users_jagged    :", users_jagged_path)
    #print("ips_jagged      :", ips_jagged_path)
    #print("failed_users    :", failed_users_path)

    print("\n=== Small samples ===")
    print("users_dict sample:", json.dumps(dict(list(users_dict.items())[:3]), indent=2))
    print("ips_dict sample  :", json.dumps(dict(list(ips_dict.items())[:3]), indent=2))
    print("\nsummary.json preview:\n", summary_json[:800], "...\n")

    # --- Failure stats (for context) ---
    total_starts = int(df[NUM_STARTS_COL].sum()) if NUM_STARTS_COL in df else 0
    total_completions = int(df[NUM_COMPLETIONS_COL].astype("int").sum()) if NUM_COMPLETIONS_COL in df else 0
    n_job_failures = total_starts - total_completions
    job_failure_frac = (n_job_failures / total_starts) if total_starts else 0.0
    print(f"Job failure fraction %: {job_failure_frac:.3%}, job failure abs number: {n_job_failures}") #Job failures = (Jobs Started - Jobs finished)/ Jobs Started


=== Small samples ===
users_dict sample: {
  "uboonepro@fnal.gov": {
    "id": "UR1",
    "count": 99239,
    "valid": true
  },
  "icaruspro@fnal.gov": {
    "id": "UR2",
    "count": 47080,
    "valid": true
  },
  "gputnam@fnal.gov": {
    "id": "UR3",
    "count": 12693,
    "valid": true
  }
}
ips_dict sample  : {
  "131.225.240.146": {
    "id": "IP1",
    "count": 86225,
    "valid": true
  },
  "131.225.240.90": {
    "id": "IP2",
    "count": 47080,
    "valid": true
  },
  "131.225.240.140": {
    "id": "IP3",
    "count": 12693,
    "valid": true
  }
}

summary.json preview:
 {
  "users": [
    [
      "UR1",
      99239,
      true
    ],
    [
      "UR2",
      47080,
      true
    ],
    [
      "UR3",
      12693,
      true
    ],
    [
      "UR4",
      3652,
      true
    ],
    [
      "UR5",
      15298,
      true
    ],
    [
      "UR6",
      621,
      true
    ],
    [
      "UR7",
      28340,
      true
    ],
    [
      "UR8",
      1508,
      true
 

#### Main ####