# 📓 Catalist → MAP Schema Migration Notebook
A repeatable pipeline for converting the **MAP Types Airtable export (CSV)** into the **canonical MAP-schema JSON**.

**What this notebook does**

1. Load the CSV you exported from Airtable
2. Normalise booleans, trim strings, and fix casing
3. Build a **`schema` object** (name, label, description)
4. Convert every row whose `TypeKind` is selected for export into one or more **typed descriptors** inside `types[]`
5. Emit a single JSON file whose shape matches the new loader spec
6. (Optional) validate against the JSON-Schema draft included in design docs
---
# 1. Imports & Config


In [3]:
# 1 — Imports & Config

import pandas as pd
import json, itertools
from datetime import datetime
from pathlib import Path

# Input / output locations (edit as needed)
CSV_PATH  = Path("data") / "MAP Types-Grid view.csv"
OUT_DIR   = Path(".")
BASE_NAME = "catalist_schema"   # file-name stem; a timestamp and ".json" are appended on write

# TypeKind values from the CSV that are eligible to become descriptors.
# CollectionType is intentionally not part of this list.
TYPEKINDS_TO_EXPORT = [
    "HolonType",
    "PropertyType",
    "ValueType",
    "EnumType",
    "EnumVariantType",
    "RelationshipType",
]

# Identity of the generated schema: its type name plus a short human label/description
SCHEMA_INFO = dict(
    type_name="CatalistSchema",
    label="Catalist 3.0 Schema",
    description="Types auto-generated from Catalist Airtable export",
)

# ------------------------------------------------------------------
# Helper: normalise Airtable checkbox / bool columns
def bool_norm(val) -> bool:
    """
    Normalise an Airtable checkbox / bool cell to a plain Python ``bool``.

    Real booleans (Python ``bool`` as well as NumPy booleans) are passed
    through by value. Any other input is stringified and is True only when
    it reads "checked" (case-insensitive, surrounding whitespace ignored),
    so NaN, None and "" all yield False.
    """
    # pd.api.types.is_bool recognises both bool and numpy.bool_; a bare
    # isinstance(val, bool) misses numpy.bool_, whose str() is "True" and
    # would therefore be reported as False.
    if pd.api.types.is_bool(val):
        return bool(val)
    return str(val).strip().lower() == "checked"


---
# 2. Load CSV + peek

In [4]:
# Read the Airtable export into a DataFrame and report its dimensions
df = pd.read_csv(CSV_PATH)
print(f"Loaded {df.shape[0]} rows, {df.shape[1]} columns")
# Last expression of the cell: show the first three rows as a preview
df.head(3)

Loaded 184 rows, 45 columns


