In [1]:
import json
import geojson
from typing import Any, List, Union
from geojson.geometry import Geometry  # Ensures valid GeoJSON types


class CQLExpression:
    """A filter expression held as a nested ``{"op": ..., "args": [...]}`` dict.

    Expressions are combined with the bitwise operators:

    * ``a & b`` -> ``{"op": "and", "args": [a, b]}``
    * ``a | b`` -> ``{"op": "or",  "args": [a, b]}``
    * ``~a``    -> ``{"op": "not", "args": [a]}``

    Python's ``and`` / ``or`` / ``not`` keywords cannot be overloaded, so the
    bitwise operators must be used. Because ``&`` and ``|`` bind tighter than
    comparisons, comparison sub-expressions need parentheses, e.g.
    ``(field < 20) & other``.
    """

    def __init__(self, expression: dict):
        """Wrap ``expression``; the dict is stored by reference, not copied."""
        self.expression = expression

    def __and__(self, other: "CQLExpression") -> "CQLExpression":
        """Return a new expression that is the logical AND of both operands."""
        # Returning NotImplemented lets Python raise its normal
        # "unsupported operand type(s)" TypeError instead of an unrelated
        # AttributeError from `other.expression`.
        if not isinstance(other, CQLExpression):
            return NotImplemented
        return CQLExpression({"op": "and", "args": [self.expression, other.expression]})

    def __or__(self, other: "CQLExpression") -> "CQLExpression":
        """Return a new expression that is the logical OR of both operands."""
        if not isinstance(other, CQLExpression):
            return NotImplemented
        return CQLExpression({"op": "or", "args": [self.expression, other.expression]})

    def __invert__(self) -> "CQLExpression":
        """Return a new expression that negates this one."""
        return CQLExpression({"op": "not", "args": [self.expression]})

    def to_dict(self) -> dict:
        """Return the underlying dict (the same object, not a copy)."""
        return self.expression

    def to_json(self) -> str:
        """Serialise the expression to a JSON string indented by two spaces."""
        return json.dumps(self.expression, indent=2)


class Field:
    """Common base for the typed property wrappers used to build filters."""

    def __init__(self, name: str):
        """Remember the property name this field refers to."""
        self.name = name

    def _wrap(self, op: str, value: Any) -> CQLExpression:
        """Build the binary expression ``op(<this property>, value)``.

        Args:
            op: Operator name placed under the ``"op"`` key.
            value: Right-hand operand, inserted as given.

        Returns:
            A ``CQLExpression`` whose args are the property reference
            followed by ``value``.
        """
        property_ref = {"property": self.name}
        operands = [property_ref, value]
        return CQLExpression({"op": op, "args": operands})


class StringField(Field):
    """Field for text-valued properties.

    ``==`` and ``!=`` are overloaded to build expressions, so they return a
    ``CQLExpression`` rather than a ``bool``.
    """

    # Defining __eq__ in a class body makes Python set __hash__ to None, which
    # would make fields unusable as dict keys or set members. Restore the
    # identity-based hash.
    __hash__ = object.__hash__

    def __eq__(self, other: str) -> CQLExpression:
        """Build an ``"eq"`` expression comparing this property to ``other``."""
        return self._wrap("eq", other)

    def __ne__(self, other: str) -> CQLExpression:
        """Build a ``"neq"`` expression comparing this property to ``other``."""
        return self._wrap("neq", other)

    def like(self, pattern: str) -> CQLExpression:
        """Build a ``"like"`` expression; ``pattern`` is passed through as given."""
        return self._wrap("like", pattern)

    def is_in(self, values: List[str]) -> CQLExpression:
        """Build an ``"in"`` expression testing membership in ``values``.

        Args:
            values: The candidate strings. Any iterable is accepted and is
                copied into a new list, so later changes to the caller's
                collection do not alter the expression and the result stays
                JSON-serialisable.

        Raises:
            TypeError: If ``values`` is a single ``str``/``bytes`` (it would
                otherwise be treated as a sequence of characters) or is not
                iterable.
        """
        if isinstance(values, (str, bytes)):
            raise TypeError(
                f"is_in() expects a collection of strings, got a single {type(values).__name__}"
            )
        return self._wrap("in", list(values))


