
# jq-style Pipelines with plumbum

This notebook demonstrates how to use `pdum.plumbum.jq` to recreate jq-style transformations in Python. We start with simple pipelines and then mirror each of the 43 scenarios from `docs/jq-fu-43-examples.md`.


In [None]:
from __future__ import annotations

import base64
import json
import re
from datetime import datetime, timezone
from itertools import product
from urllib.parse import quote_plus

from pdum.plumbum import pb
from pdum.plumbum.iterops import chain, dedup, select, where
from pdum.plumbum.jq import (
    coalesce,
    delete_path,
    explode,
    field,
    iter_paths,
    resolve_path,
    set_path,
    transform,
    walk_tree,
)
from pdum.plumbum.jq.typing import Field, Index


## Helper utilities


In [None]:
def path_tokens(path):
    """Convert a raw path into typed path tokens.

    String segments are wrapped in ``Field``; every other segment is wrapped
    in ``Index``. The result is a list in the same order as ``path``.
    """
    tokens = []
    for segment in path:
        wrapper = Field if isinstance(segment, str) else Index
        tokens.append(wrapper(segment))
    return tokens


def drop_empty(tree):
    """Remove empty nodes from ``tree`` until none are left.

    A node counts as empty when its value is ``None``, ``""``, ``[]`` or
    ``{}``. The root (empty path) is never removed. After every deletion the
    tree is walked again from scratch, so containers that become empty once
    their children are gone are removed on a later pass.
    """

    def _is_empty(value):
        return value in (None, "", []) or value == {}

    current = tree
    while True:
        # Snapshot the walk, then pick the first non-root empty node (if any).
        target = next(
            (path for path, value in list(walk_tree(current)) if path and _is_empty(value)),
            None,
        )
        if target is None:
            return current
        current = delete_path(current, path_tokens(target))


def remove_keys(tree, names):
    """Delete every node whose last path segment is a string found in ``names``.

    Returns the value produced by the successive ``delete_path`` calls (or
    ``tree`` itself when nothing matched).
    """
    pruned = tree
    # The paths are collected before any deletion happens.
    snapshot = [path for path, _ in walk_tree(pruned)]
    for path in snapshot:
        if not path:
            continue
        last = path[-1]
        if isinstance(last, str) and last in names:
            pruned = delete_path(pruned, path_tokens(path))
    return pruned


def stringify_non_ascii(tree):
    """Replace each non-root string containing a non-ASCII character with ``json.dumps`` of it.

    Strings made only of ASCII characters, and non-string values, are left
    alone.
    """
    current = tree
    for path, value in list(walk_tree(current)):
        if not path or not isinstance(value, str):
            continue
        if not value.isascii():
            current = set_path(current, path_tokens(path), json.dumps(value))
    return current


def mask_secret_like(tree):
    """Overwrite with ``"***"`` every node whose key looks like a secret.

    A key matches when it is a string containing ``secret``, ``token`` or
    ``password`` (case-insensitive) anywhere in it.
    """
    sensitive = re.compile(r"secret|token|password", re.IGNORECASE)
    masked = tree
    nodes = list(walk_tree(masked))
    for path, _ in nodes:
        key = path[-1] if path else None
        if isinstance(key, str) and sensitive.search(key):
            masked = set_path(masked, path_tokens(path), "***")
    return masked


def leaf_paths(tree):
    """List ``{"path": [...], "value": ...}`` records for every node that is neither a dict nor a list."""
    leaves = []
    for path, value in walk_tree(tree):
        if isinstance(value, (dict, list)):
            continue
        leaves.append({"path": list(path), "value": value})
    return leaves


## Warm-up


In [None]:
# Sample records used by the two warm-up pipelines below.
users = [
    dict(id=1, name="Ada", scores=[10, 15]),
    dict(id=2, name="Linus", scores=[20]),
]

# First pipeline: select the "name" field and collect the output with list.
collect_names = select(field("name")) | pb(list)
names = users > collect_names

# Second pipeline: explode "[].scores", materialise the output, and sum it.
all_scores = list(users > explode("[].scores"))
score_sum = sum(all_scores)
names, score_sum

## Example 1 — Tag child items with parent fields

`jq`
```jq
.[] | .name as $n | .group as $g | .items[] | {name:$n, group:$g, item:.}
```


In [None]:
groups = [
    dict(name="alpha", group="A", items=["h1", "h2"]),
    dict(name="beta", group="B", items=["h3"]),
]


