In [None]:
import json
from typing import Dict, Set

import pandas as pd
import numpy as np
import yaml
from IPython.display import display
from tqdm.notebook import tqdm


from pydantic import ValidationError

from pydantic import BaseModel

# Cap how much of a frame pandas renders in cell output.
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50

# Directory and file name of the CSV this notebook reads.
psg_directory, psg_data_file = "../resources/", "psgc_2026-01-13.csv"

In [None]:
# Load the raw CSV configured in the first cell.
df = pd.read_csv(psg_directory + psg_data_file)
# DataFrame.info() prints its summary itself and returns None, so it is
# called directly; wrapping it in display() only adds a stray "None".
df.info()
display(df)

In [None]:
# Store the identifier as text, left-padded with zeros to at least 10 characters,
# so the fixed-position slices taken from it later line up.
df["psgc_id"] = df["psgc_id"].astype(str).str.zfill(10)

# Trim surrounding whitespace from every string cell; other values pass through.
df = df.map(lambda x: x.strip() if isinstance(x, str) else x)

# Population arrives as text with thousands separators and "-" for missing.
# The text clean-up only runs while the column is still non-numeric, so this
# cell can be re-run (column already Int64) without the .str accessor raising.
population = df["population"]
if not pd.api.types.is_numeric_dtype(population):
    population = population.str.replace(",", "").replace("-", np.nan)
df["population"] = population.astype("Int64")

In [None]:
# Abbreviated level codes found in the CSV -> spelled-out level names.
geographic_level_map = dict(
    Reg="region",
    City="city",
    Mun="municipality",
    Prov="province",
    SubMun="submunicipality",
    Bgy="barangay",
)
df["geographic_level"] = df["geographic_level"].replace(geographic_level_map)

# Every derived column is a slice of psgc_id counted from the right-hand end.
# "*_code" columns hold one segment; "*_mapper" columns hold the prefix up to
# and including that segment, used later as lookup keys.
psgc_ids = df["psgc_id"]
psgc_slices = {
    "barangay_code": (-3, None),
    "municipal_or_city_code": (-5, -3),
    "province_or_huc_code": (-8, -5),
    "region_code": (-10, -8),
    "barangay_mapper": (-10, None),
    "municipal_or_city_mapper": (-10, -3),
    "province_or_huc_mapper": (-10, -5),
    "region_mapper": (-10, -8),
}
for column_name, (start, stop) in psgc_slices.items():
    df[column_name] = psgc_ids.str.slice(start, stop)

df.sample(10)

In [None]:
# A row is region-level when every segment below the region code is zeroed.
regions_filter = (
    df["province_or_huc_code"].eq("000")
    & df["municipal_or_city_code"].eq("00")
    & df["barangay_code"].eq("000")
)

# region_mapper key -> region name, built from the rows sorted by key.
region_names = df.loc[regions_filter, ["region_mapper", "name"]].sort_values(
    "region_mapper"
)
regions_mapper = region_names.set_index("region_mapper", drop=True)["name"].to_dict()
regions_mapper

In [None]:
# Province/HUC-level rows: a non-zero province segment with the municipal and
# barangay segments zeroed.
province_or_huc_filter = (
    df["province_or_huc_code"].ne("000")
    & df["municipal_or_city_code"].eq("00")
    & df["barangay_code"].eq("000")
)

# province_or_huc_mapper key -> name, built from the rows sorted by key.
province_or_huc_names = df.loc[
    province_or_huc_filter, ["province_or_huc_mapper", "name"]
].sort_values("province_or_huc_mapper")
province_or_huc_mapper = province_or_huc_names.set_index("province_or_huc_mapper")[
    "name"
].to_dict()
province_or_huc_mapper

In [None]:
# Municipality/city-level rows: non-zero province and municipal segments with
# the barangay segment zeroed.
municipal_or_city_filter = (
    df["province_or_huc_code"].ne("000")
    & df["municipal_or_city_code"].ne("00")
    & df["barangay_code"].eq("000")
)

