## Setup: drop, cap, and merge the cleaned Parquet files

In [None]:
#!/usr/bin/env python3
# Streamed Drop / Cap / Keep with Polars
import os, re, sys
import polars as pl

# --------------------------
# Your rules
# --------------------------
# Labels whose rows are removed from the output entirely.
DROP_LABELS = {"DDoS_ICMP", "DDoS_UDP", "DDoS_TCP"}

# Labels that are kept but down-sampled: label -> maximum number of rows kept.
# Insertion order matters: the cap loop walks CAP_MAP.items() in this order,
# which is also the order the per-label part files are produced and merged.
CAP_MAP = {
    "DDoS-PSHACK_FLOOD": 1_000_000,
    "DDoS-RSTFINFLOOD": 1_000_000,
    "DDoS-SYNONYMOUSIP_FLOOD": 1_000_000,
    "DoS_UDP": 295_501,
    "DoS_TCP": 537_521,
    "DoS_SYN": 459_512,
    "DDoS_SYN": 25_649,
}

# Seed for the row-id hash that decides which rows survive a cap; a fixed
# seed makes the selected subset repeatable.
SEED = 42

# --------------------------
# IO
# --------------------------
# Inputs are the *_clean.parquet files in the current working directory, and
# every output is written alongside them.
folder = os.getcwd()
pattern = os.path.join(folder, "*_clean.parquet")

kept_unlimited_path, final_merged_path = (
    os.path.join(folder, file_name)
    for file_name in (
        "kept_unlimited.parquet",
        "dataset_filtered_after_dropped_nan.parquet",
    )
)

def sanitize(name: str) -> str:
    """Return *name* made safe for use inside a file name.

    Every run of one or more characters outside ``A-Z a-z 0-9 . _ -`` is
    collapsed into a single underscore. All other characters are kept as-is.
    """
    unsafe_run = re.compile(r"[^A-Za-z0-9._-]+")
    return unsafe_run.sub("_", name)

# --------------------------
# Scan + add stable row id
# --------------------------
# Fail fast when nothing matches. The suffix test is case-sensitive so that
# it agrees with the case-sensitive "*_clean.parquet" glob in `pattern`. A
# case-insensitive check could pass on e.g. "X_CLEAN.parquet" and then let
# scan_parquet fail with a less helpful error.
if not any(name.endswith("_clean.parquet") for name in os.listdir(folder)):
    sys.exit("No *_clean.parquet files found in this folder.")

# Lazily scan every matching file as one frame. `rid` is a 0-based row index
# over the concatenated scan. Every later query re-runs this same plan, so
# the ids line up across queries as long as the set of files is unchanged.
lazy_all = (
    pl.scan_parquet(pattern)
      .with_row_index("rid")
      .with_columns(pl.col("Label").cast(pl.Utf8))
)

# --------------------------
# KEEP-ALL (not dropped, not capped)
# --------------------------
# Rows whose label is neither in DROP_LABELS nor in CAP_MAP are streamed to
# disk unchanged.
print("▶ Writing kept (unlimited) labels to:", kept_unlimited_path)
keep_all_query = lazy_all.filter(
    ~(
        pl.col("Label").is_in(list(DROP_LABELS))
        | pl.col("Label").is_in(list(CAP_MAP))
    )
)
keep_all_query.sink_parquet(
    kept_unlimited_path,
    compression="zstd",
    compression_level=1,
    statistics=False,
    maintain_order=False,
)

# --------------------------
# CAPPED labels (deterministic hash on rid)
# --------------------------
# For each capped label, keep at most `cap` rows: the ones whose seeded hash
# of `rid` sorts lowest. For a fixed SEED and input file set, the same rows
# are chosen on every run.
cap_parts = []
for label, cap in CAP_MAP.items():
    part_path = os.path.join(folder, f"cap_{sanitize(label)}.parquet")
    cap_parts.append(part_path)

    print(f"▶ Selecting {cap:,} rows for capped label: {label}")
    # (a) Collect only the selected row ids (at most `cap` of them).
    #     `rid` is unique, so using it as a secondary sort key gives a total
    #     order. Ties on the hash therefore cannot change which rows fall
    #     inside the limit.
    rids_df = (
        lazy_all
        .filter(pl.col("Label") == label)
        .select([
            pl.col("rid"),
            pl.col("rid").hash(seed=SEED).alias("h"),
        ])
        .sort(["h", "rid"])
        .limit(cap)
        .select("rid")
        .collect()
    )

    if rids_df.height == 0:
        print(f"  (warning) No rows found for '{label}'. Skipping.")
        # Remove any part file left over from an earlier run. Otherwise its
        # stale rows would be picked up by anything that merges the existing
        # files in cap_parts.
        if os.path.exists(part_path):
            os.remove(part_path)
        continue

    print(f"  → Collected {rids_df.height:,} rids. Writing rows to:", part_path)
    # (b) Stream the full rows for those ids into this label's part file.
    #     The label filter does not change the result, because every selected
    #     rid has this label. It only shrinks the input to the semi-join.
    (
        lazy_all
        .filter(pl.col("Label") == label)
        .join(rids_df.lazy(), on="rid", how="semi")
        .sink_parquet(
            part_path,
            compression="zstd",
            compression_level=1,
            statistics=False,
            maintain_order=False,
        )
    )