def _tag_items(parent):
    """Return one record per item, each carrying the parent's name and group."""
    tagged_items = []
    for item in parent["items"]:
        tagged_items.append({"name": parent["name"], "group": parent["group"], "item": item})
    return tagged_items


# Each parent becomes a list of tagged records; ``chain`` and ``pb(list)``
# then produce the final collected output.
tagged = groups > (explode("[]") | select(_tag_items) | chain | pb(list))
tagged

## Example 2 — Keep array index when exploding

`jq`
```jq
.items | to_entries[] | {idx:.index, item:.value}
```


In [None]:
catalog = dict(items=["gala", "fuji", "braeburn"])
# Materialise the exploded output so it can be numbered with enumerate.
items = list(catalog > explode("items"))
[dict(idx=position, item=entry) for position, entry in enumerate(items)]

## Example 3 — Flatten nested arrays with lineage

`jq`
```jq
.[] | .id as $pid | .buckets[] as $b | $b.items[] | {parent:$pid, bucket:$b.name, item:.}
```


In [None]:
projects = [
    dict(
        id="p-1",
        buckets=[
            dict(name="red", items=["r1", "r2"]),
            dict(name="blue", items=["b1"]),
        ],
    ),
    dict(id="p-2", buckets=[dict(name="green", items=["g1", "g2"])]),
]


def _flatten_project(proj):
    """Return one record per bucket item, tagged with the project id and bucket name."""
    lineage = []
    for bucket in proj["buckets"]:
        for item in bucket["items"]:
            lineage.append({"parent": proj["id"], "bucket": bucket["name"], "item": item})
    return lineage


flattened = projects > (explode("[]") | select(_flatten_project) | chain | pb(list))
flattened

## Example 4 — Explode object fields to key/value records

`jq`
```jq
.props | to_entries[] | {key:.key, value:.value}
```


In [None]:
record = dict(props=dict(cpu="m2", ram="32gb"))
# The last segment of each matched path is used as the key of the emitted record.
[dict(key=location[-1], value=found) for location, found in iter_paths(record, "props.*")]

## Example 5 — Multi-condition filter with defaults

`jq`
```jq
select((.status // "unknown") == "ok" and (.lat? // 0) != 0)
```


In [None]:
readings = [
    dict(id=1, status="ok", lat=51.5),
    dict(id=2, lat=0.0),
    dict(id=3, status="ok", lat=0),
    dict(id=4, status=None, lat=48.1),
]


def _is_ok_with_position(row):
    """Keep rows whose status (default "unknown") is "ok" and whose lat (default 0) is non-zero."""
    status = row > coalesce("status", default="unknown")
    if status != "ok":
        return False
    return (row > coalesce("lat", default=0)) != 0


filtered = readings > (where(_is_ok_with_position) | pb(list))
filtered

## Example 6 — Drop null/empty fields recursively

`jq`
```jq
walk(if type=="object" then with_entries(select(.value!=null and .value!=[] and .value!={})) else . end)
```


In [None]:
# Input mixing empty values (None, "", [], {}) with real content.
raw = dict(
    meta=dict(source="ingest", tags=[], notes=None),
    items=[
        dict(id=1, extra={}),
        dict(id=2, extra=dict(comment="ok")),
    ],
    misc="",
)

drop_empty(tree=raw)

## Example 7 — Set defaults

`jq`
```jq
.price = (.price // 0) | .tags = (.tags // [])
```


In [None]:
catalog_entry = dict(sku="A-100", tags=None)
# Field name -> fallback passed to coalesce as its default.
defaults = {"price": 0, "tags": []}
{
    **catalog_entry,
    **{name: catalog_entry > coalesce(name, default=fallback) for name, fallback in defaults.items()},
}

## Example 8 — Remove noisy keys anywhere

`jq`
```jq
walk(if type=="object" then del(.debug,.temp,.trace) else . end)
```


In [None]:
# "debug" sits at the top level; "trace" and "temp" are nested deeper.
payload = dict(
    debug=dict(state="verbose"),
    data=dict(value=3, trace="abc", nested=dict(temp=42, value=9)),
)

remove_keys(payload, names={"debug", "temp", "trace"})

## Example 9 — Parse with regex groups

`jq`
```jq
.label | capture("^(?<cat>[A-Z]{2})-(?<id>\d+)$") | {category:.cat, id:(.id|tonumber)}
```


