# Chain of Thought


## Purpose
This notebook will look at the feasibility of using an LLM to run GIS analysis. Currently, GeoAgent can only set filters based on a pre-selected set of fields (determined by the vector store)

```mermaid
flowchart LR
    User --> |message|FieldDefStore
    FieldDefStore --> |set of fields|LLM
    LLM -->|field filters| Map
```

### Problem
This is a very limited approach and has proved to be nothing more than a novelty. To really up the performance of this Agent, it needs to be allowed to *think* and plan out a strategy for analysis, then the python code executes it.

*Soooo* if someone asks "Show me commercial properties in brooklyn close to 120kV transmission lines"

The current approach will do something like

1. Run that quesiton through a store
1. Retrieve field defs like 'ZoneDist1', 'BoroCode', and 'VOLT_CLASS'
1. LLM will generate filters like "ZoneDist1 contains 'C'", "BoroCode=3", and "VOLT_CLASS=120"
1. The map will highlight these as two seperate map layers, one for the 'pluto' db and one for the 'transmission' db

### Solution
What we need is to **first** have the LLM come up with a plan, just like an analyst would do

Plan:
  1. Find all the commercial properties in brooklyn
  1. Find all the 120kV transmission lines
  1. Intersect those two set, such that they are within 10 meters
  1. Create new temp table
  1. Use temp table to display layer


### Proposal
Build a "chain-of-thought" type system, using strucutred outputs, to allow the LLM to preplan its analysis


## First Step: Test out some PostGIS Queries

First things first, lets test out some queries to create tables in PostGIS

In [1]:
from dotenv import load_dotenv
load_dotenv()

from geo_assistant.config import Configuration
from geo_assistant.agent._sql_exec import execute_template_sql
from sqlalchemy import create_engine, Engine, text

ENGINE = create_engine(Configuration.db_connection_url)

def drop_table(engine: Engine, table_name: str, schema: str = "public") -> None:
    """
    Drop a table if it exists.

    :param engine: SQLAlchemy Engine connected to your PostGIS database.
    :param table_name: Name of the table to drop (no schema).
    :param schema: Schema where the table lives (defaults to 'public').
    """
    qualified = f'"{schema}"."{table_name}"'
    drop_sql = text(f'DROP TABLE IF EXISTS {qualified} CASCADE;')
    # Use a transaction to ensure it commits
    with engine.begin() as conn:
        conn.execute(drop_sql)

### Aggregate Table

Creating a aggregate table works! Under a minute is fine too!!

In [None]:
execute_template_sql(
    engine=ENGINE,
    template_name="aggregate",
    output_table='test_aggregate',
    source_table="pluto",
    geometry_column="geometry",
    group_by=["BoroCode"]
)

### Spatial Merge

Spatial merges work as well!!

In [None]:
execute_template_sql(
    engine=ENGINE,
    template_name="merge",
    output_table="parcels_near_transmissions",
    left_table="pluto",
    right_table="transmission",
    geometry_column_left="geometry",
    geometry_column_right="geometry",
    spatial_predicate="dwithin",
    distance=10,
    keep_geometry="left",
)


### Buffer Table
And buffer table!!!

In [None]:
execute_template_sql(
    engine=ENGINE,
    template_name="buffer",
    source_table="parcels_near_transmissions",
    output_table="trans_parcel_halo",
    buffer_distance= 10,
    buffer_unit= "meters",
    srid_input= 4326,
    srid_tile= 3857,
    layer_name= "roads_halo",
    geometry_column="geometry"
)


### Cleanup

The experiment to dynamically create tables works great!! Just going to do some clean up

In [None]:
drop_table(ENGINE, "test_aggregate")
drop_table(ENGINE, "parcels_near_transmissions")
drop_table(ENGINE, "trans_parcel_halo")

## Chain-Of-Thought

Since it is proven that basic GIS operations can be parameterized, the next step is to test the LLM's ability to generate an analysis workflow

One of the trickest steps to do is going to be limiting the columns by an Enum (use the vector store to not overwhelm with 100s of different fields)

This will probably have to rely on some sort of dynamic Enum generation, then backfil the Enum to marked fields on the pydantic model

In [None]:
from typing import Type, Union
from pydantic import BaseModel, create_model

import json
from enum import Enum
from typing import Type, Self

def make_enum(*values: str) -> Type[Enum]:
    """
    Dynamically constructs an Enum subclass.

    - `name` is the Enum class name.
    - `values` are the allowed string values.
    
    Each member will be named as the upper-cased version of the value,
    and its `.value` will be the original string.
    """
    members = { val.upper(): val for val in values }
    return Enum("Fields", members)