# municipal_or_city_mapper key -> name, built from the rows sorted by key.
municipal_or_city_names = df.loc[
    municipal_or_city_filter, ["municipal_or_city_mapper", "name"]
].sort_values("municipal_or_city_mapper")
municipal_or_city_mapper = municipal_or_city_names.set_index(
    "municipal_or_city_mapper"
)["name"].to_dict()
municipal_or_city_mapper

In [None]:
# Give every row the names of its ancestors by looking up its key prefixes.
# Keys missing from a lookup produce NaN in the corresponding name column.
for name_column, key_column, lookup in (
    ("region", "region_mapper", regions_mapper),
    ("province_or_huc", "province_or_huc_mapper", province_or_huc_mapper),
    ("municipality_or_city", "municipal_or_city_mapper", municipal_or_city_mapper),
):
    df[name_column] = df[key_column].map(lookup)

In [None]:
# Barangay-level rows only, re-indexed from zero.
is_barangay = df["geographic_level"].eq("barangay")
barangay_df = df.loc[is_barangay].reset_index(drop=True)

In [None]:
# Masks over barangay_df flagging rows whose parent-name lookup found nothing.
empty_municipality = ~barangay_df["municipality_or_city"].notna()
empty_province_or_huc = ~barangay_df["province_or_huc"].notna()

In [None]:
# Columns kept in, and sort order applied to, each of the three slices below.
hierarchy_columns = [
    "region",
    "region_mapper",
    "province_or_huc",
    "province_or_huc_mapper",
    "municipality_or_city",
    "municipal_or_city_mapper",
    "name",
]
hierarchy_order = ["region", "province_or_huc", "municipality_or_city"]

# Barangays with both a province/HUC name and a municipality/city name.
mdf = barangay_df.loc[
    ~empty_municipality & ~empty_province_or_huc, hierarchy_columns
].sort_values(hierarchy_order)

# Barangays with a province/HUC name but no municipality/city name.
empty_municipality_df = barangay_df.loc[
    empty_municipality & ~empty_province_or_huc, hierarchy_columns
].sort_values(hierarchy_order)

# Barangays with a municipality/city name but no province/HUC name.
empty_province_df = barangay_df.loc[
    ~empty_municipality & empty_province_or_huc, hierarchy_columns
].sort_values(hierarchy_order)

In [None]:
# Nested name lookup: region -> province/HUC -> municipality/city -> {barangays}.
# Where one level is missing, the barangay set hangs directly off the level
# that is present.
root_dict: Dict[str, Dict[str, Set[str] | Dict[str, Set]]] = {}

# Fully resolved barangays: three levels of nesting above the barangay set.
for region_name, province_name, municipality_name, barangay_name in mdf[
    ["region", "province_or_huc", "municipality_or_city", "name"]
].itertuples(index=False, name=None):
    municipalities = root_dict.setdefault(region_name, {}).setdefault(
        province_name, {}
    )
    municipalities.setdefault(municipality_name, set()).add(barangay_name)

# handling empty municipality: the set sits directly under the province/HUC
for region_name, province_name, barangay_name in empty_municipality_df[
    ["region", "province_or_huc", "name"]
].itertuples(index=False, name=None):
    root_dict.setdefault(region_name, {}).setdefault(province_name, set()).add(
        barangay_name
    )

# handling empty prov: the set sits under the municipality/city, keyed
# directly below the region
for region_name, municipality_name, barangay_name in empty_province_df[
    ["region", "municipality_or_city", "name"]
].itertuples(index=False, name=None):
    root_dict.setdefault(region_name, {}).setdefault(municipality_name, set()).add(
        barangay_name
    )

In [None]:
from typing import Literal, Optional, List

from pydantic import Field