In [None]:
labelled = dict(label="IN-204")
# Named groups: "cat" is two capital letters, "id" is the run of digits after the dash.
match_obj = re.compile(r"^(?P<cat>[A-Z]{2})-(?P<id>\d+)$").match(labelled > field("label"))
dict(category=match_obj["cat"], id=int(match_obj["id"]))

## Example 10 — Rewrite path prefix

`jq`
```jq
.path | gsub("^/api/v\d+/"; "/api/latest/")
```


In [None]:
endpoint = dict(path="/api/v2/accounts/42")


def _pin_latest(value):
    """Rewrite a leading ``/api/v<digits>/`` prefix to ``/api/latest/``."""
    return re.sub(r"^/api/v\d+/", "/api/latest/", value)


endpoint > transform("path", _pin_latest)

## Example 11 — Build interpolated strings

`jq`
```jq
{slug:"\(.category)/\(.id)", title:(.title|tostring)}
```


In [None]:
article = dict(category="blog", id=42, title=123)
# The slug joins category and id with "/"; the title is coerced to text.
dict(
    slug="{}/{}".format(article["category"], article["id"]),
    title=str(article["title"]),
)

## Example 12 — Safe numeric coercion

`jq`
```jq
.price = ((.price? // 0) | tonumber) | .qty = ((.qty? // 1) | tonumber)
```


In [None]:
row = dict(price="12.5", qty=None)
# Field name -> (numeric constructor, fallback handed to coalesce).
casts = {"price": (float, 0), "qty": (int, 1)}
{name: cast(row > coalesce(name, default=fallback)) for name, (cast, fallback) in casts.items()}

## Example 13 — Round to 2 decimals

`jq`
```jq
.total = ((.total // 0) * 100 | round / 100)
```


In [None]:
invoice = dict(total=123.4567)


def _round_to_cents(value):
    """Scale by 100, round to the nearest integer, and scale back; falsy input counts as 0."""
    hundredths = round((value or 0) * 100)
    return hundredths / 100


invoice > transform("total", _round_to_cents)

## Example 14 — ISO8601 ↔ epoch

`jq`
```jq
.ts = (.timestamp|fromdateiso8601) | .ts_human = (.ts|todateiso8601)
```


In [None]:
event = {"timestamp": "2024-01-05T12:30:00Z"}
# Spell the trailing "Z" as an explicit UTC offset before parsing.
instant = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
# Only timestamps without any offset are assumed to be UTC. An offset-aware
# value keeps its own offset so the epoch below is computed correctly, rather
# than having the offset overwritten.
if instant.tzinfo is None:
    instant = instant.replace(tzinfo=timezone.utc)
epoch = int(instant.timestamp())
converted = {
    **event,
    "ts": epoch,
    # Render the epoch back as UTC with a "Z" suffix, the form jq's
    # todateiso8601 emits.
    "ts_human": datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
}
converted

## Example 15 — Minute buckets

`jq`
```jq
. | (.timestamp|fromdateiso8601/60|floor) as $m | {minute:$m, event:.}
```


In [None]:
raw_events = [
    {"timestamp": "2024-02-01T10:00:05Z", "event": "login"},
    {"timestamp": "2024-02-01T10:00:45Z", "event": "heartbeat"},
    {"timestamp": "2024-02-01T10:01:10Z", "event": "logout"},
]


def minute_bucket(row):
    """Pair ``row`` with the epoch minute of its ISO 8601 ``timestamp``.

    Args:
        row: Mapping with a ``"timestamp"`` string; a trailing ``Z`` or an
            explicit offset is honoured, and a timestamp without any offset
            is treated as UTC.

    Returns:
        ``{"minute": <epoch seconds // 60>, "event": row}``.

    Raises:
        KeyError: If ``row`` has no ``"timestamp"`` key.
        ValueError: If the timestamp cannot be parsed.
    """
    dt = datetime.fromisoformat(row["timestamp"].replace("Z", "+00:00"))
    # Attach UTC only when no offset was given; overwriting a real offset
    # would shift the instant and therefore the bucket.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return {"minute": int(dt.timestamp() // 60), "event": row}


[minute_bucket(row) for row in raw_events]

## Example 16 — Merge parent keys into child

`jq`
```jq
.name as $n | .region as $r | .items[] | . + {name:$n, region:$r}
```


In [None]:
regions = [
    dict(name="alpha", region="EMEA", items=[dict(id=1), dict(id=2)]),
    dict(name="beta", region="NA", items=[dict(id=3)]),
]