DynamicField = Type[str]

class SQLStep(BaseModel):
    id: int

    class Config:
        validate_assignment = True

    @classmethod
    def with_fields(cls, fields_enum: type[Enum]) -> Type[Self]:
        """
        Return a new subclass of this model where every
        field annotated as DynamicField is replaced by Literal[options].
        """

        # find all the DynamicField markers and replace them
        dynamic_fields = {
            field_name: fields_enum
            for field_name, field_info in cls.model_fields.items()
            if field_info.annotation == DynamicField
        }
        dynamic_list_fields = {
            field_name: list[fields_enum]
            for field_name, field_info in cls.model_fields.items()
            if field_info.annotation == list[DynamicField]
        }
        
        # give it a distinct name so Pydantic can differentiate
        new_name = f"{cls.__name__}Dynamic"
        return create_model(
            new_name,
            __base__=cls,
            **(dynamic_fields | dynamic_list_fields)
        )


In [3]:
class AggregateStep(SQLStep):
    group_by: DynamicField
    columns: list[DynamicField]

fields = make_enum("dog", "cat")
AggregateStep.with_fields(fields).model_json_schema()

{'$defs': {'Fields': {'enum': ['dog', 'cat'],
   'title': 'Fields',
   'type': 'string'}},
 'properties': {'id': {'title': 'Id', 'type': 'integer'},
  'group_by': {'$ref': '#/$defs/Fields'},
  'columns': {'items': {'$ref': '#/$defs/Fields'},
   'title': 'Columns',
   'type': 'array'}},
 'required': ['id', 'group_by', 'columns'],
 'title': 'AggregateStepDynamic',
 'type': 'object'}

This seems to be a way to *mark* fields then backfill them with a dynamically generated enum. Now need to see if that enum is shared across all these types of classes, if part of a larger one (Needed to reduce context size)

In [4]:
class AggregateStep(SQLStep):
    group_by: DynamicField


class MergeStep(SQLStep):
    left_col: DynamicField


class GISAnalysis(BaseModel):
    steps: list[Union[AggregateStep, MergeStep]]

    @classmethod
    def backfill_fields(cls, *options: str) -> Type["GISAnalysis"]:
        """
        Returns a new GISAnalysis subclass where each of the step models
        (AggregateStep, MergeStep) has had its DynamicField replaced.
        """
        fields_enum = make_enum(*options)
        # generate dynamic versions of each SQLStep subclass
        dynamic_steps = [
            step_model.with_fields(fields_enum=fields_enum)
            for step_model in SQLStep.__subclasses__()
        ]
        # build Union[DynAgg, DynMerge]
        StepUnion = Union[tuple(dynamic_steps)]  # type: ignore[misc]

        # override only the 'steps' field
        return create_model(
            f"{cls.__name__}Dynamic",
            __base__=cls,
            steps=(list[StepUnion], ...)
        )


In [5]:
print(json.dumps(GISAnalysis.backfill_fields("dog", "cat").model_json_schema(), indent=2))

