### We have two datasets:
#### 1) LinkedIn job ads (dynamic)
#### 2) ECSF (static)

The first dataset has to be dynamic; in other words, we need to run it through a producer → topic → consumer → Cassandra pipeline.

The second dataset has to be stored in Cassandra.

### To clean the LinkedIn job ads dataset:

#### 1) delete irrelevant columns
#### 2) remove duplicates
#### 3) remove null values
#### 4) preprocess skills column
#### 5) preprocess description column
#### 6) remove stop words


### To use the static ECSF dataset, we need to prepare it for storage in Cassandra. The preparation script:
- Loads JSON
- Validates referential integrity
- Normalizes text (smart quotes, whitespace)
- Canonicalizes TKS IDs and work_role IDs
- Builds denormalized tables:
1) work_role_by_id (main role data with tks_ids list)
2) role_with_tks (embedded small tks tuples)
3) roles_by_title (title/alt_title -> main role id mapping)
4) roles_by_tks (reverse lookup tks_id -> work_role_id)
5) tks_by_id (clean tks table)
- Saves CSVs for bulk load and prints minimal CQL DDL + prepared statement examples.
- Displays the DataFrames to the user.

In [1]:
import json, os, re, datetime, csv
from pathlib import Path
import pandas as pd

# ---------- 1. Load JSON ----------
# The ECSF export is read from the working directory. There is no fallback:
# a missing file or invalid JSON raises here and stops the cell.
INPUT_PATH = Path("ecsf.json")

raw = json.loads(INPUT_PATH.read_text(encoding="utf-8"))

# ---------- 2. Basic validation & normalization functions ----------
def normalize_text(s):
    """Return *s* with typographic quotes made ASCII and whitespace tidied.

    Curly single quotes (U+2018, U+2019) become ``'`` and curly double
    quotes (U+201C, U+201D) become ``"``. Leading/trailing whitespace is
    removed and every internal run of whitespace (spaces, tabs, newlines)
    is collapsed to a single space.

    Args:
        s: The string to normalize, or ``None``.

    Returns:
        The normalized string, or ``None`` when *s* is ``None``.
    """
    if s is None:
        return s
    # Both the opening and the closing curly quote must be mapped; handling
    # only the closing one leaves text such as ‘quoted’ half-normalized.
    quote_map = {0x2018: "'", 0x2019: "'", 0x201C: '"', 0x201D: '"'}
    s = s.translate(quote_map)
    # Trim the ends first, then collapse any remaining whitespace runs.
    return re.sub(r"\s+", " ", s.strip())

def canonicalize_tks_id(tks_id):
    """Return the canonical form of a TKS identifier.

    An id made of one letter followed by digits (ignoring any punctuation or
    spaces in between) is rewritten as the upper-cased letter plus the number
    zero-padded to at least four digits, e.g. ``"k-12"`` -> ``"K0012"``.
    Anything else falls back to the trimmed, upper-cased text of the id.

    Args:
        tks_id: The raw identifier; non-string values are converted with
            ``str()`` so numeric ids do not raise.

    Returns:
        The canonical id string, or ``None`` when *tks_id* is ``None``.
    """
    if tks_id is None:
        return None
    text = str(tks_id)
    # Drop separators so "K-001", "k 1" and "K1" all reduce to letter+digits.
    compact = re.sub(r"[^A-Za-z0-9]", "", text)
    m = re.fullmatch(r"([A-Za-z])(\d+)", compact)
    if m:
        # int() discards leading zeros before the value is re-padded.
        return f"{m.group(1).upper()}{int(m.group(2)):04d}"
    # Fallback: keep the id recognisable, but trim it so stray whitespace
    # cannot create two distinct keys for the same id.
    return text.strip().upper()

def canonicalize_role_id(rid):
    """Return a work-role id as ``int`` when possible, otherwise as ``str``.

    Integer ids and strings that ``int()`` can parse (e.g. ``"12"``) are
    returned as ``int``; every other value (uuid-like strings, ``None``, ...)
    is returned as ``str(rid)``.

    Args:
        rid: The raw role identifier.

    Returns:
        ``int(rid)`` when the conversion succeeds, else ``str(rid)``.
    """
    # NOTE(review): int() truncates floats (2.7 -> 2) and a missing id (None)
    # becomes the string "None"; confirm this is acceptable for the dataset.
    try:
        return int(rid)
    except (TypeError, ValueError, OverflowError):
        # Only the errors int() raises for unconvertible input are handled;
        # anything else is a genuine bug and should surface.
        return str(rid)