def _inherit_parent_keys(parent):
    """Return copies of the parent's items with the parent's name and region merged in."""
    enriched = []
    for item in parent["items"]:
        enriched.append({**item, "name": parent["name"], "region": parent["region"]})
    return enriched


merged = regions > (explode("[]") | select(_inherit_parent_keys) | chain | pb(list))
merged

## Example 17 — Deep patch

`jq`
```jq
setpath(["meta","source"];"ingest-1")
```


In [None]:
# Document whose nested "meta.source" value is replaced below.
doc = dict(meta=dict(source="ingest-0"))
set_path(doc, "meta.source", "ingest-1")

## Example 18 — Multiply each price

`jq`
```jq
.items[] | .price |= (. * 1.05)
```


In [None]:
order = {"items": [{"sku": "a", "price": 10.0}, {"sku": "b", "price": 12.0}]}
# Pipe the order into the transform step, the same calling form this notebook
# uses for its single-field transforms, instead of passing the data as a
# first positional argument. Each price is raised by 5% and rounded to two
# decimals.
order > transform("items[].price", lambda price: round(price * 1.05, 2))

## Example 19 — Stateful dedupe by key

`jq`
```jq
reduce inputs as $x ({}; ...)
```


In [None]:
stream = [
    dict(id=1, value="first"),
    dict(id=1, value="duplicate"),
    dict(id=2, value="second"),
]
# Dedupe step keyed on each record's "id", collected with list.
unique_by_id = dedup(lambda entry: entry["id"]) | pb(list)
list(stream > unique_by_id)

## Example 20 — Running totals

`jq`
```jq
foreach inputs as $r (0; . + ($r.value // 0); {ts:$r.ts, running:.})
```


In [None]:
records = [
    dict(ts="00:00", value=5),
    dict(ts="00:01", value=3),
    dict(ts="00:02", value=None),
    dict(ts="00:03", value=2),
]

running = []
total = 0
for entry in records:
    amount = entry.get("value")
    # A missing, None or zero value leaves the total unchanged.
    if amount:
        total += amount
    running.append(dict(ts=entry["ts"], running=total))

running

## Example 21 — Windowed aggregation when key changes

`jq`
```jq
label $out | foreach inputs as $x (...)
```


In [None]:
rows = [
    dict(user="a", value=2),
    dict(user="a", value=3),
    dict(user="b", value=4),
    dict(user="b", value=1),
]

windows = []
current_user = None
running = 0
for entry in rows:
    owner = entry["user"]
    # A change of user closes the current window and starts a fresh one.
    if current_user is not None and owner != current_user:
        windows.append(dict(user=current_user, sum=running))
        running = 0
    current_user = owner
    running += entry["value"]
# Flush the window that was still open when the input ended.
windows.append(dict(user=current_user, sum=running))
windows

## Example 22 — Join against side table

`jq`
```jq
INDEX($users[];.id) as $U | inputs | . + {user:($U[.user_id]//{})}
```


In [None]:
users_side = [
    dict(id=10, name="Ada"),
    dict(id=11, name="Linus"),
]
orders = [
    dict(order="o-1", user_id=10),
    dict(order="o-2", user_id=99),
]

# Index the side table by id, then attach the matching user (or {}) to each order.
directory = dict((person["id"], person) for person in users_side)
[dict(order, user=directory.get(order["user_id"], {})) for order in orders]

## Example 23 — Join and project single field

`jq`
```jq
INDEX($u[];.id) as $U | inputs | . + {user_name:($U[.user_id].name // "unknown")}
```


In [None]:
users_side = [
    dict(id=10, name="Ada"),
    dict(id=11, name="Linus"),
]
orders = [
    dict(order="o-1", user_id=10),
    dict(order="o-2", user_id=12),
]

# Map id -> name only, then project that single field onto each order.
directory = dict((person["id"], person["name"]) for person in users_side)
[dict(order, user_name=directory.get(order["user_id"], "unknown")) for order in orders]

## Example 24 — Rename keys dynamically

`jq`
```jq
with_entries(.key |= (if .=="oldName" ...))
```


In [None]:
record = dict(oldName="value", x_version="1", other=10)
# "oldName" becomes "newName", an "x_" prefix is stripped, anything else is kept.
renamed = {
    ("newName" if key == "oldName" else key[2:] if key.startswith("x_") else key): value
    for key, value in record.items()
}
renamed

## Example 25 — Promote nested key

