In [1]:
# ===== PhonePe Pulse — full workflow for: data/aggregated/<type>/country/india/state/<state>/<year>/<q>.json =====
from __future__ import annotations
from pathlib import Path
from typing import List, Dict, Optional, Iterable
import json, re, pandas as pd

# ---------- CONFIG ----------
DATA_ROOT = Path("data")  # change if your data folder is elsewhere
IGNORE_META = {"success", "code", "responseTimestamp", "from", "to"}
SECTION_CHOICES = ["aggregated", "map", "top"]
TYPE_CHOICES = ["transaction", "user", "insurance"]

# ---------- SMALL UTILS ----------
def _read_json(p: Path) -> Optional[dict]:
    try:
        with p.open("r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return None

def _to_float(x):
    if isinstance(x, (int, float)): return float(x)
    if isinstance(x, str):
        try: return float(x.replace(",", ""))
        except: return None
    return None

def _q_from_name(p: Path) -> Optional[int]:
    try:
        q = int(p.stem);  return q if q in (1,2,3,4) else None
    except: return None

def _year_from_parent(p: Path) -> Optional[int]:
    try: return int(p.parent.name)
    except: return None

def _slug(s: str) -> str:
    if s is None: return ""
    return re.sub(r"-{2,}", "-", re.sub(r"[^a-z0-9]+", "-", str(s).lower())).strip("-")

# ---------- PATH PARSER (handles .../country/india/state/<state>/...) ----------
def _path_meta(p: Path) -> Dict[str, Optional[str]]:
    """
    Supports:
      data/aggregated/<type>/country/india/<year>/<q>.json
      data/aggregated/<type>/country/india/state/<state>/<year>/<q>.json
      data/map/<type>/hover/country/india/<year>/<q>.json
      data/top/<type>/country/india/<year>/<q>.json
    """
    parts = p.parts
    if "data" not in parts: return {}
    i = parts.index("data")
    comps = parts[i:]  # start at 'data'
    if len(comps) < 7: return {}

    d = {"section": comps[1], "dtype": comps[2], "map_kind": None, "geo_level": None, "geo_name": None}
    j = 3
    if d["section"] == "map" and comps[j] == "hover":
        d["map_kind"] = "hover"; j += 1

    # Expect "country/india/..."
    if comps[j] != "country": return {}
    j += 1
    d["geo_level"] = "country"
    if comps[j] != "india": return {}
    j += 1
    d["geo_name"] = "india"

    # Optional state path: ".../country/india/state/<state>/<year>/<q>.json"
    if j < len(comps) and comps[j] == "state":
        j += 1
        d["geo_level"] = "state"
        d["geo_name"]  = comps[j]; j += 1

    # year / quarter
    try:
        d["year"] = int(comps[j]); d["quarter"] = int(Path(comps[j+1]).stem)
    except Exception:
        d["year"] = _year_from_parent(p); d["quarter"] = _q_from_name(p)
    return d

# ---------- PARSERS ----------
def _parse_agg_txn(payload: dict) -> List[dict]:
    rows=[]
    for it in (payload.get("data", {}).get("transactionData") or []):
        cat = it.get("name")
        for pi in (it.get("paymentInstruments") or []):
            if pi.get("type") == "TOTAL":
                rows.append({"metric":"transactions","category":cat,
                             "count":_to_float(pi.get("count")), "amount":_to_float(pi.get("amount"))})
    return rows

def _parse_agg_ins(payload: dict) -> List[dict]:
    rows=[]
    for it in (payload.get("data", {}).get("transactionData") or []):
        for pi in (it.get("paymentInstruments") or []):
            if pi.get("type") == "TOTAL":
                rows.append({"metric":"insurance","category":it.get("name","Insurance"),
                             "count":_to_float(pi.get("count")), "amount":_to_float(pi.get("amount"))})
    return rows

def _parse_agg_user(payload: dict) -> List[dict]:
    rows=[]
    data = payload.get("data", {})
    agg  = data.get("aggregated") or {}
    if agg:
        rows.append({"metric":"users_summary",
                     "registeredUsers":_to_float(agg.get("registeredUsers")),
                     "appOpens":_to_float(agg.get("appOpens")),
                     "brand":None,"brand_count":None,"brand_pct":None})
    for d in (data.get("usersByDevice") or []):
        rows.append({"metric":"users_by_device",
                     "registeredUsers":None,"appOpens":None,
                     "brand":d.get("brand"), "brand_count":_to_float(d.get("count")),
                     "brand_pct":_to_float(d.get("percentage"))})
    return rows

def _parse_map_txn_ins(payload: dict) -> List[dict]:
    rows=[]
    for item in (payload.get("data", {}).get("hoverDataList") or []):
        nm = item.get("name")
        for m in (item.get("metric") or []):
            if m.get("type") == "TOTAL":
                rows.append({"name":nm,"count":_to_float(m.get("count")), "amount":_to_float(m.get("amount"))})
    return rows

def _parse_map_user(payload: dict) -> List[dict]:
    rows=[]
    for nm, vals in (payload.get("data", {}).get("hoverData") or {}).items():
        rows.append({"name":nm,"registeredUsers":_to_float(vals.get("registeredUsers")), "appOpens":_to_float(vals.get("appOpens"))})
    return rows

def _parse_top_txn_ins(payload: dict) -> List[dict]:
    rows=[]
    data = payload.get("data", {}) or {}
    for level in ("states","districts","pincodes"):
        for item in (data.get(level) or []):
            m = item.get("metric") or {}
            if m.get("type") == "TOTAL":
                rows.append({"level":level[:-1],"name":item.get("entityName"),
                             "count":_to_float(m.get("count")), "amount":_to_float(m.get("amount"))})
    return rows

def _parse_top_user(payload: dict) -> List[dict]:
    rows=[]
    data = payload.get("data", {}) or {}
    def add(level):
        for it in (data.get(level) or []):
            rows.append({"level":level[:-1],"name":it.get("name"),
                         "registeredUsers":_to_float(it.get("registeredUsers"))})
    add("states"); add("districts"); add("pincodes")
    return rows

# ---------- NORMALIZER ----------
def _normalize_file(p: Path) -> List[dict]:
    meta = _path_meta(p)
    if not meta: return []
    payload = _read_json(p)
    if not isinstance(payload, dict): return []
    for k in list(payload.keys()):
        if k in IGNORE_META: payload.pop(k, None)

    sec, typ = meta["section"], meta["dtype"]
    rows=[]
    try:
        if sec == "aggregated":
            if typ == "transaction": rows = _parse_agg_txn(payload)
            elif typ == "insurance": rows = _parse_agg_ins(payload)
            elif typ == "user":      rows = _parse_agg_user(payload)
        elif sec == "map":
            if typ in ("transaction","insurance"): rows = _parse_map_txn_ins(payload)
            elif typ == "user":                    rows = _parse_map_user(payload)
        elif sec == "top":
            if typ in ("transaction","insurance"): rows = _parse_top_txn_ins(payload)
            elif typ == "user":                    rows = _parse_top_user(payload)
    except Exception:
        rows = []
    for r in rows: r.update(meta)
    return rows

def load_all_rows() -> pd.DataFrame:
    files = sorted(DATA_ROOT.rglob("*.json"))
    out=[]
    for p in files:
        if _q_from_name(p) is None or _year_from_parent(p) is None: 
            continue
        out.extend(_normalize_file(p))
    df = pd.DataFrame(out)
    if not df.empty:
        df["period"] = pd.PeriodIndex(df["year"].astype("Int64").astype(str) + "Q" + df["quarter"].astype("Int64").astype(str), freq="Q")
        df["geo"] = df["geo_level"].str.cat(df["geo_name"], sep=":")
        df["section_type"] = df["section"].str.cat(df["dtype"], sep="/")
        if "name" in df.columns: df["name_slug"] = df["name"].apply(_slug)
        if "geo_name" in df.columns: df["geo_slug"] = df["geo_name"].apply(_slug)
        if "level" not in df.columns: df["level"] = pd.NA
    return df

# ---------- STATE DISCOVERY (by folder names) ----------
def list_states_fs(section: str, dtype: str) -> List[str]:
    """
    List states by folder name under:
      data/{section}/{dtype}/country/india/state/<state>/
    """
    root = DATA_ROOT / section / dtype / "country" / "india" / "state"
    if not root.exists(): return []
    return sorted([p.name for p in root.iterdir() if p.is_dir()])

def discover_states(sections: List[str], types: List[str]) -> List[str]:
    states=set()
    for s in sections:
        for t in types:
            states.update(list_states_fs(s,t))
    # also include states we may only see inside map/top JSONs (optional)
    df = load_all_rows()
    if not df.empty:
        states.update(df[(df.section=="map") & (df.dtype.isin(types))]["name"].dropna().unique().tolist())
        if "level" in df.columns:
            states.update(df[(df.section=="top") & (df.dtype.isin(types)) & (df.level=="state")]["name"].dropna().unique().tolist())
    return sorted(states)

# ---------- EFFECTIVE GEOGRAPHY LABELS ----------
def _apply_effective_geo(df: pd.DataFrame, geo_mode: str, state_slugs: set[str]) -> pd.DataFrame:
    d = df.copy()
    for col in ("region_level","region_name","region_slug","source_tag"):
        if col not in d.columns: d[col] = pd.NA

    if geo_mode == "country":
        # aggregated country rows
        m = (d.section=="aggregated") & (d.geo_level=="country")
        d.loc[m, ["region_level","region_name","region_slug","source_tag"]] = ["country","india","india","agg_country_path"]
        # map: 'name' is a state (label as state)
        m = (d.section=="map") & d["name"].notna()
        d.loc[m, ["region_level","region_name","region_slug","source_tag"]] = ["state", d.loc[m,"name"], d.loc[m,"name_slug"], "map_state_name"]
        # top: state level rows
        m = (d.section=="top") & (d.level=="state") & d["name"].notna()
        d.loc[m, ["region_level","region_name","region_slug","source_tag"]] = ["state", d.loc[m,"name"], d.loc[m,"name_slug"], "top_state_level"]
        return d

    # --- state mode ---
    # aggregated: state path
    m = (d.section=="aggregated") & (d.geo_level=="state") & (d.geo_slug.isin(state_slugs))
    d.loc[m, ["region_level","region_name","region_slug","source_tag"]] = ["state", d.loc[m,"geo_name"], d.loc[m,"geo_slug"], "agg_state_path"]
    # map: name is state
    m = (d.section=="map") & (d.name_slug.isin(state_slugs))
    d.loc[m, ["region_level","region_name","region_slug","source_tag"]] = ["state", d.loc[m,"name"], d.loc[m,"name_slug"], "map_state_name"]
    # top: state rows
    m = (d.section=="top") & (d.level=="state") & (d.name_slug.isin(state_slugs))
    d.loc[m, ["region_level","region_name","region_slug","source_tag"]] = ["state", d.loc[m,"name"], d.loc[m,"name_slug"], "top_state_level"]

    d = d[d["region_level"].notna()].copy()
    return d

# ---------- PROGRAMMATIC QUERY ----------
def query_data(
    sections: List[str],
    types: List[str],
    geo_mode: str,        # "country" or "state"
    geos: List[str],      # ["india"] or list of state folder names
    years: List[int],
    quarters: List[int],
) -> pd.DataFrame:
    base = load_all_rows()
    if base.empty: return base

    base = base[
        base.section.isin(sections)
        & base.dtype.isin(types)
        & base.year.isin(years)
        & base.quarter.isin(quarters)
    ].copy()

    state_slugs = {_slug(s) for s in geos} if geo_mode == "state" else {"india"}
    out = _apply_effective_geo(base, geo_mode, state_slugs)

    sort_cols = [c for c in ["region_level","region_name","section","dtype","year","quarter"] if c in out.columns]
    return out.sort_values(sort_cols, kind="mergesort").reset_index(drop=True)

# ---------- CLI HELPERS ----------
def _pick(prompt: str, options: List[str]) -> str:
    print(f"\n{prompt}")
    for i,o in enumerate(options,1): print(f"{i}. {o}")
    while True:
        s = input("Choose number: ").strip()
        try:
            k=int(s);  assert 1<=k<=len(options);  return options[k-1]
        except: print("Invalid. Try again.")

def _pick_multi(prompt: str, options: List[str], allow_all=True) -> List[str]:
    print(f"\n{prompt} (comma-separated indexes{' or all' if allow_all else ''})")
    for i,o in enumerate(options,1): print(f"{i}. {o}")
    while True:
        s = input("Choose: ").strip().lower()
        if allow_all and s in ("all","*"): return options
        try:
            idxs=[int(x) for x in s.replace(" ","").split(",") if x]
            picked=[options[i-1] for i in idxs if 1<=i<=len(options)]
            if picked:
                seen=set(); out=[]
                for x in picked:
                    if x not in seen: out.append(x); seen.add(x)
                return out
        except: pass
        print("Invalid. Try again.")

def _parse_years(inp: str, available: Iterable[int]) -> List[int]:
    avail=sorted(set(int(x) for x in available))
    t=inp.strip().lower()
    if t in ("all","*"): return avail
    out=set()
    for part in t.replace(" ","").split(","):
        if "-" in part:
            a,b=part.split("-",1)
            try:
                a,b=int(a),int(b)
                for y in avail:
                    if a<=y<=b: out.add(y)
            except: pass
        else:
            try:
                y=int(part)
                if y in avail: out.add(y)
            except: pass
    return sorted(out) or avail

def _parse_quarters(inp: str) -> List[int]:
    t=inp.strip().lower()
    if t in ("all","*"): return [1,2,3,4]
    qs=set()
    for part in t.replace(" ","").split(","):
        try:
            q=int(part)
            if q in (1,2,3,4): qs.add(q)
        except: pass
    return sorted(qs) or [1,2,3,4]

# ---------- INTERACTIVE CLI (multi-select; states listed by folder name) ----------
def interactive_query_cli(save_csv: bool=False, csv_name: str="selection.csv") -> pd.DataFrame:
    sections = _pick_multi("Sections", SECTION_CHOICES)
    types    = _pick_multi("Types", TYPE_CHOICES)
    geo_mode = _pick("Geo mode", ["country","state"])

    if geo_mode == "state":
        state_options = discover_states(sections, types)  # folder names where available
        if not state_options:
            raise SystemExit("No state folders discovered. Make sure your selection includes 'aggregated' types that have state data.")
        states = _pick_multi("States (by folder name)", state_options)
        geos = states
    else:
        geos = ["india"]

    # show available years given chosen dimensions
    df_tmp = query_data(sections, types, geo_mode, geos, years=list(range(1900,3000)), quarters=[1,2,3,4])
    yrs_avail = sorted(df_tmp["year"].dropna().astype(int).unique().tolist())
    print(f"\nAvailable years: {yrs_avail or '(none)'}")
    years = _parse_years(input("Pick years (e.g. 'all', '2020-2022', '2019,2021'): "), yrs_avail or [])
    quarters = _parse_quarters(input("Pick quarters (e.g. 'all' or '1,3,4'): "))

    out = query_data(sections, types, geo_mode, geos, years, quarters)
    print(f"\nRows selected: {len(out):,}")
    if save_csv and not out.empty:
        out.to_csv(csv_name, index=False); print(f"Saved -> {csv_name}")
    return out

# (no auto-run)


In [2]:
df_view = interactive_query_cli(save_csv=True, csv_name="my_selection.csv")
df_view.head()



Sections (comma-separated indexes or all)
1. aggregated
2. map
3. top

Types (comma-separated indexes or all)
1. transaction
2. user
3. insurance

Geo mode
1. country
2. state

States (by folder name) (comma-separated indexes or all)
1. adilabad district
2. agar malwa district
3. agra district
4. ahmadabad district
5. ahmedabad district
6. ahmednagar district
7. aizawl district
8. ajmer district
9. akola district
10. alappuzha district
11. aligarh district
12. alipurduar district
13. alirajpur district
14. alluri sitharama raju district
15. almora district
16. alwar district
17. ambala district
18. ambedkar nagar district
19. amethi district
20. amravati district
21. amreli district
22. amritsar district
23. amroha district
24. anakapalli district
25. anand district
26. anantapur district
27. ananthapuramu district
28. anantnag district
29. andaman & nicobar islands
30. andaman-&-nicobar-islands
31. andhra pradesh
32. andhra-pradesh
33. angul district
34. anjaw district
35. annamayya 

ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (4,) + inhomogeneous part.

In [7]:
from pathlib import Path

def print_tree_dirs(root: Path, prefix: str = ""):
    """Recursively print only directories (skip .json files)."""
    items = sorted([p for p in root.iterdir() if p.is_dir()], key=lambda x: x.name.lower())
    for i, path in enumerate(items):
        connector = "└── " if i == len(items) - 1 else "├── "
        print(prefix + connector + path.name)
        extension = "    " if i == len(items) - 1 else "│   "
        print_tree_dirs(path, prefix + extension)

# run it
root = Path("data")  # adjust if needed
print(root.resolve())
print_tree_dirs(root)


/Users/shamiya/pulse/data
├── aggregated
│   ├── insurance
│   │   └── country
│   │       └── india
│   │           ├── 2020
│   │           ├── 2021
│   │           ├── 2022
│   │           ├── 2023
│   │           ├── 2024
│   │           └── state
│   │               ├── andaman-&-nicobar-islands
│   │               │   ├── 2020
│   │               │   ├── 2021
│   │               │   ├── 2022
│   │               │   ├── 2023
│   │               │   └── 2024
│   │               ├── andhra-pradesh
│   │               │   ├── 2020
│   │               │   ├── 2021
│   │               │   ├── 2022
│   │               │   ├── 2023
│   │               │   └── 2024
│   │               ├── arunachal-pradesh
│   │               │   ├── 2020
│   │               │   ├── 2021
│   │               │   ├── 2022
│   │               │   ├── 2023
│   │               │   └── 2024
│   │               ├── assam
│   │               │   ├── 2020
│   │               │   ├── 2021
│   │               │   ├─

In [None]:
df_view.to_csv("selection.csv", index=False)
print("Saved selection.csv")


In [None]:
df_prog = query_data(
    sections=["aggregated","map","top"],
    types=["transaction","user"],
    geo_mode="state",
    geos=["karnataka","maharashtra","tamil-nadu"],  # multiple states OK
    years=[2020,2021,2022,2023,2024],
    quarters=[1,2,3,4],
)
df_prog.head()

In [10]:
df_prog = query_data(
    sections=["aggregated","map","top"],
    types=["transaction","user"],
    geo_mode="state",
    geos=["karnataka","maharashtra","tamil-nadu"],   # multiple states allowed
    years=[2020,2021,2022,2023,2024],
    quarters=[1,2,3,4],
)
df_prog.head()


Unnamed: 0,metric,category,count,amount,section,dtype,map_kind,geo_level,geo_name,year,...,registeredUsers,appOpens,brand,brand_count,brand_pct,name,level,period,geo,section_type
1,,,271033890.0,353402100000.0,map,transaction,hover,country,india,2020,...,,,,,,karnataka,,2020Q1,country:india,map/transaction
0,,,238944081.0,319995200000.0,map,transaction,hover,country,india,2020,...,,,,,,maharashtra,,2020Q1,country:india,map/transaction
3,,,213221712.0,321104900000.0,map,transaction,hover,country,india,2020,...,,,,,,karnataka,,2020Q2,country:india,map/transaction
2,,,184790761.0,296359500000.0,map,transaction,hover,country,india,2020,...,,,,,,maharashtra,,2020Q2,country:india,map/transaction
5,,,300360531.0,470740500000.0,map,transaction,hover,country,india,2020,...,,,,,,karnataka,,2020Q3,country:india,map/transaction


In [11]:
df_prog = query_data(
    sections=["aggregated","map","top"],
    types=["user"],
    geo_mode="state",
    geos=["karnataka","maharashtra","tamil-nadu"],   # multiple states allowed
    years=[2020,2021,2022,2023,2024],
    quarters=[1,2,3,4],
)
df_prog.head()


Unnamed: 0,metric,category,count,amount,section,dtype,map_kind,geo_level,geo_name,year,...,registeredUsers,appOpens,brand,brand_count,brand_pct,name,level,period,geo,section_type
1,,,,,map,user,hover,country,india,2020,...,17016980.0,626488036.0,,,,karnataka,,2020Q1,country:india,map/user
0,,,,,map,user,hover,country,india,2020,...,26168390.0,615146568.0,,,,maharashtra,,2020Q1,country:india,map/user
3,,,,,map,user,hover,country,india,2020,...,18655366.0,490510075.0,,,,karnataka,,2020Q2,country:india,map/user
2,,,,,map,user,hover,country,india,2020,...,28765086.0,506354812.0,,,,maharashtra,,2020Q2,country:india,map/user
5,,,,,map,user,hover,country,india,2020,...,20487550.0,693683212.0,,,,karnataka,,2020Q3,country:india,map/user