class NumberField(Field):
    """Field for numeric properties.

    All six rich comparison operators are overloaded to build expressions, so
    they return a ``CQLExpression`` rather than a ``bool``. Wrap a comparison
    in parentheses before combining it with ``&`` or ``|``.
    """

    # Defining __eq__ in a class body makes Python set __hash__ to None, which
    # would make fields unusable as dict keys or set members. Restore the
    # identity-based hash.
    __hash__ = object.__hash__

    def __eq__(self, other: Union[int, float]) -> CQLExpression:
        """Build an ``"eq"`` expression: property equals ``other``."""
        return self._wrap("eq", other)

    def __ne__(self, other: Union[int, float]) -> CQLExpression:
        """Build a ``"neq"`` expression: property differs from ``other``."""
        return self._wrap("neq", other)

    def __gt__(self, other: Union[int, float]) -> CQLExpression:
        """Build a ``"gt"`` expression: property greater than ``other``."""
        return self._wrap("gt", other)

    def __lt__(self, other: Union[int, float]) -> CQLExpression:
        """Build an ``"lt"`` expression: property less than ``other``."""
        return self._wrap("lt", other)

    def __ge__(self, other: Union[int, float]) -> CQLExpression:
        """Build a ``"gte"`` expression: property greater than or equal to ``other``."""
        return self._wrap("gte", other)

    def __le__(self, other: Union[int, float]) -> CQLExpression:
        """Build an ``"lte"`` expression: property less than or equal to ``other``."""
        return self._wrap("lte", other)


class BooleanField(Field):
    """Field for boolean properties.

    ``==`` and ``!=`` are overloaded to build expressions, so an explicit
    comparison such as ``field == True`` is the intended usage here and
    returns a ``CQLExpression`` rather than a ``bool``.
    """

    # Defining __eq__ in a class body makes Python set __hash__ to None, which
    # would make fields unusable as dict keys or set members. Restore the
    # identity-based hash.
    __hash__ = object.__hash__

    def __eq__(self, other: bool) -> CQLExpression:
        """Build an ``"eq"`` expression: property equals ``other``."""
        return self._wrap("eq", other)

    def __ne__(self, other: bool) -> CQLExpression:
        """Build a ``"neq"`` expression: property differs from ``other``."""
        return self._wrap("neq", other)


class DatetimeField(Field):
    """Field for temporal properties.

    Every right-hand value is wrapped as ``{"timestamp": value}`` before being
    placed in the expression. Values are passed through unchanged; the
    notebook's example uses strings such as ``"2022-01-01T00:00:00Z"``.

    The comparison operators are overloaded to build expressions, so they
    return a ``CQLExpression`` rather than a ``bool``.
    """

    # Defining __eq__ in a class body makes Python set __hash__ to None, which
    # would make fields unusable as dict keys or set members. Restore the
    # identity-based hash.
    __hash__ = object.__hash__

    @staticmethod
    def _timestamp(value: str) -> dict:
        """Wrap ``value`` in the ``{"timestamp": ...}`` literal form."""
        return {"timestamp": value}

    def __eq__(self, other: str) -> CQLExpression:
        """Build an ``"eq"`` expression against the timestamp ``other``."""
        return self._wrap("eq", self._timestamp(other))

    def __ne__(self, other: str) -> CQLExpression:
        """Build a ``"neq"`` expression against the timestamp ``other``."""
        return self._wrap("neq", self._timestamp(other))

    def __gt__(self, other: str) -> CQLExpression:
        """Build a ``"gt"`` expression against the timestamp ``other``."""
        return self._wrap("gt", self._timestamp(other))

    def __lt__(self, other: str) -> CQLExpression:
        """Build an ``"lt"`` expression against the timestamp ``other``."""
        return self._wrap("lt", self._timestamp(other))

    def __ge__(self, other: str) -> CQLExpression:
        """Build a ``"gte"`` expression against the timestamp ``other``."""
        return self._wrap("gte", self._timestamp(other))

    def __le__(self, other: str) -> CQLExpression:
        """Build an ``"lte"`` expression against the timestamp ``other``."""
        return self._wrap("lte", self._timestamp(other))

    def between(self, start: str, end: str) -> CQLExpression:
        """Build an ``"interval"`` expression over ``start`` and ``end``.

        Returns:
            A ``CQLExpression`` whose args are the property reference, the
            wrapped ``start`` timestamp and the wrapped ``end`` timestamp,
            in that order.
        """
        # NOTE(review): the published CQL2 JSON encoding appears to express
        # intervals as an {"interval": [start, end]} literal passed to a
        # temporal operator (e.g. t_during / t_intersects) rather than as an
        # "interval" op -- confirm what the target server accepts before
        # changing the emitted structure.
        return CQLExpression({
            "op": "interval",
            "args": [
                {"property": self.name},
                self._timestamp(start),
                self._timestamp(end),
            ]
        })