Unnamed: 0,Type Name,TypeKind,Notes,Assignee,New For Catalist,Status,Descriptor Name,Description,Label (Human Readable),Is Dependent,...,Min Value,Max Value,Min Length,Max Length,Catalist Field,Variants,Import to MAP?,Relationship From,Relationship To,Is Definitional
0,Activity,HolonType,Similar to a MAP dance?,,Yes,,ActivityDescriptor,An action or event performed by a group or per...,Activity,,...,,,,,,,,,,
1,Archetype,HolonType,Sub-Type of Personality Trait? (per comment in...,,Yes,,ArchetypeDescriptor,A universally recognized model or pattern of b...,Archetype,,...,,,,,,,,,,
2,Attachment,HolonType,Probably a URL address and Title. Add a MIMETy...,,Yes,,AttachmentDescriptor,External resource with associated URL and title.,Attachment,,...,,,,,,,,,,


---
# 3. $ref helper

In [5]:
def ref(type_name:str, schema:str=None, space:str=None):
    """
    Wrap a type reference in a ``{"$ref": {...}}`` envelope.

    'type_name' is always present in the inner object; 'schema' and 'space'
    are added (in that order) only when a truthy value was passed for them.
    """
    optional_parts = (("schema", schema), ("space", space))
    target = {"type_name": type_name}
    target.update({key: value for key, value in optional_parts if value})
    return {"$ref": target}

---
# 4. Row → Descriptor Conversion

In [6]:
def common_header(row, descriptor_name):
    """
    Return the pieces common to every TypeKind.

    Parameters
    ----------
    row : mapping-like (e.g. a pandas Series from ``df.iterrows()``)
        Must contain "TypeKind"; the label, description and boolean
        columns are optional and are read with ``row.get``.
    descriptor_name : str
        Value stored under "descriptor_name".

    Returns
    -------
    dict
        A new dict with descriptor_name, label, description, is_dependent,
        is_value_type, described_by and is_subtype_of.

    Raises
    ------
    KeyError
        If the row has no "TypeKind" entry.
    """
    def _text(column):
        # Missing cells show up as NaN, and NaN is truthy: `nan or ""` would
        # keep the NaN, which json.dump then writes as a bare (invalid) NaN.
        value = row.get(column, "")
        return "" if pd.isna(value) else (value or "")

    return {
        "descriptor_name" : descriptor_name,
        "label"           : _text("Label (Human Readable)"),
        "description"     : _text("Description"),
        "is_dependent"    : bool_norm(row.get("Is Dependent", False)),
        "is_value_type"   : bool_norm(row.get("Is ValueType", False)),
        "described_by"    : ref(row["TypeKind"]),      # all core meta-types are addressable
        "is_subtype_of"   : None                       # TODO: populate when CSV adds this
    }

# ----------------------------------------------------------------
def row_to_holontype(row):
    """
    Build the descriptor dict for one HolonType row.

    The spec starts from the common header (descriptor name is
    "<Type Name>Descriptor") and gains "properties", "key_properties" and
    "type_name". The two property columns are comma-separated; a missing
    cell yields an empty list.
    """
    def split_names(column):
        # Comma-separated cell -> list of stripped, non-empty names
        cell = row.get(column, "")
        if pd.isna(cell):
            return []
        return [part.strip() for part in str(cell).split(",") if part.strip()]

    holon_name = row["Type Name"]
    header = common_header(row, f"{holon_name}Descriptor")
    header["properties"] = split_names("MAP PROPERTIES PropertyTypes")
    header["key_properties"] = split_names("MAP KEY_PROPERTIES PropertyTypes")
    header["type_name"] = holon_name
    # TODO: when Airtable adds a “source_for” column, parse inline relationships here
    return {
        "type_name": holon_name,
        "type_kind": "Holon",
        "described_by": ref("HolonType"),
        "spec": header,
    }

# ----------------------------------------------------------------
def row_to_propertytype(row):
    """
    Build the descriptor dict for one PropertyType row.

    The spec starts from the common header (descriptor name is
    "<Type Name>_descriptor") and gains "property_name" and "value_type".
    "value_type" is a $ref to the name found in the
    "ValueType (VALUE_TYPE_FOR)" column, or None when that cell is missing
    or blank (the same convention "has_inverse" uses for relationships).

    Raises
    ------
    KeyError
        If the row has no "Type Name" entry.
    """
    type_name = row["Type Name"]
    spec = common_header(row, f"{type_name}_descriptor")
    spec["property_name"] = type_name

    # An empty cell is NaN (a float), so calling .strip() on the raw value
    # would raise AttributeError; normalise to text first.
    raw_value_type = row.get("ValueType (VALUE_TYPE_FOR)", "")
    value_type = "" if pd.isna(raw_value_type) else str(raw_value_type).strip()
    # Avoid emitting a $ref whose type_name is the empty string
    spec["value_type"] = ref(value_type) if value_type else None

    return {"type_name": type_name,
            "type_kind": "Property",
            "described_by": ref("PropertyType"),
            "spec": spec}

# ----------------------------------------------------------------
def _rel_cell_text(value, default=None):
    """Return a CSV cell as stripped text, or `default` when it is None, NaN or blank."""
    if value is None or pd.isna(value):
        return default
    text = str(value).strip()
    return text if text else default


def _rel_cell_int(value, default=0):
    """Return a CSV cell as an int, or `default` when it is None, NaN or blank."""
    text = _rel_cell_text(value)
    # Go through float so numeric cells read as floats (e.g. 1.0) convert cleanly
    return default if text is None else int(float(text))


def _rel_cell_names(value):
    """Split a comma-separated CSV cell into stripped, non-empty names."""
    text = _rel_cell_text(value, "")
    return [name.strip() for name in text.split(",") if name.strip()]


def row_to_relationshiptype(row):
    """
    Flattened RelationshipType that now embeds former CollectionType fields.
    Expands 'Relationship From' × 'Relationship To' into multiple descriptors.

    Missing (NaN) or blank cells are treated as absent rather than being
    stringified, so an empty 'Relationship From' / 'Relationship To' yields
    no descriptors instead of ones named after the text "nan".

    Returns
    -------
    list[dict]
        One descriptor per (source, target) pair, named
        "<source>-<relationship>-><target>"; empty when either side is empty.

    Raises
    ------
    KeyError
        If the row has no "Type Name" entry.
    ValueError
        If a cardinality cell holds text that is not a number.
    """
    rel_name     = row["Type Name"]          # screaming-snake relationship name
    source_owns  = bool_norm(row.get("Source Owns Relationship", False))
    deletion_sem = _rel_cell_text(row.get("Deletion Semantic"))

    load_links   = bool_norm(row.get("Load Links Immediate", False))
    load_holons  = bool_norm(row.get("Load Holons Immediate", False))
    # NaN is truthy, so `value or None` would let it through into ref()
    has_inverse  = _rel_cell_text(row.get("Has Inverse"))

    # int(nan) raises ValueError; empty cells fall back to 0 instead
    target_min   = _rel_cell_int(row.get("Target Min Cardinality"))
    target_max   = _rel_cell_int(row.get("Target Max Cardinality"))
    target_sem   = _rel_cell_text(row.get("Target Semantic"), "Set")    # Set | List | Bag …

    froms = _rel_cell_names(row.get("Relationship From"))
    tos   = _rel_cell_names(row.get("Relationship To"))

    out   = []
    for src, tgt in itertools.product(froms, tos):
        type_name = f"{src}-{rel_name}->{tgt}"
        spec      = common_header(row, f"{type_name}Descriptor")
        spec.update({
            "relationship_name"        : rel_name,
            "source_owns_relationship" : source_owns,
            "deletion_semantic"        : deletion_sem,
            "load_links_immediate"     : load_links,
            "load_holons_immediate"    : load_holons,
            "has_inverse"              : ref(has_inverse) if has_inverse else None,

            # ⬇️ flattened collection fields
            "target_holon_type"        : ref(tgt, schema=SCHEMA_INFO["type_name"]),
            "target_semantic"          : target_sem,
            "target_min_cardinality"   : target_min,
            "target_max_cardinality"   : target_max
        })
        out.append({
            "type_name"    : type_name,
            "type_kind"    : "Relationship",
            "described_by" : ref("RelationshipType"),
            "spec"         : spec
        })
    return out


---
# 5. Convert every CSV row to descriptor dict(s)

In [7]:
# Converters keyed by TypeKind. Each entry returns a list of descriptors so
# that one-to-one and one-to-many conversions share a single code path.
converters = {
    "HolonType"        : lambda row: [row_to_holontype(row)],
    "PropertyType"     : lambda row: [row_to_propertytype(row)],
    "RelationshipType" : row_to_relationshiptype,
}

descriptors = []
unconverted = {}   # TypeKind -> rows selected for export that have no converter

for _, row in df.iterrows():
    kind = row["TypeKind"]
    if kind not in TYPEKINDS_TO_EXPORT:
        continue

    convert = converters.get(kind)
    if convert is None:
        # TODO: EnumType, EnumVariantType … when those appear in CSV
        unconverted[kind] = unconverted.get(kind, 0) + 1
        continue

    descriptors.extend(convert(row))

# Report rows that were selected for export but dropped, instead of losing them silently
for kind, count in sorted(unconverted.items()):
    print(f"WARNING: skipped {count} {kind} row(s): no converter implemented yet")

---
# 6. Build the final JSON payload

In [8]:
# Top-level "schema" entry: identifies the schema and carries its name/description
schema_block = dict(
    type_name=SCHEMA_INFO["type_name"],
    described_by=ref("SchemaHolonType"),   # core schema descriptor
    properties=dict(
        name=SCHEMA_INFO["label"],
        description=SCHEMA_INFO["description"],
    ),
)

# Final document: the schema entry plus every converted descriptor
payload = dict(schema=schema_block, types=descriptors)
print(f"Schema object ready with {len(descriptors)} types")

Schema object ready with 219 types


---
# 7. Write to timestamped JSON file

In [9]:
# File name carries a second-resolution timestamp so earlier exports are not overwritten
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
out_path  = OUT_DIR / f"{BASE_NAME}_{timestamp}.json"

# OUT_DIR is user-editable; create it so open() does not fail on a missing directory
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Explicit encoding keeps the written file independent of the platform default
with open(out_path, "w", encoding="utf-8") as fh:
    json.dump(payload, fh, indent=2)

print("✅ Wrote", out_path)

✅ Wrote catalist_schema_20250609_142147.json


---
## 8. Validate against JSON Schema (optional)
When you have a JSON Schema for the canonical MAP import JSON, you can validate the output against it.

In [2]:
# Uncomment the code below once a JSON Schema file is available (requires the third-party `jsonschema` package)

# import jsonschema, pathlib, json
# schema_doc = json.load(open("map_schema_import.schema.json"))
# jsonschema.validate(payload, schema_doc)
# print("Payload validated against canonical JSON-Schema")