# Fail fast if the export lacks one of the three required sections; the
# first missing section (in the order listed) is the one reported.
first_missing = next(
    (name for name in ("work_role", "tks", "relationship") if name not in raw),
    None,
)
if first_missing is not None:
    raise ValueError(f"Missing top-level key: {first_missing}")

work_roles, tks, relationships = raw["work_role"], raw["tks"], raw["relationship"]

# ---------- 3. Clean & canonicalize tables ----------
# Clean TKS table: canonical id plus normalized text fields.
# When several entries share a canonical id, only the first one is kept.
clean_tks = []
seen_tks = {}
for record in tks:
    canonical_id = canonicalize_tks_id(record.get("id"))
    tks_row = {
        "tks_id": canonical_id,
        "type": normalize_text(record.get("type")),
        "description": normalize_text(record.get("description")),
    }
    if canonical_id not in seen_tks:
        seen_tks[canonical_id] = True
        clean_tks.append(tks_row)
tks_df = pd.DataFrame(clean_tks)

# Clean work roles: canonical id, normalized text fields, and the list of
# non-blank alternative titles. Roles are de-duplicated by normalized title,
# keeping the first occurrence.
clean_roles = []
seen_titles = set()
for record in work_roles:
    role_row = {
        "work_role_id": canonicalize_role_id(record.get("id")),
        "title": normalize_text(record.get("title")),
        "alt_titles": [
            normalize_text(alt)
            for alt in (record.get("alternative_title(s)") or [])
            if alt and str(alt).strip()
        ],
        "summary_statement": normalize_text(record.get("summary_statement")),
        "mission": normalize_text(record.get("mission")),
    }
    if role_row["title"] not in seen_titles:
        seen_titles.add(role_row["title"])
        clean_roles.append(role_row)
roles_df = pd.DataFrame(clean_roles)

# Clean relationships and check referential integrity.
# Dangling references are only reported; the relationship row is still kept.

# Build the lookup sets once instead of rescanning the cleaned tables for
# every relationship row.
known_role_ids = {row["work_role_id"] for row in clean_roles}
known_tks_ids = {row["tks_id"] for row in clean_tks}

clean_rel = []
missing_tks = set()
missing_roles = set()
for rel in relationships:
    rid = canonicalize_role_id(rel.get("work_role_id"))
    tid = canonicalize_tks_id(rel.get("tks_id"))
    # check existence
    if rid not in known_role_ids:
        missing_roles.add(rid)
    if tid not in known_tks_ids:
        missing_tks.add(tid)
    clean_rel.append({"work_role_id": rid, "tks_id": tid})

rel_df = pd.DataFrame(clean_rel)


def _id_sort_key(value):
    """Sort ids by type name first so int, str and None ids are never compared."""
    return (type(value).__name__, value)


# Report referential integrity issues (if any).
# Role ids can be int or str and a TKS id can be None, so a plain sorted()
# could raise TypeError on a mixed set; _id_sort_key keeps the usual order
# within each type while making mixed sets sortable.
report = []
if missing_roles:
    report.append(f"Missing role ids referenced in relationships: {sorted(missing_roles, key=_id_sort_key)}")
if missing_tks:
    report.append(f"Missing tks ids referenced in relationships: {sorted(missing_tks, key=_id_sort_key)}")

# ---------- 4. Build denormalized payloads for Cassandra access patterns ----------
# 1) work_role_by_id: main role row with tks_ids list
# 2) role_with_tks: embedded small tuples of (tks_id, type, description)
# 3) roles_by_title: map every title AND alt_title -> main role id (so alt titles are first-class)
# 4) roles_by_tks: reverse lookup tks_id -> work_role_id
# 5) tks_by_id: cleaned tks table

# Build mapping from tks_id -> details
tks_map = {r["tks_id"]: {"type": r["type"], "description": r["description"]} for r in clean_tks}

# Group the relationship rows by role once (relationship order is preserved)
# instead of rescanning clean_rel for every role.
tks_ids_by_role = {}
for rel in clean_rel:
    tks_ids_by_role.setdefault(rel["work_role_id"], []).append(rel["tks_id"])