# --------------------------
# FINAL MERGE (kept + capped), drop the DROP labels defensively
# --------------------------
print("▶ Merging parts into:", final_merged_path)
# Merge only the part files that actually exist on disk. The kept part is
# checked as well, so the guard below really fires when there is nothing to
# merge.
sources = [p for p in [kept_unlimited_path, *cap_parts] if os.path.exists(p)]
if not sources:
    sys.exit("Nothing to merge — no outputs were created.")

# "diagonal_relaxed" takes the union of the parts' columns (missing ones are
# filled with null) and coerces mismatched dtypes to a common supertype.
merged = pl.concat([pl.scan_parquet(p) for p in sources], how="diagonal_relaxed")
# Safety net: the parts should already exclude DROP_LABELS.
merged = merged.filter(~pl.col("Label").is_in(list(DROP_LABELS)))

merged.sink_parquet(
    final_merged_path,
    compression="zstd",
    compression_level=1,
    statistics=False,
    maintain_order=False,
)

# --------------------------
# Quick report
# --------------------------
print("\n✅ Done.")
print("Parts written:")
print("  •", kept_unlimited_path)
for p in filter(os.path.exists, cap_parts):
    print("  •", p)
print("Final merged:", final_merged_path)

# Best-effort per-label summary of the merged file. Any failure is printed
# with its reason and otherwise ignored.
try:
    counts_by_label = pl.scan_parquet(final_merged_path).group_by("Label").len()
    final_counts = counts_by_label.sort("len", descending=True).collect()
    total_rows = int(final_counts.get_column("len").sum())
    print(f"\nFinal total rows: {total_rows:,}")
    print(final_counts)
except Exception as e:
    print("Note: could not print final counts (skipping). Reason:", e)


In [None]:
#!/usr/bin/env python3
import os
import polars as pl

folder = os.getcwd()
pattern = os.path.join(folder, "*_nan.parquet")
out_path = os.path.join(folder, "label_counts_after_dropped_nan_and_cap.txt")

# Lazily read only the Label column of every matching file (projection
# pushdown leaves the other columns unread), cast it to a categorical, and
# skip rows whose label is missing.
q = (
    pl.scan_parquet(pattern)
      .select(pl.col("Label").cast(pl.Categorical))
      .filter(pl.col("Label").is_not_null())
)

# Per-label row counts, most frequent first. Only this small table is
# materialized.
label_counts = (
    q.group_by("Label")
     .len()
     .sort("len", descending=True)
     .collect(engine="streaming")
     .rename({"len": "count"})
)

# Every non-null row belongs to exactly one label group, so the total equals
# the sum of the group sizes. This avoids a second pass over the files.
n_rows = int(label_counts["count"].sum())

# Build text lines. The header is derived from `pattern`, so it always names
# the files that were actually scanned.
lines = [
    f"Analyzing filtered Parquet files (glob): {os.path.basename(pattern)}",
    f"Total rows: {n_rows}\n",
    "Unique labels and their counts:",
]
for label, count in label_counts.iter_rows():
    lines.append(f"  {label}: {count}")
lines.append(f"\nTotal unique labels: {label_counts.height}")

# Write to txt, ending with a newline like a conventional text file.
with open(out_path, "w", encoding="utf-8") as f:
    f.write("\n".join(lines) + "\n")

print(f"Wrote summary to: {out_path}")


In [None]:
#!/usr/bin/env python3
import os
import polars as pl

folder = os.getcwd()
pattern = os.path.join(folder, "*_clean.parquet")

# Lazy query over the Label column only (the remaining columns are never
# read). Labels are cast to a categorical and missing labels are ignored.
q = (
    pl.scan_parquet(pattern)
    .select(pl.col("Label").cast(pl.Categorical))
    .filter(pl.col("Label").is_not_null())
)

# Number of labelled rows, produced as a 1x1 frame by the streaming engine.
n_rows = q.select(pl.len().alias("rows")).collect(engine="streaming").item()

# Row count per label, largest first; only this small summary is collected.
label_counts = (
    q.group_by("Label")
    .len()
    .sort("len", descending=True)
    .collect(engine="streaming")
    .rename({"len": "count"})
)

print("Analyzing original Parquet files (glob): *_clean.parquet")
print(f"Total rows: {n_rows}\n")
print("Unique labels and their counts:")
for label, count in label_counts.iter_rows():
    print(f"  {label}: {count}")
print(f"\nTotal unique labels: {label_counts.height}")
