Skip to content
Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added

- Added configurable landing page ID `STAC_FASTAPI_LANDING_PAGE_ID` [#352](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/352)
- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Changed

- Refactored CRUD methods in `TransactionsClient` to use the `validate_refresh` helper method for consistent and reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Fixed

- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

## [v4.1.0] - 2025-05-09

### Added
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ You can customize additional settings in your `.env` file:
| `RELOAD` | Enable auto-reload for development. | `true` | Optional |
| `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional |
| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional |

> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
Expand Down
28 changes: 15 additions & 13 deletions stac_fastapi/core/stac_fastapi/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,11 @@ async def create_item(
for feature in features
]
attempted = len(processed_items)

success, errors = await self.database.bulk_async(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
collection_id=collection_id,
processed_items=processed_items,
**kwargs,
)
if errors:
logger.error(
Expand All @@ -729,10 +730,7 @@ async def create_item(

# Handle single item
await self.database.create_item(
item_dict,
refresh=kwargs.get("refresh", False),
base_url=base_url,
exist_ok=False,
item_dict, base_url=base_url, exist_ok=False, **kwargs
)
return ItemSerializer.db_to_stac(item_dict, base_url)

Expand All @@ -757,11 +755,12 @@ async def update_item(
"""
item = item.model_dump(mode="json")
base_url = str(kwargs["request"].base_url)

now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
item["properties"]["updated"] = now

await self.database.create_item(
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
item, base_url=base_url, exist_ok=True, **kwargs
)

return ItemSerializer.db_to_stac(item, base_url)
Expand All @@ -777,7 +776,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
Returns:
None: Returns 204 No Content on successful deletion
"""
await self.database.delete_item(item_id=item_id, collection_id=collection_id)
await self.database.delete_item(
item_id=item_id, collection_id=collection_id, **kwargs
)
return None

@overrides
Expand All @@ -798,8 +799,9 @@ async def create_collection(
"""
collection = collection.model_dump(mode="json")
request = kwargs["request"]

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.create_collection(collection=collection)
await self.database.create_collection(collection=collection, **kwargs)
return CollectionSerializer.db_to_stac(
collection,
request,
Expand Down Expand Up @@ -835,7 +837,7 @@ async def update_collection(

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.update_collection(
collection_id=collection_id, collection=collection
collection_id=collection_id, collection=collection, **kwargs
)

return CollectionSerializer.db_to_stac(
Expand All @@ -860,7 +862,7 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None:
Raises:
NotFoundError: If the collection doesn't exist
"""
await self.database.delete_collection(collection_id=collection_id)
await self.database.delete_collection(collection_id=collection_id, **kwargs)
return None


Expand Down Expand Up @@ -937,7 +939,7 @@ def bulk_item_insert(
success, errors = self.database.bulk_sync(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
**kwargs,
)
if errors:
logger.error(f"Bulk sync operation encountered errors: {errors}")
Expand Down
65 changes: 60 additions & 5 deletions stac_fastapi/core/stac_fastapi/core/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,75 @@
MAX_LIMIT = 10000


def get_bool_env(name: str, default: bool = False) -> bool:
def validate_refresh(value: Union[str, bool]) -> str:
    """
    Validate the `refresh` parameter value.

    String input is stripped and lowercased *before* it is classified, so
    "TRUE", " yes " and "Wait_For" are accepted. Boolean-like input (a ``bool``
    or one of "true", "false", "1", "0", "yes", "no", "y", "n") is resolved via
    ``get_bool_env("DATABASE_REFRESH", default=...)``; as a consequence, a
    `DATABASE_REFRESH` environment variable holding a recognized boolean value
    takes precedence over ``value``.

    Args:
        value (Union[str, bool]): The `refresh` parameter value, which can be a string or a boolean.
            Any other type (e.g. ``None``) is treated as invalid rather than raising.

    Returns:
        str: The validated value of the `refresh` parameter, which can be "true", "false", or "wait_for".
            Invalid input logs a warning and yields "false".
    """
    logger = logging.getLogger(__name__)
    boolean_like = {"true", "false", "1", "0", "yes", "no", "y", "n"}

    if isinstance(value, bool):
        normalized = value
    elif isinstance(value, str):
        # Normalize first so the checks below really are case-insensitive.
        normalized = value.strip().lower()
        # Handle "wait_for" explicitly; it has no boolean interpretation.
        if normalized == "wait_for":
            return "wait_for"
    else:
        # Unsupported type: fall through to the warning instead of crashing
        # on a missing `.lower()` attribute.
        normalized = None

    # Handle boolean-like values using get_bool_env
    if isinstance(normalized, bool) or normalized in boolean_like:
        is_true = get_bool_env("DATABASE_REFRESH", default=normalized)
        return "true" if is_true else "false"

    # Log a warning for invalid values and default to "false"
    logger.warning(
        f"Invalid value for `refresh`: '{value}'. Expected 'true', 'false', or 'wait_for'. Defaulting to 'false'."
    )
    return "false"


def get_bool_env(name: str, default: Union[bool, str] = False) -> bool:
"""
Retrieve a boolean value from an environment variable.

Args:
name (str): The name of the environment variable.
default (bool, optional): The default value to use if the variable is not set or unrecognized. Defaults to False.
default (Union[bool, str], optional): The default value to use if the variable is not set or unrecognized. Defaults to False.

Returns:
bool: The boolean value parsed from the environment variable.
"""
value = os.getenv(name, str(default).lower())
true_values = ("true", "1", "yes", "y")
false_values = ("false", "0", "no", "n")

# Normalize the default value
if isinstance(default, bool):
default_str = "true" if default else "false"
elif isinstance(default, str):
default_str = default.lower()
else:
logger = logging.getLogger(__name__)
logger.warning(
f"The `default` parameter must be a boolean or string, got {type(default).__name__}. "
f"Falling back to `False`."
)
default_str = "false"

# Retrieve and normalize the environment variable value
value = os.getenv(name, default_str)
if value.lower() in true_values:
return True
elif value.lower() in false_values:
Expand All @@ -34,9 +89,9 @@ def get_bool_env(name: str, default: bool = False) -> bool:
logger = logging.getLogger(__name__)
logger.warning(
f"Environment variable '{name}' has unrecognized value '{value}'. "
f"Expected one of {true_values + false_values}. Using default: {default}"
f"Expected one of {true_values + false_values}. Using default: {default_str}"
)
return default
return default_str in true_values


def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
Expand Down
26 changes: 24 additions & 2 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import logging
import os
import ssl
from typing import Any, Dict, Set
from typing import Any, Dict, Set, Union

import certifi
from elasticsearch._async.client import AsyncElasticsearch

from elasticsearch import Elasticsearch # type: ignore[attr-defined]
from stac_fastapi.core.base_settings import ApiBaseSettings
from stac_fastapi.core.utilities import get_bool_env
from stac_fastapi.core.utilities import get_bool_env, validate_refresh
from stac_fastapi.types.config import ApiSettings


Expand Down Expand Up @@ -88,6 +88,17 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the refresh setting derived from the DATABASE_REFRESH environment variable.

The variable is read on every access (defaulting to the string "false" when
it is unset) and the raw string is passed through `validate_refresh`.

Returns:
Union[bool, str]: Whatever `validate_refresh` returns for the environment value.
NOTE(review): `validate_refresh` in `stac_fastapi.core.utilities` is annotated to
return the strings "true", "false", or "wait_for" rather than Python booleans —
confirm callers do not expect `True`/`False`.
"""
value = os.getenv("DATABASE_REFRESH", "false")
return validate_refresh(value)

@property
def create_client(self):
"""Create es client."""
Expand All @@ -109,6 +120,17 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the refresh setting derived from the DATABASE_REFRESH environment variable.

The variable is read on every access (defaulting to the string "false" when
it is unset) and the raw string is passed through `validate_refresh`.

Returns:
Union[bool, str]: Whatever `validate_refresh` returns for the environment value.
NOTE(review): `validate_refresh` in `stac_fastapi.core.utilities` is annotated to
return the strings "true", "false", or "wait_for" rather than Python booleans —
confirm callers do not expect `True`/`False`.
"""
value = os.getenv("DATABASE_REFRESH", "false")
return validate_refresh(value)

@property
def create_client(self):
"""Create async elasticsearch client."""
Expand Down
Loading