# One ingestion timestamp for the whole batch. datetime.utcnow() is deprecated
# since Python 3.12; dropping tzinfo after taking an aware UTC "now" keeps the
# same naive-UTC ISO string format that utcnow().isoformat() produced.
ingested_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()

# work_role_by_id and role_with_tks
roles_rows = []
role_with_tks_rows = []
for role in clean_roles:
    rid = role["work_role_id"]
    # Copy so that rows never share a list object with the grouping dict.
    tks_ids = list(tks_ids_by_role.get(rid, ()))
    # build embedded tks tuples (small) - include only ids present in tks_map
    embedded = []
    for tid in tks_ids:
        details = tks_map.get(tid)
        if details is not None:
            embedded.append((tid, details["type"], details["description"]))
    roles_rows.append({
        "work_role_id": rid,
        "title": role["title"],
        "alt_titles": role["alt_titles"],
        "summary_statement": role["summary_statement"],
        "mission": role["mission"],
        "tks_ids": tks_ids,
        "ingested_at": ingested_at
    })
    role_with_tks_rows.append({
        "work_role_id": rid,
        "title": role["title"],
        "alt_titles": role["alt_titles"],
        "summary_statement": role["summary_statement"],
        "mission": role["mission"],
        "tks": embedded
    })

work_role_by_id_df = pd.DataFrame(roles_rows)
role_with_tks_df = pd.DataFrame(role_with_tks_rows)

# roles_by_title: one row for the main title and one per alternative title,
# all pointing at the same role id. The lookup key is the lower-cased title.
roles_by_title_rows = []
for role in clean_roles:
    # Main title first (is_alt=False), then the alternatives in their order.
    labelled_titles = [(role["title"], False)] + [(alt, True) for alt in role["alt_titles"]]
    roles_by_title_rows.extend(
        {
            "title_key": label.lower(),
            "work_role_id": role["work_role_id"],
            "is_alt": alt_flag,
            "canonical_title": role["title"],
        }
        for label, alt_flag in labelled_titles
    )
roles_by_title_df = pd.DataFrame(roles_by_title_rows)

# roles_by_tks: reverse lookup (tks_id -> work_role_id), one row per
# cleaned relationship, in relationship order.
roles_by_tks_rows = [
    {"tks_id": link["tks_id"], "work_role_id": link["work_role_id"]}
    for link in clean_rel
]
roles_by_tks_df = pd.DataFrame(roles_by_tks_rows)

# tks_by_id DataFrame: an independent copy of the cleaned TKS table; the
# column names (tks_id, type, description) are kept as they are.
tks_by_id_df = tks_df.copy()

# Save CSVs for potential COPY bulk load or inspection.
# Each table is written without the DataFrame index into ecsf_cleaned/.
out_dir = Path("ecsf_cleaned")
out_dir.mkdir(parents=True, exist_ok=True)

csv_exports = (
    ("work_role_by_id.csv", work_role_by_id_df, {}),
    ("role_with_tks.csv", role_with_tks_df, {"quoting": csv.QUOTE_MINIMAL}),
    ("roles_by_title.csv", roles_by_title_df, {}),
    ("roles_by_tks.csv", roles_by_tks_df, {}),
    ("tks_by_id.csv", tks_by_id_df, {}),
)
for file_name, frame, extra_options in csv_exports:
    frame.to_csv(out_dir / file_name, index=False, **extra_options)

# ---------- 5. Display small previews to the user ----------
# Each entry: label, table, and the maximum number of rows to print.
for label, frame, row_limit in (
    ("work_role_by_id (preview)", work_role_by_id_df, 50),
    ("role_with_tks (preview)", role_with_tks_df, 50),
    ("roles_by_title (preview)", roles_by_title_df, 200),
    ("roles_by_tks (preview)", roles_by_tks_df, 200),
    ("tks_by_id (preview)", tks_by_id_df, 200),
):
    print(label, frame.head(row_limit))

print("Saved cleaned CSVs to:", out_dir)

# ---------- 7. Integrity report ----------
# `report` holds one message per kind of dangling reference; empty means clean.
if not report:
    print("\nNo referential integrity issues detected. All good!")
else:
    print("\nIntegrity issues found:")
    for issue in report:
        print(" -", issue)

