diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 06615a7e1..b639133d7 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -66,6 +66,16 @@ jobs: ports: - 9202:9202 + redis: + image: redis:7-alpine + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + strategy: matrix: python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13"] @@ -126,3 +136,6 @@ jobs: DATABASE_REFRESH: true ES_VERIFY_CERTS: false BACKEND: ${{ matrix.backend == 'elasticsearch7' && 'elasticsearch' || matrix.backend == 'elasticsearch8' && 'elasticsearch' || 'opensearch' }} + REDIS_ENABLE: true + REDIS_HOST: localhost + REDIS_PORT: 6379 diff --git a/CHANGELOG.md b/CHANGELOG.md index f0f028bc9..ef1d9c50d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - CloudFerro logo to sponsors and supporters list [#485](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/485) - Latest news section to README [#485](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/485) +- Added Redis caching configuration for navigation pagination support, enabling proper `prev` and `next` links in paginated responses. 
[#488](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/488) ### Changed diff --git a/Makefile b/Makefile index 204b31a10..bde31064b 100644 --- a/Makefile +++ b/Makefile @@ -82,7 +82,7 @@ test-datetime-filtering-os: docker compose down .PHONY: test -test: test-elasticsearch test-datetime-filtering-es test-opensearch test-datetime-filtering-os +test: test-elasticsearch test-datetime-filtering-es test-opensearch test-datetime-filtering-os test-redis-es test-redis-os .PHONY: run-database-es run-database-es: @@ -117,4 +117,16 @@ docs-image: .PHONY: docs docs: docs-image docker compose -f compose.docs.yml \ - run docs \ No newline at end of file + run docs + +.PHONY: test-redis-es +test-redis-es: + docker compose -f compose-redis.yml up -d + -$(run_es) /bin/bash -c 'export REDIS_ENABLE=true REDIS_HOST=redis REDIS_PORT=6379 && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest redis/ -v' + docker compose -f compose-redis.yml down + +.PHONY: test-redis-os +test-redis-os: + docker compose -f compose-redis.yml up -d + -$(run_os) /bin/bash -c 'export REDIS_ENABLE=true REDIS_HOST=redis REDIS_PORT=6379 && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest redis/ -v' + docker compose -f compose-redis.yml down diff --git a/README.md b/README.md index b87bd21be..670b8788b 100644 --- a/README.md +++ b/README.md @@ -317,6 +317,31 @@ You can customize additional settings in your `.env` file: > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, `ES_VERIFY_CERTS` and `ES_TIMEOUT` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. +**Redis for Navigation:** +These Redis configuration variables enable proper navigation functionality in STAC FastAPI. The Redis cache stores navigation state for paginated results, allowing the system to maintain previous page links using tokens. 
The configuration supports either Redis Sentinel or standalone Redis setups. + +| Variable | Description | Default | Required | +|-------------------------------|----------------------------------------------------------------------------------------------|--------------------------|---------------------------------------------------------------------------------------------| +| **General** | | | | +| `REDIS_ENABLE` | Enables or disables Redis caching for navigation. Set to `true` to use Redis, or `false` to disable. | `false` | **Required** (determines whether Redis is used at all) | +| **Redis Sentinel** | | | | +| `REDIS_SENTINEL_HOSTS` | Comma-separated list of Redis Sentinel hostnames/IP addresses. | `""` | Conditional (required if using Sentinel) | +| `REDIS_SENTINEL_PORTS` | Comma-separated list of Redis Sentinel ports (must match order). | `"26379"` | Conditional (required if using Sentinel) | +| `REDIS_SENTINEL_MASTER_NAME` | Name of the Redis master node in Sentinel configuration. | `"master"` | Conditional (required if using Sentinel) | +| **Redis** | | | | +| `REDIS_HOST` | Redis server hostname or IP address for Redis configuration. | `""` | Conditional (required for standalone Redis) | +| `REDIS_PORT` | Redis server port for Redis configuration. | `6379` | Conditional (required for standalone Redis) | +| **Both** | | | | +| `REDIS_DB` | Redis database number to use for caching. | `0` (Sentinel) / `0` (Standalone) | Optional | +| `REDIS_MAX_CONNECTIONS` | Maximum number of connections in the Redis connection pool. | `10` | Optional | +| `REDIS_RETRY_TIMEOUT` | Enable retry on timeout for Redis operations. | `true` | Optional | +| `REDIS_DECODE_RESPONSES` | Automatically decode Redis responses to strings. | `true` | Optional | +| `REDIS_CLIENT_NAME` | Client name identifier for Redis connections. | `"stac-fastapi-app"` | Optional | +| `REDIS_HEALTH_CHECK_INTERVAL` | Interval in seconds for Redis health checks. 
| `30` | Optional | + +> [!NOTE] +> Use either the Sentinel configuration (`REDIS_SENTINEL_HOSTS`, `REDIS_SENTINEL_PORTS`, `REDIS_SENTINEL_MASTER_NAME`) OR the Redis configuration (`REDIS_HOST`, `REDIS_PORT`), but not both. + ## Datetime-Based Index Management ### Overview diff --git a/compose-redis.yml b/compose-redis.yml new file mode 100644 index 000000000..b572e7318 --- /dev/null +++ b/compose-redis.yml @@ -0,0 +1,13 @@ +version: '3.8' + +services: + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: + - redis_test_data:/data + command: redis-server --appendonly yes + +volumes: + redis_test_data: diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 000000000..1ddba6383 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,3 @@ +[mypy] +[mypy-redis.*] +ignore_missing_imports = True diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 143b4d5ac..dcd1716cb 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -24,9 +24,10 @@ from stac_fastapi.core.base_settings import ApiBaseSettings from stac_fastapi.core.datetime_utils import format_datetime_range from stac_fastapi.core.models.links import PagingLinks +from stac_fastapi.core.redis_utils import _handle_pagination_via_redis from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.core.session import Session -from stac_fastapi.core.utilities import filter_fields +from stac_fastapi.core.utilities import filter_fields, get_bool_env from stac_fastapi.extensions.core.transaction import AsyncBaseTransactionsClient from stac_fastapi.extensions.core.transaction.request import ( PartialCollection, @@ -328,6 +329,8 @@ async def all_collections( if parsed_sort: sort = parsed_sort + redis_enable = get_bool_env("REDIS_ENABLE", default=False) + # Convert q to a list if it's a string q_list = None if q is not None: @@ -426,6 +429,8 @@ async def all_collections( }, ] + 
await _handle_pagination_via_redis(redis_enable, next_token, token, request, links) + if next_token: next_link = PagingLinks(next=next_token, request=request).link_next() links.append(next_link) @@ -744,6 +749,7 @@ async def post_search( HTTPException: If there is an error with the cql2_json filter. """ base_url = str(request.base_url) + redis_enable = get_bool_env("REDIS_ENABLE", default=False) search = self.database.make_search() @@ -850,6 +856,29 @@ async def post_search( ] links = await PagingLinks(request=request, next=next_token).get_links() + collection_links = [] + if search_request.collections: + for collection_id in search_request.collections: + collection_links.extend( + [ + { + "rel": "collection", + "type": "application/json", + "href": urljoin(base_url, f"collections/{collection_id}"), + }, + { + "rel": "parent", + "type": "application/json", + "href": urljoin(base_url, f"collections/{collection_id}"), + }, + ] + ) + links.extend(collection_links) + + await _handle_pagination_via_redis( + redis_enable, next_token, token_param, request, links + ) + return stac_types.ItemCollection( type="FeatureCollection", features=items, diff --git a/stac_fastapi/core/stac_fastapi/core/redis_utils.py b/stac_fastapi/core/stac_fastapi/core/redis_utils.py new file mode 100644 index 000000000..d98a3e0c0 --- /dev/null +++ b/stac_fastapi/core/stac_fastapi/core/redis_utils.py @@ -0,0 +1,163 @@ +"""Utilities for connecting to and managing Redis connections.""" + +import logging +from typing import Dict, List, Optional + +from fastapi import Request +from pydantic_settings import BaseSettings +from redis import asyncio as aioredis +from redis.asyncio.sentinel import Sentinel + +logger = logging.getLogger(__name__) + +redis_pool: Optional[aioredis.Redis] = None + + +class RedisSentinelSettings(BaseSettings): + """Configuration for connecting to Redis Sentinel.""" + + REDIS_SENTINEL_HOSTS: str = "" + REDIS_SENTINEL_PORTS: str = "26379" + REDIS_SENTINEL_MASTER_NAME: str = "master" + 
REDIS_DB: int = 15 + + REDIS_MAX_CONNECTIONS: int = 10 + REDIS_RETRY_TIMEOUT: bool = True + REDIS_DECODE_RESPONSES: bool = True + REDIS_CLIENT_NAME: str = "stac-fastapi-app" + REDIS_HEALTH_CHECK_INTERVAL: int = 30 + + +class RedisSettings(BaseSettings): + """Configuration for connecting to a standalone Redis instance.""" + + REDIS_HOST: str = "" + REDIS_PORT: int = 6379 + REDIS_DB: int = 0 + + REDIS_MAX_CONNECTIONS: int = 10 + REDIS_RETRY_TIMEOUT: bool = True + REDIS_DECODE_RESPONSES: bool = True + REDIS_CLIENT_NAME: str = "stac-fastapi-app" + REDIS_HEALTH_CHECK_INTERVAL: int = 30 + + +# Select the Redis or Redis Sentinel configuration +redis_settings: BaseSettings = RedisSettings() + + +async def connect_redis(settings: Optional[RedisSettings] = None) -> Optional[aioredis.Redis]: + """Return a Redis connection.""" + global redis_pool + settings = settings or redis_settings + + if not settings.REDIS_HOST or not settings.REDIS_PORT: + return None + + if redis_pool is None: + pool = aioredis.ConnectionPool( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB, + max_connections=settings.REDIS_MAX_CONNECTIONS, + decode_responses=settings.REDIS_DECODE_RESPONSES, + retry_on_timeout=settings.REDIS_RETRY_TIMEOUT, + health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL, + ) + redis_pool = aioredis.Redis( + connection_pool=pool, client_name=settings.REDIS_CLIENT_NAME + ) + return redis_pool + + +async def connect_redis_sentinel( + settings: Optional[RedisSentinelSettings] = None, +) -> Optional[aioredis.Redis]: + """Return a Redis Sentinel connection.""" + global redis_pool + + settings = settings or RedisSentinelSettings() + + if ( + not settings.REDIS_SENTINEL_HOSTS + or not settings.REDIS_SENTINEL_PORTS + or not settings.REDIS_SENTINEL_MASTER_NAME + ): + return None + + hosts = [h.strip() for h in settings.REDIS_SENTINEL_HOSTS.split(",") if h.strip()] + ports = [ + int(p.strip()) for p in settings.REDIS_SENTINEL_PORTS.split(",") if p.strip() + ] + + if redis_pool is None: + 
try: + sentinel = Sentinel( + [(h, p) for h, p in zip(hosts, ports)], + decode_responses=settings.REDIS_DECODE_RESPONSES, + ) + master = sentinel.master_for( + service_name=settings.REDIS_SENTINEL_MASTER_NAME, + db=settings.REDIS_DB, + decode_responses=settings.REDIS_DECODE_RESPONSES, + retry_on_timeout=settings.REDIS_RETRY_TIMEOUT, + client_name=settings.REDIS_CLIENT_NAME, + max_connections=settings.REDIS_MAX_CONNECTIONS, + health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL, + ) + redis_pool = master + + except Exception: + return None + + return redis_pool + + +async def save_self_link( + redis: aioredis.Redis, token: Optional[str], self_href: str +) -> None: + """Save the self link for the current token with 30 min TTL.""" + if token: + await redis.setex(f"nav:self:{token}", 1800, self_href) + + +async def get_prev_link(redis: aioredis.Redis, token: Optional[str]) -> Optional[str]: + """Get the previous page link for the current token (if exists).""" + if not token: + return None + return await redis.get(f"nav:self:{token}") + + +async def _handle_pagination_via_redis( + redis_enable: bool, + next_token: Optional[str], + token_param: Optional[str], + request: Request, + links: List[Dict], +) -> None: + """Handle Redis connection and operations for pagination links.""" + if not redis_enable: + return + + redis = None + try: + redis = await connect_redis() + logger.info("Redis connection established successfully") + + if redis and next_token: + self_link = str(request.url) + await save_self_link(redis, next_token, self_link) + + prev_link = await get_prev_link(redis, token_param) + if prev_link: + links.insert( + 0, + { + "rel": "prev", + "type": "application/json", + "method": "GET", + "href": prev_link, + }, + ) + except Exception as e: + logger.warning(f"Redis connection failed, continuing without Redis: {e}") diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index 1751df78f..612c75875 100644 --- 
a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -21,6 +21,7 @@ "pre-commit~=3.0.0", "ciso8601~=2.3.0", "httpx>=0.24.0,<0.28.0", + "redis==6.4.0", ], "docs": ["mkdocs~=1.4.0", "mkdocs-material~=9.0.0", "pdocs~=1.2.0"], "server": ["uvicorn[standard]~=0.23.0"], diff --git a/stac_fastapi/opensearch/setup.py b/stac_fastapi/opensearch/setup.py index d7727267f..f7d175752 100644 --- a/stac_fastapi/opensearch/setup.py +++ b/stac_fastapi/opensearch/setup.py @@ -22,6 +22,7 @@ "pre-commit~=3.0.0", "ciso8601~=2.3.0", "httpx>=0.24.0,<0.28.0", + "redis==6.4.0", ], "docs": ["mkdocs~=1.4.0", "mkdocs-material~=9.0.0", "pdocs~=1.2.0"], "server": ["uvicorn[standard]~=0.23.0"], diff --git a/stac_fastapi/tests/redis/__init__.py b/stac_fastapi/tests/redis/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/stac_fastapi/tests/redis/test_redis_pagination.py b/stac_fastapi/tests/redis/test_redis_pagination.py new file mode 100644 index 000000000..0fa30c468 --- /dev/null +++ b/stac_fastapi/tests/redis/test_redis_pagination.py @@ -0,0 +1,80 @@ +import uuid + +import pytest + +from ..conftest import create_collection, create_item + + +@pytest.mark.asyncio +async def test_search_pagination_uses_redis_cache( + app_client, txn_client, load_test_data +): + """Test Redis caching and navigation for the /search endpoint.""" + + collection = load_test_data("test_collection.json") + collection_id = f"test-pagination-collection-{uuid.uuid4()}" + collection["id"] = collection_id + await create_collection(txn_client, collection) + + for i in range(5): + item = load_test_data("test_item.json") + item["id"] = f"test-pagination-item-{uuid.uuid4()}" + item["collection"] = collection_id + await create_item(txn_client, item) + + resp = await app_client.post( + "/search", json={"collections": [collection_id], "limit": 1} + ) + resp_json = resp.json() + + next_link = next( + (link for link in resp_json["links"] if link["rel"] == "next"), None + ) + 
next_token = next_link["body"]["token"] + + # Expect the previous link on the second page to be retrieved from Redis cache + resp2 = await app_client.post( + "/search", + json={"collections": [collection_id], "limit": 1, "token": next_token}, + ) + resp2_json = resp2.json() + + prev_link = next( + (link for link in resp2_json["links"] if link["rel"] == "prev"), None + ) + assert prev_link is not None + + +@pytest.mark.asyncio +async def test_collections_pagination_uses_redis_cache( + app_client, txn_client, load_test_data +): + """Test Redis caching and navigation for the /collection endpoint.""" + + collection_data = load_test_data("test_collection.json") + for i in range(5): + collection = collection_data.copy() + collection["id"] = f"test-collection-pagination-{uuid.uuid4()}" + collection["title"] = f"Test Collection Pagination {i}" + await create_collection(txn_client, collection) + + resp = await app_client.get("/collections", params={"limit": 1}) + assert resp.status_code == 200 + resp1_json = resp.json() + + next_link = next( + (link for link in resp1_json["links"] if link["rel"] == "next"), None + ) + next_token = next_link["href"].split("token=")[1] + + # Expect the previous link on the second page to be retrieved from Redis cache + resp2 = await app_client.get( + "/collections", params={"limit": 1, "token": next_token} + ) + assert resp2.status_code == 200 + resp2_json = resp2.json() + + prev_link = next( + (link for link in resp2_json["links"] if link["rel"] == "prev"), None + ) + assert prev_link is not None diff --git a/stac_fastapi/tests/redis/test_redis_utils.py b/stac_fastapi/tests/redis/test_redis_utils.py new file mode 100644 index 000000000..d4e80ce5c --- /dev/null +++ b/stac_fastapi/tests/redis/test_redis_utils.py @@ -0,0 +1,44 @@ +import pytest + +from stac_fastapi.core.redis_utils import connect_redis, get_prev_link, save_self_link + + +@pytest.mark.asyncio +async def test_redis_connection(): + """Test Redis connection.""" + redis = await 
connect_redis() + assert redis is not None + + # Test set/get + await redis.set("string_key", "string_value") + string_value = await redis.get("string_key") + assert string_value == "string_value" + + # Test key retrieval operation + exists = await redis.exists("string_key") + assert exists == 1 + + # Test key deletion + await redis.delete("string_key") + deleted_value = await redis.get("string_key") + assert deleted_value is None + + +@pytest.mark.asyncio +async def test_redis_utils_functions(): + redis = await connect_redis() + assert redis is not None + + token = "test_token_123" + self_link = "http://mywebsite.com/search?token=test_token_123" + + await save_self_link(redis, token, self_link) + retrieved_link = await get_prev_link(redis, token) + assert retrieved_link == self_link + + await save_self_link(redis, None, "should_not_save") + null_result = await get_prev_link(redis, None) + assert null_result is None + + non_existent = await get_prev_link(redis, "non_existent_token") + assert non_existent is None