`jq`
```jq
if has("meta") and (.meta|has("id")) then ...
```


In [None]:
record = dict(id=1, meta=dict(id="m-1", other="x"))
# Start from the unchanged record; only rebuild it when meta carries an "id".
promoted = record
if "id" in record.get("meta", ()):
    remaining = {key: value for key, value in record["meta"].items() if key != "id"}
    promoted = {**record, "meta_id": record["meta"]["id"], "meta": remaining}
promoted

## Example 26 — Emit leaf paths

`jq`
```jq
paths(scalars) as $p | {path:$p, value:(getpath($p))}
```


In [None]:
# Nested sample with scalars at three different depths.
structure = dict(a=1, b=dict(c=2, d=[3, 4]))
leaf_paths(tree=structure)

## Example 27 — try/catch fallback

`jq`
```jq
try (.number|tonumber) catch 0
```


In [None]:
values_to_coerce = [dict(number="10"), dict(number="bad"), {}]


def safe_int(row):
    """Return ``int(row["number"])``, or 0 when the key is missing or the value cannot be converted."""
    try:
        parsed = int(row["number"])
    except (KeyError, TypeError, ValueError):
        parsed = 0
    return parsed


list(map(safe_int, values_to_coerce))

## Example 28 — Validate records

`jq`
```jq
select(.id? and (.email?|test(".+@.+\..+")))
```


In [None]:
rows = [
    dict(id=1, email="a@example.com"),
    dict(id=2, email="bad"),
    dict(email="missing"),
]


def _has_id_and_email(row):
    """Accept rows that have an "id" key and an email matching ``.+@.+\\..+``."""
    if "id" not in row:
        return False
    return re.match(r".+@.+\..+", row.get("email", "")) is not None


valid = rows > (where(_has_id_and_email) | pb(list))
valid

## Example 29 — Assert invariant

`jq`
```jq
. as $o | if ($o.qty // 0)>=0 then $o else error("negative qty") end
```


In [None]:
def enforce_quantity(row):
    """Return ``row`` unchanged, or raise ``ValueError`` when its quantity is negative.

    A missing or falsy ``"qty"`` is treated as 0.
    """
    if (row.get("qty", 0) or 0) < 0:
        raise ValueError("negative qty")
    return row


tuple(enforce_quantity({"qty": amount}) for amount in (2, 0))

## Example 30 — Stable hash of fields

`jq`
```jq
{sku,qty}|tojson|@base64
```


In [None]:
item = dict(sku="sku-1", qty=3)
# Serialise only the two fields with sorted keys, then base64-encode the text.
base64.b64encode(
    json.dumps({name: item[name] for name in ("sku", "qty")}, sort_keys=True).encode()
).decode()

## Example 31 — URL/CSV encode

`jq`
```jq
{url:"https://x/?q="+(.q|@uri), csv:([.a,.b,.c]|@csv)}
```


In [None]:
row = dict(q="data science", a="x", b="y", c="z")
# Query value is form-encoded; the three columns are joined with commas as-is.
url = "https://x/?q=" + quote_plus(row["q"])
csv_line = ",".join(row[column] for column in ("a", "b", "c"))
dict(url=url, csv=csv_line)

## Example 32 — Parse kv logs

`jq`
```jq
-Rn (input|split(" ")|map(split("="))|from_entries) ...
```


In [None]:
log_line = "ip=1.1.1.1 status=200 t=45ms"
# Split on whitespace, then split each token at its first "=" only.
parsed = dict(token.split("=", 1) for token in log_line.split())
dict(
    ip=parsed.get("ip"),
    status=int(parsed.get("status", 0)),
    ms=int(parsed.get("t", "0ms").replace("ms", "")),
)

## Example 33 — Extract timestamps anywhere

`jq`
```jq
.. | objects | select(has("timestamp")) | .timestamp
```


In [None]:
# "timestamp" appears at two different depths under "root".
structure = dict(
    root=[
        dict(timestamp="2024-01-01T00:00:00Z"),
        dict(nested=dict(timestamp="2024-01-02T00:00:00Z")),
    ]
)
list(resolve_path(structure, "..timestamp"))

## Example 34 — Cartesian product generator

`jq`
```jq
-n '[1,2,3] as $a | ["x","y"] as $b | $a[] as $i | $b[] as $j | {i:$i,j:$j}'
```


In [None]:
# Every (i, j) pairing, with i varying slowest.
[dict(i=i, j=j) for i in [1, 2, 3] for j in ["x", "y"]]