{
  "$defs": {
    "Fields": {
      "enum": [
        "dog",
        "cat"
      ],
      "title": "Fields",
      "type": "string"
    },
    "MergeStepDynamic": {
      "properties": {
        "id": {
          "title": "Id",
          "type": "integer"
        },
        "left_col": {
          "$ref": "#/$defs/Fields"
        }
      },
      "required": [
        "id",
        "left_col"
      ],
      "title": "MergeStepDynamic",
      "type": "object"
    },
    "__main____AggregateStepDynamic__1": {
      "properties": {
        "id": {
          "title": "Id",
          "type": "integer"
        },
        "group_by": {
          "$ref": "#/$defs/Fields"
        },
        "columns": {
          "items": {
            "$ref": "#/$defs/Fields"
          },
          "title": "Columns",
          "type": "array"
        }
      },
      "required": [
        "id",
        "group_by",
        "columns"
      ],
      "title": "AggregateStepDynamic",
      "type": "object"
    },

This seemed to do it!! Gonna codify these blocks, then test to see if the LLM can properly generate an analysis

In [6]:
import json
from geo_assistant.agent._steps import _GISAnalysis, _AggregateStep, _BufferStep, _MergeStep, _FilterStep, _AddMapLayer
print(json.dumps(_GISAnalysis.model_json_schema(), indent=2))

{
  "properties": {},
  "title": "_GISAnalysis",
  "type": "object"
}


In [7]:
from geo_assistant.doc_stores import FieldDefinitionStore
from typing import get_args

field_store = FieldDefinitionStore(version="1.0.8")

user_question = "Can you show me commercial plots that are within a transmission line?"
fields = await field_store.query(user_question, k=10)
DynGISModel = _GISAnalysis.build_model(
    steps=[_AggregateStep, _MergeStep, _BufferStep, _FilterStep, _AddMapLayer],
    fields=[field['name'] for field in fields]
)
print(json.dumps(DynGISModel.model_json_schema(), indent=2))

{
  "$defs": {
    "AddMapLayer": {
      "properties": {
        "name": {
          "description": "A descriptive name for the step",
          "title": "Name",
          "type": "string"
        },
        "reasoning": {
          "description": "Description of what the step does, and why it is needed",
          "title": "Reasoning",
          "type": "string"
        },
        "source_table": {
          "title": "Source Table",
          "type": "string"
        },
        "layer_id": {
          "description": "The id of the new map layer",
          "title": "Layer Id",
          "type": "string"
        },
        "color": {
          "description": "Hex value of the color of the geometries",
          "title": "Color",
          "type": "string"
        },
        "filters": {
          "items": {
            "anyOf": [
              {
                "$ref": "#/$defs/ValueFilter"
              },
              {
                "$ref": "#/$defs/ListFilter"
              },


### OpenAI Tests

Ok, lets try sending this schema to openai with a nice system message and context and see what it does!!

In [8]:
import openai
import pathlib
from jinja2 import Template

system_message_template = Template(source=pathlib.Path("./geo_assistant/agent/system_message.j2").read_text())

In [9]:
from geo_assistant.doc_stores import SupplementalInfoStore, FieldDefinitionStore
from geo_assistant.config import Configuration
user_question = "transmission lines near commercial"
field_store = FieldDefinitionStore(version="1.0.8")
fields = await field_store.query(user_question)
info_store = SupplementalInfoStore(version="1.0.8")
context = await info_store.query(user_question, k=5)

In [10]:
system_message = system_message_template.render(
    field_definitions=fields,
    context_info=context
)
print(system_message)


You are an expert GIS analyst and spatial database architect with deep experience in PostGIS, pg-tileserv, and vector-tile workflows.

### Instructions
1. Think through your analysis step by step before producing any output.
2. Produce only valid JSON that conforms exactly to the `GISAnalysis` schema below.
3. Do **not** include any explanatory text outside the JSON object.

*NOTE* All tables are already spatially indexed.

### Available Data

-- Field Definitions --
You have access to the following table fields:
**transmission.OWNER** (string)
Owner of the transmission line.

**transmission.VOLT_CLASS** (string)
Voltage of the transmission line grouped in industry standard classifications.

**pluto.Overlay1** (string)
The commercial overlay assigned to the tax lo t. A commercial overlay is a C1 or C2
  zoning district mapped within residential z oning districts to serve local retail needs
  (grocery stores, dry cleaners, restaurants, for example).

  If more than one commercial overl

In [11]:
res = openai.Client(api_key=Configuration.openai_key).responses.parse(
    input=[
        {'role': 'system', 'content': system_message},
        {'role': 'user', 'content': user_question}
    ],
    model="o4-mini",
    reasoning={
        "effort":"high",
    },
    text_format=DynGISModel
)

In [12]:
print(res.output_parsed.model_dump_json(indent=2))

{
  "steps": [
    {
      "id_": "5cfa1682-52e5-43d2-bc71-2b73d12a8d68",
      "name": "Filter commercial overlay parcels",
      "reasoning": "Filter pluto table to only parcels that have a primary commercial overlay",
      "select": [
        "Overlay1"
      ],
      "output_table": "pluto_commercial",
      "source_table": "pluto",
      "filters": [
        {
          "column": "Overlay1",
          "operator": "IS NOT NULL"
        },
        {
          "column": "Overlay1",
          "operator": "!=",
          "value": ""
        }
      ]
    },
    {
      "id_": "b179ebca-251c-4920-ae72-9094b0e7f7ee",
      "name": "Find transmission lines near commercial parcels",
      "reasoning": "Identify transmission lines that are within 100 units of commercial parcels",
      "select": [
        "OWNER",
        "VOLT_CLASS",
        "VOLTAGE",
        "Overlay1"
      ],
      "output_table": "transmission_near_commercial",
      "left_table": "transmission",
      "right_table"