# Also list saved file paths for download (directory iteration order)
print("\nSaved files:")
for saved_path in out_dir.iterdir():
    print(" -", saved_path)

work_role_by_id (preview)     work_role_id                                      title  \
0              1  Chief Information Security Officer (CISO)   
1              2                   Cyber Incident Responder   
2              3   Cyber Legal, Policy & Compliance Officer   
3              4       Cyber Threat Intelligence Specialist   
4              5                    Cybersecurity Architect   
5              6                      Cybersecurity Auditor   
6              7                     Cybersecurity Educator   
7              8                  Cybersecurity Implementer   
8              9                   Cybersecurity Researcher   
9             10                 Cybersecurity Risk Manager   
10            11             Digital Forensics Investigator   
11            12                         Penetration Tester   

                                           alt_titles  \
0   [Cybersecurity Programme Director, Information...   
1   [Cyber Incident Handler, Cyber Crisi

In [None]:
# ---------- 6. Print minimal CQL DDL + prepared statement examples (user-level) ----------
# The DDL is a plain string (no interpolation is needed). It selects the
# keyspace right after creating it: the CREATE TABLE statements below are
# unqualified, so without `USE ecsf;` they would fail or target whatever
# keyspace the session happens to be in.
# NOTE(review): the work_role_by_id rows built earlier in this notebook carry
# an `ingested_at` field, while this table declares `metadata map<text,text>`
# instead - reconcile the two before bulk-loading the CSV.
# NOTE(review): work_role_id is declared `int`, but canonicalize_role_id can
# also return strings for non-numeric ids - confirm the dataset is all-integer.
cql_ddl = """
-- Keyspace:
CREATE KEYSPACE IF NOT EXISTS ecsf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE ecsf;

-- 1) work_role_by_id: primary storage
CREATE TABLE IF NOT EXISTS work_role_by_id (
  work_role_id int PRIMARY KEY,
  title text,
  alt_titles list<text>,
  summary_statement text,
  mission text,
  tks_ids list<text>,
  metadata map<text,text>
);

-- 2) role_with_tks: denormalized payload for fast reads (embedded small tks tuples)
CREATE TABLE IF NOT EXISTS role_with_tks (
  work_role_id int PRIMARY KEY,
  title text,
  alt_titles list<text>,
  summary_statement text,
  mission text,
  tks list<frozen<tuple<text,text,text>>>
);

-- 3) roles_by_title: support lookup by title or alt title (title_key lowercased)
CREATE TABLE IF NOT EXISTS roles_by_title (
  title_key text PRIMARY KEY, -- store lowercase title for case-insensitive lookup
  work_role_id int,
  is_alt boolean,
  canonical_title text
);

-- 4) roles_by_tks: reverse lookup (which roles use a given TKS)
CREATE TABLE IF NOT EXISTS roles_by_tks (
  tks_id text,
  work_role_id int,
  PRIMARY KEY (tks_id, work_role_id)
);

-- 5) tks_by_id:
CREATE TABLE IF NOT EXISTS tks_by_id (
  tks_id text PRIMARY KEY,
  type text,
  description text
);
"""

# One INSERT per table, with `?` bind markers in the same column order as
# the column lists above.
prepared_examples = """
-- Example prepared statements (pseudo-CQL)
-- Insert into primary role table
INSERT INTO work_role_by_id (work_role_id, title, alt_titles, summary_statement, mission, tks_ids, metadata) VALUES (?, ?, ?, ?, ?, ?, ?);

-- Insert into denormalized role_with_tks
INSERT INTO role_with_tks (work_role_id, title, alt_titles, summary_statement, mission, tks) VALUES (?, ?, ?, ?, ?, ?);

-- Insert into roles_by_title for each title & alt title (title_key should be lowercase)
INSERT INTO roles_by_title (title_key, work_role_id, is_alt, canonical_title) VALUES (?, ?, ?, ?);

-- Insert into roles_by_tks for reverse lookup
INSERT INTO roles_by_tks (tks_id, work_role_id) VALUES (?, ?);

-- Insert into tks_by_id
INSERT INTO tks_by_id (tks_id, type, description) VALUES (?, ?, ?);
"""

print("\n=== CQL DDL to create tables ===\n")
print(cql_ddl)
print("\n=== Example prepared statements ===\n")
print(prepared_examples)