## Example 35 — Params from shell args

`jq`
```jq
-n --arg user "$USER" --argjson cfg '{"k":1}' '{run_by:$user, cfg:$cfg}'
```


In [None]:
# Stand-ins for the values jq receives through --arg / --argjson.
user, cfg = "ada", dict(k=1)
dict(run_by=user, cfg=cfg)

## Example 36 — Branch + recombine

`jq`
```jq
. as $row | {id:$row.id} | . + {extended:($row.a+$row.b)}
```


In [None]:
row = dict(id=100, a=2, b=3)
# Branch: keep only the id. Recombine: add the value derived from a and b.
base = {"id": row["id"]}
{**base, "extended": row["a"] + row["b"]}

## Example 37 — Switch by type

`jq`
```jq
type as $t | {type:$t, value:(if $t=="number" then . * 2 elif $t=="string" then .+"!" else . end)}
```


In [None]:
def transform_value(value):
    """Tag ``value`` with its jq type name and apply the per-type transformation.

    Numbers are doubled and strings get a trailing ``"!"``; every other value
    is passed through unchanged. The reported type uses jq's vocabulary
    (``number``, ``string``, ``boolean``, ``null``, ``array``, ``object``).
    Values with no jq counterpart fall back to the Python type name.

    Args:
        value: Any Python value.

    Returns:
        ``{"type": <type name>, "value": <transformed value>}``.
    """
    # bool is a subclass of int, so it must be ruled out before the numeric branch.
    if isinstance(value, bool):
        return {"type": "boolean", "value": value}
    if isinstance(value, (int, float)):
        return {"type": "number", "value": value * 2}
    if isinstance(value, str):
        return {"type": "string", "value": value + "!"}
    if value is None:
        return {"type": "null", "value": value}
    if isinstance(value, list):
        return {"type": "array", "value": value}
    if isinstance(value, dict):
        return {"type": "object", "value": value}
    return {"type": type(value).__name__, "value": value}


[transform_value(v) for v in [10, "hi", [1, 2]]]

## Example 38 — Stringify non-ASCII

`jq`
```jq
walk(if type=="string" and (.[0:]|test("[^\u0000-\u007F]")) then @json else . end)
```


In [None]:
# One ASCII-only value and one value containing non-ASCII characters.
strings = dict(english="hello", japanese="こんにちは")
stringify_non_ascii(tree=strings)

## Example 39 — Mask secrets

`jq`
```jq
walk(if type=="object" then with_entries(if (.key|test("(?i)secret|token|password")) then .value="***" else . end) else . end)
```


In [None]:
# "token" and "apiSecret" match the masking pattern; "visible" and "nested" do not.
secrets = dict(token="abcd", nested=dict(apiSecret="123", visible="ok"))
mask_secret_like(tree=secrets)

## Example 40 — NDJSON → CSV rows

`jq`
```jq
[.id, .user, .total] | @csv
```


In [None]:
record = dict(id=1, user="ada", total=42.5)
# id and total are rendered as text; user is already a string and is used as-is.
",".join([f"{record['id']}", record["user"], f"{record['total']}"])

## Example 41 — TSV with defaults

`jq`
```jq
[.id, (.name // ""), (.meta.version // "")] | @tsv
```


In [None]:
order = dict(id=1, meta=dict(version="v1"))
# Missing keys fall back to "" so every row has three columns.
columns = (
    str(order.get("id", "")),
    order.get("name", ""),
    order.get("meta", {}).get("version", ""),
)
"\t".join(columns)

## Example 42 — Propagate parent field to all nested objects

`jq`
```jq
.name as $n | .. | objects | . + {parent_name:$n}
```


In [None]:
record = dict(name="root", children=[dict(value=1), dict(value=2)])
# Each child is copied with the parent's name added under "parent_name".
[dict(child, parent_name=record["name"]) for child in record["children"]]

## Example 43 — Conditionally explode arrays

`jq`
```jq
.name as $n | if (.items?|type)=="array" then .items[] | . + {parent:$n} else . end
```


In [None]:
rows = [
    dict(name="alpha", items=[dict(id=1), dict(id=2)]),
    dict(name="beta", items=None),
]

result = []
for row in rows:
    items = row.get("items")
    # Rows whose "items" is not a list are passed through untouched.
    if not isinstance(items, list):
        result.append(row)
        continue
    for item in items:
        result.append({**item, "parent": row["name"]})
result