class FlatLocation(BaseModel):
    """One location record that refers to its parent by id only.

    This is the shape used for the entries of the flat list built later in
    the notebook.
    """

    # Display name of the location.
    name: str
    # Administrative level; validation rejects any value outside this list.
    type: Literal[
        "country",
        "region",
        "province",
        "city",
        "municipality",
        "barangay",
        "special_geographic_area",
        "submunicipality",
    ]
    # Identifier of this location, or "n/a" when there is none.
    psgc_id: str | Literal["n/a"]
    # Identifier of the enclosing location, or "n/a" when there is none.
    parent_psgc_id: str | Literal["n/a"]
    # Optional alternative names; None unless explicitly provided.
    nicknames: Optional[List[str]] = None


class Location(FlatLocation):
    """A location record that also nests its child locations.

    Every scalar field is inherited from ``FlatLocation`` so the nested and
    flat schemas cannot drift apart; field order in ``model_dump()`` is
    unchanged (``components`` stays last).
    """

    # Child locations; each instance starts with its own empty list.
    components: List["Location"] = Field(default_factory=list)

In [None]:
# Spot-check rows that resolved a province/HUC name but no municipality/city name.
lacks_municipality_only = (
    df["province_or_huc"].notna() & df["municipality_or_city"].isna()
)
df.loc[lacks_municipality_only].sample(10)

In [None]:
def _attach_child(
    parent: Location,
    row: pd.Series,
    flat_records: List[FlatLocation],
    level: str | None = None,
) -> None:
    """Validate ``row`` as a location and register it under ``parent``.

    The same field values are validated twice: as a nested ``Location``
    (appended to ``parent.components``) and as a ``FlatLocation`` (appended to
    ``flat_records``).

    Args:
        parent: Node receiving the child; its ``psgc_id`` becomes the child's
            ``parent_psgc_id``.
        row: Row of ``df`` supplying ``name``, ``psgc_id`` and
            ``geographic_level``.
        flat_records: Flat list that mirrors the tree, in insertion order.
        level: Location type to use instead of ``row["geographic_level"]``.

    A row that fails validation is reported and skipped; nothing is appended
    for it, so a bad row can never re-register the previous row's objects.
    """
    fields = {
        "name": row["name"],
        "type": row["geographic_level"] if level is None else level,
        "psgc_id": row["psgc_id"],
        "parent_psgc_id": parent.psgc_id,
    }
    try:
        nested = Location(**fields)
        flat = FlatLocation(**fields)
    except ValidationError as e:
        print(e)
        print("############## ERROR")
        print(row)
        return
    parent.components.append(nested)
    flat_records.append(flat)


flat_dict: List[FlatLocation] = []
root = Location(
    name="Philippines", psgc_id="0000000000", type="country", parent_psgc_id="n/a"
)

# Loop-invariant masks over df, computed once instead of once per parent.
no_province_code = df["province_or_huc_code"] == "000"
no_municipal_code = df["municipal_or_city_code"] == "00"
no_barangay_code = df["barangay_code"] == "000"
has_province_name = df["province_or_huc"].notna()
has_municipality_name = df["municipality_or_city"].notna()

region_level = no_province_code & no_municipal_code & no_barangay_code
province_level = ~no_province_code & no_municipal_code & no_barangay_code
municipal_level = ~no_province_code & ~no_municipal_code & no_barangay_code
barangay_level = ~no_province_code & ~no_municipal_code & ~no_barangay_code
# Barangay rows whose municipal segment is zero, i.e. hanging off a province/HUC.
province_barangay_level = ~no_province_code & no_municipal_code & ~no_barangay_code

# Province-level rows whose type is hard-coded by psgc_id instead of being
# taken from the CSV. NOTE(review): presumably the CSV's geographic_level for
# these two rows is not usable as a Location type -- confirm against the data.
level_overrides = {
    "1999900000": "special_geographic_area",
    "0990100000": "city",
}

# REGIONS UNDER THE COUNTRY ROOT
# The lookup is limited to region-level rows so that a lower-level row that
# happens to share a region's name can never be picked up instead.
for region_name in root_dict:
    region_row = df[region_level & (df["name"] == region_name)].iloc[0]
    _attach_child(root, region_row, flat_dict, level="region")