class GeoField(Field):
    """Field for geometry-valued properties, offering spatial predicates."""

    def _geo_wrap(self, op: str, geometry: Geometry) -> CQLExpression:
        """Build ``op(<this property>, {"geometry": geometry})``.

        Raises:
            TypeError: If ``geometry`` is not a ``geojson`` ``Geometry``
                instance.
        """
        if isinstance(geometry, Geometry):
            operands = [{"property": self.name}, {"geometry": geometry}]
            return CQLExpression({"op": op, "args": operands})
        raise TypeError(f"Expected a geojson.Geometry object, got {type(geometry)}")

    def intersects(self, geometry: Geometry) -> CQLExpression:
        """Build an ``"intersects"`` expression against ``geometry``."""
        predicate = "intersects"
        return self._geo_wrap(predicate, geometry)

    def contains(self, geometry: Geometry) -> CQLExpression:
        """Build a ``"contains"`` expression against ``geometry``."""
        predicate = "contains"
        return self._geo_wrap(predicate, geometry)

    def within(self, geometry: Geometry) -> CQLExpression:
        """Build a ``"within"`` expression against ``geometry``."""
        predicate = "within"
        return self._geo_wrap(predicate, geometry)

    def disjoint(self, geometry: Geometry) -> CQLExpression:
        """Build a ``"disjoint"`` expression against ``geometry``."""
        predicate = "disjoint"
        return self._geo_wrap(predicate, geometry)

In [2]:
# Example usage: declare one typed field per queryable property, then combine
# per-field conditions into a single filter expression.
timestamp = DatetimeField("datetime")
cloud_cover = NumberField("cloud_cover")
platform = StringField("platform")
active = BooleanField("active")
geometry = GeoField("geometry")

# Example GeoJSON polygon built with the geojson library: a single closed
# ring (first and last positions are identical) covering 10..20 on both axes.
bbox_polygon = geojson.Polygon([[
    (10.0, 10.0),
    (20.0, 10.0),
    (20.0, 20.0),
    (10.0, 20.0),
    (10.0, 10.0)
]])

# Example filter. Conditions are joined with `&` (the `and` keyword cannot be
# overloaded). `&` binds tighter than `<` and `==`, so each comparison must be
# parenthesised. Chaining `&` nests the "and" nodes left to right.
filter_expr = (
    timestamp.between("2022-01-01T00:00:00Z", "2022-12-31T23:59:59Z")
    & (cloud_cover < 20)
    & platform.is_in(["Sentinel-2", "Landsat-8"])
    & (active == True)  # noqa: E712 -- `==` builds an expression here, it is not a truth test
    & geometry.intersects(bbox_polygon)
)

# Print rather than display: the JSON string contains newlines that a bare
# repr would show escaped.
print(filter_expr.to_json())

{
  "op": "and",
  "args": [
    {
      "op": "and",
      "args": [
        {
          "op": "and",
          "args": [
            {
              "op": "and",
              "args": [
                {
                  "op": "interval",
                  "args": [
                    {
                      "property": "datetime"
                    },
                    {
                      "timestamp": "2022-01-01T00:00:00Z"
                    },
                    {
                      "timestamp": "2022-12-31T23:59:59Z"
                    }
                  ]
                },
                {
                  "op": "lt",
                  "args": [
                    {
                      "property": "cloud_cover"
                    },
                    20
                  ]
                }
              ]
            },
            {
              "op": "in",
              "args": [
                {
                  "property": "platform"
            