# RESOLVE PROVINCES & HUC UNDER REGIONS
for region in root.components:
    in_region = df["region"] == region.name
    for _, row in df[in_region & province_level].iterrows():
        _attach_child(
            region, row, flat_dict, level=level_overrides.get(row["psgc_id"])
        )

# RESOLVE CITIES & MUNICIPALITIES DIRECTLY UNDER REGIONS
# (municipal-level rows whose province/HUC name lookup found nothing)
for region in root.components:
    in_region = df["region"] == region.name
    for _, row in df[in_region & municipal_level & ~has_province_name].iterrows():
        _attach_child(region, row, flat_dict)

# RESOLVE CITIES & MUNICIPALITIES UNDER PROVINCE & HUCs
for region in root.components:
    in_region = df["region"] == region.name
    for province_or_huc in region.components:
        municipality_or_city_in_province_or_huc = df[
            (df["province_or_huc"] == province_or_huc.name)
            & in_region
            & municipal_level
            & has_province_name
            & has_municipality_name
        ]
        for _, row in municipality_or_city_in_province_or_huc.iterrows():
            _attach_child(province_or_huc, row, flat_dict)

# RESOLVE BARANGAY IF ITS UNDER A MUNICIPALITY OR CITY AND UNDER A PROVINCE OR HUC
for region in tqdm(root.components, leave=True, ascii=True):
    in_region = df["region"] == region.name
    for province_or_huc in region.components:
        in_province = in_region & (df["province_or_huc"] == province_or_huc.name)
        for municipality_or_city in province_or_huc.components:
            barangay_in_municipality_or_city = df[
                (df["municipality_or_city"] == municipality_or_city.name)
                & in_province
                & barangay_level
                & has_province_name
                & has_municipality_name
            ]
            # NOTE(review): looks like a debugging aid for one specific parent
            # id; kept as-is -- confirm whether it is still needed.
            if province_or_huc.psgc_id == "1630400000":
                display(barangay_in_municipality_or_city)
            for _, row in barangay_in_municipality_or_city.iterrows():
                _attach_child(municipality_or_city, row, flat_dict)

# RESOLVE BARANGAYS DIRECTLY UNDER PROVINCE OR HUCS
for region in tqdm(root.components, leave=True, ascii=True):
    in_region = df["region"] == region.name
    for province_or_huc in region.components:
        barangay_in_province_or_huc = df[
            (df["province_or_huc"] == province_or_huc.name)
            & in_region
            & province_barangay_level
            & has_province_name
            & ~has_municipality_name
        ]
        for _, row in barangay_in_province_or_huc.iterrows():
            _attach_child(province_or_huc, row, flat_dict)

# RESOLVE BARANGAY UNDER MUNICIPALITY THAT IS UNDER REGIONS DIRECTLY
for region in tqdm(root.components, leave=True, ascii=True):
    in_region = df["region"] == region.name
    for municipality_or_city in region.components:
        barangay_in_municipality_or_city = df[
            (df["municipality_or_city"] == municipality_or_city.name)
            & in_region
            & barangay_level
            & ~has_province_name
            & has_municipality_name
        ]
        for _, row in barangay_in_municipality_or_city.iterrows():
            _attach_child(municipality_or_city, row, flat_dict)

In [None]:
# Convert the flat models to plain dicts once, then render that list both ways.
flat_records = [item.model_dump() for item in flat_dict]
json_dump = json.dumps(flat_records, indent=4)
# sort_keys=False keeps the model's field order in the YAML output.
yaml_dump = yaml.safe_dump(flat_records, sort_keys=False)

In [None]:
# Write the JSON rendering of the flat list to the package data directory.
with open("../barangay/data/barangay_flat.json", "w", encoding="utf8") as json_file:
    json_file.write(json_dump)

In [None]:
# Write the YAML rendering of the flat list to the package data directory.
with open("../barangay/data/barangay_flat.yaml", "w", encoding="utf8") as yaml_file:
    yaml_file.write(yaml_dump)