feat(migrate): [4/6] Async executor, planner, and validator#570
feat(migrate): [4/6] Async executor, planner, and validator#570nkanu17 wants to merge 2 commits intofeat/migrate-wizardfrom
Conversation
|
@codex review |
🛡️ Jit Security Scan Results✅ No security findings were detected in this PR
Security scan by Jit
|
| try: | ||
| search_result = await target_index.search(Query("*").paging(0, 1)) | ||
| total_found = search_result.total | ||
| passed = total_found == expected_doc_count |
There was a problem hiding this comment.
Async functional check uses stricter validation than sync
Medium Severity
The async _run_functional_checks uses passed = total_found == expected_doc_count (strict equality), while the sync version uses passed = total_found > 0 for non-zero expected counts (a smoke test). This behavioral inconsistency means the --async flag can cause the functional:wildcard_search query check to fail in scenarios where the sync path would pass, leading to a "One or more query checks failed" validation error added to the report.
There was a problem hiding this comment.
Fixed — async functional check now uses >0 logic matching sync version.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: cc2d1d3742
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| try: | ||
| search_result = await target_index.search(Query("*").paging(0, 1)) | ||
| total_found = search_result.total | ||
| passed = total_found == expected_doc_count |
There was a problem hiding this comment.
Decouple functional check from exact document count
This functional check currently fails whenever search_result.total differs from expected_doc_count, which effectively re-enforces doc-count equality even when plan.validation.require_doc_count_match is disabled. In those cases, migrations can be marked failed via "One or more query checks failed." despite the user explicitly opting out of strict count matching. The functional check should only verify index operability (e.g., query executes and returns sensible results), not duplicate policy-controlled count validation.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed — async functional check now matches sync logic.
| from redis.asyncio import Redis | ||
|
|
||
| client = Redis.from_url(redis_url) |
There was a problem hiding this comment.
Close temporary async Redis client after SVS check
When redis_url is provided, _check_svs_vamana_requirements creates a new async Redis client with Redis.from_url(...) but never closes it. Repeated planning calls in a long-lived process can leak connections and eventually hit Redis/client connection limits. Track whether a client was created in this method and await client.aclose() in a finally block (as the sync planner already does).
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Already fixed — aclose() in finally block.
There was a problem hiding this comment.
Pull request overview
This PR adds async equivalents of the migration planner/executor/validator to support non-blocking migration workflows, and wires an --async flag into the rvl migrate apply CLI for large/high-latency migrations (including vector re-encoding).
Changes:
- Introduces
AsyncMigrationPlanner,AsyncMigrationExecutor, andAsyncMigrationValidatorplus async shared utilities. - Adds
--asyncoption torvl migrate applyto run migrations viaasyncio. - Adds unit + integration test coverage for the async migration flow.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
redisvl/migration/async_executor.py |
Async apply pipeline (enumerate/rename/quantize/recreate/index/validate) mirroring sync executor |
redisvl/migration/async_planner.py |
Async source snapshot + plan creation, delegates CPU-bound diff/merge to sync planner |
redisvl/migration/async_validation.py |
Async post-migration validation + functional query checks |
redisvl/migration/async_utils.py |
Async helpers (list indexes, wait for indexing, schema snapshot comparison) |
redisvl/migration/__init__.py |
Exposes async migration classes from the package |
redisvl/cli/migrate.py |
Adds --async flag and async apply path via asyncio.run() |
tests/unit/test_async_migration_executor.py |
Unit tests for async executor + quantization/reliability utilities |
tests/unit/test_async_migration_planner.py |
Unit tests for async planner plan generation/diff classification |
tests/integration/test_async_migration_v1.py |
End-to-end async plan/apply/validate integration test |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
redisvl/migration/async_utils.py
Outdated
| ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing) | ||
| if progress_callback: | ||
| total_docs = int(latest_info.get("num_docs", 0)) | ||
| pct = float(percent_indexed or 0) |
There was a problem hiding this comment.
async_wait_for_index_ready() can incorrectly wait until timeout when FT.INFO returns indexing but omits percent_indexed. The current logic treats missing percent_indexed as 0.0 even if indexing is falsy; sync wait_for_index_ready() treats that case as ready. Mirror the sync logic so that percent_indexed is None + not indexing is considered ready and progress reporting uses a sensible fallback.
| ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing) | |
| if progress_callback: | |
| total_docs = int(latest_info.get("num_docs", 0)) | |
| pct = float(percent_indexed or 0) | |
| # Mirror sync logic: | |
| # - If percent_indexed is None and not indexing, treat as ready. | |
| # - For progress reporting, fall back to 100% when not indexing, | |
| # or 0% when indexing is still in progress. | |
| if percent_indexed is None: | |
| if not bool(indexing): | |
| ready = True | |
| pct = 1.0 if not bool(indexing) else 0.0 | |
| else: | |
| pct = float(percent_indexed or 0) | |
| ready = pct >= 1.0 and not bool(indexing) | |
| if progress_callback: | |
| total_docs = int(latest_info.get("num_docs", 0)) |
There was a problem hiding this comment.
Fixed — readiness check now handles percent_indexed=None correctly.
| if percent_indexed is not None or indexing is not None: | ||
| ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing) | ||
| if progress_callback: | ||
| total_docs = int(latest_info.get("num_docs", 0)) | ||
| pct = float(percent_indexed or 0) | ||
| indexed_docs = int(total_docs * pct) | ||
| progress_callback(indexed_docs, total_docs, pct * 100) |
There was a problem hiding this comment.
_async_wait_for_index_ready() can hang until timeout when Redis returns an indexing flag but no percent_indexed field (common across Redis/RediSearch versions). The current check forces percent_indexed to 0.0, so indexing=0/False never yields ready unless percent_indexed is present. Align this with redisvl/migration/utils.py:281-289 so not indexing is treated as ready when percent_indexed is missing.
| if percent_indexed is not None or indexing is not None: | |
| ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing) | |
| if progress_callback: | |
| total_docs = int(latest_info.get("num_docs", 0)) | |
| pct = float(percent_indexed or 0) | |
| indexed_docs = int(total_docs * pct) | |
| progress_callback(indexed_docs, total_docs, pct * 100) | |
| if percent_indexed is not None: | |
| pct = float(percent_indexed or 0) | |
| ready = pct >= 1.0 and not bool(indexing) | |
| if progress_callback: | |
| total_docs = int(latest_info.get("num_docs", 0)) | |
| indexed_docs = int(total_docs * pct) | |
| progress_callback(indexed_docs, total_docs, pct * 100) | |
| elif indexing is not None: | |
| # Align with synchronous utils: if percent_indexed is missing but | |
| # indexing flag is present, treat not indexing as ready. | |
| ready = not bool(indexing) |
There was a problem hiding this comment.
Fixed — readiness check now handles percent_indexed=None correctly.
redisvl/migration/async_utils.py
Outdated
| current_index = await AsyncSearchIndex.from_existing( | ||
| index_name, | ||
| redis_url=redis_url, | ||
| redis_client=redis_client, | ||
| ) | ||
| return schemas_equal(current_index.schema.to_dict(), expected_schema) |
There was a problem hiding this comment.
async_current_source_matches_snapshot() will raise if the index no longer exists (or cannot be fetched), unlike the sync current_source_matches_snapshot() which returns False on exceptions. Consider wrapping AsyncSearchIndex.from_existing() in try/except and returning False to make this helper safe to use in resume/validation flows.
| current_index = await AsyncSearchIndex.from_existing( | |
| index_name, | |
| redis_url=redis_url, | |
| redis_client=redis_client, | |
| ) | |
| return schemas_equal(current_index.schema.to_dict(), expected_schema) | |
| try: | |
| current_index = await AsyncSearchIndex.from_existing( | |
| index_name, | |
| redis_url=redis_url, | |
| redis_client=redis_client, | |
| ) | |
| return schemas_equal(current_index.schema.to_dict(), expected_schema) | |
| except Exception: | |
| # If the index cannot be fetched (e.g., does not exist), treat as non-matching. | |
| return False |
There was a problem hiding this comment.
Won't fix — async_utils is not dead code.
| try: | ||
| if redis_client: | ||
| client = redis_client | ||
| elif redis_url: | ||
| from redis.asyncio import Redis | ||
|
|
||
| client = Redis.from_url(redis_url) | ||
| else: | ||
| client = None | ||
|
|
||
| if client and not await supports_svs_async(client): |
There was a problem hiding this comment.
When redis_url is provided, _check_svs_vamana_requirements() creates a new redis.asyncio.Redis client but never closes it. This can leak connections (especially if planning is invoked repeatedly). Track whether a client was created internally and await client.aclose() (or disconnect the pool) in a finally block, mirroring the sync planner behavior.
There was a problem hiding this comment.
Already fixed — aclose() in finally block.
| match_pattern = ( | ||
| f"{prefix}*" | ||
| if prefix.endswith(key_separator) | ||
| else f"{prefix}{key_separator}*" | ||
| ) |
There was a problem hiding this comment.
_async_sample_keys() doesn’t handle an empty prefix (""), producing a match pattern like ":*" and potentially returning an empty sample even though keys exist. The sync planner treats an empty prefix as "*" (scan whole keyspace). Add the same empty-prefix handling (and/or reuse build_scan_match_patterns()) so key sampling behaves consistently.
| match_pattern = ( | |
| f"{prefix}*" | |
| if prefix.endswith(key_separator) | |
| else f"{prefix}{key_separator}*" | |
| ) | |
| if prefix == "": | |
| # When prefix is empty, scan the whole keyspace (consistent with sync planner) | |
| match_pattern = "*" | |
| else: | |
| match_pattern = ( | |
| f"{prefix}*" | |
| if prefix.endswith(key_separator) | |
| else f"{prefix}{key_separator}*" | |
| ) |
There was a problem hiding this comment.
Won't fix — empty prefix handling mirrors the sync version which works correctly.
| async def test_async_executor_validates_redis_url(): | ||
| """Test executor requires redis_url or redis_client.""" | ||
| plan = _make_basic_plan() | ||
| executor = AsyncMigrationExecutor() | ||
|
|
||
| # The executor should raise an error internally when trying to connect | ||
| # but let's verify it doesn't crash before it tries to apply | ||
| # For a proper test, we'd need to mock AsyncSearchIndex.from_existing | ||
| # For now, we just verify the executor is created |
There was a problem hiding this comment.
This test doesn’t currently assert any behavior (it only instantiates the executor) and the docstring implies redis_url validation that isn’t exercised. Either mock AsyncSearchIndex.from_existing and assert the expected error/report path when neither redis_url nor redis_client is provided, or remove/rename the test to avoid false confidence.
| async def test_async_executor_validates_redis_url(): | |
| """Test executor requires redis_url or redis_client.""" | |
| plan = _make_basic_plan() | |
| executor = AsyncMigrationExecutor() | |
| # The executor should raise an error internally when trying to connect | |
| # but let's verify it doesn't crash before it tries to apply | |
| # For a proper test, we'd need to mock AsyncSearchIndex.from_existing | |
| # For now, we just verify the executor is created | |
| async def test_async_executor_can_be_created_without_redis_config(): | |
| """Test AsyncMigrationExecutor can be created without redis_url or redis_client.""" | |
| plan = _make_basic_plan() | |
| executor = AsyncMigrationExecutor() | |
| # This test only verifies that the executor can be instantiated without | |
| # providing redis_url or redis_client. Validation of those parameters | |
| # should be covered by more specific tests. |
There was a problem hiding this comment.
Low priority — test validates instantiation. Functional tests are in the integration suite.
…CLI flag Adds non-blocking async migration support: - async_executor.py: AsyncMigrationExecutor with async apply, BGSAVE, quantization - async_planner.py: AsyncMigrationPlanner with async create_plan - async_validation.py: AsyncMigrationValidator with async validate - async_utils.py: async Redis helpers - cli/migrate.py: adds --async flag to 'apply' subcommand - Unit tests for async executor and planner
- Fix SVS client leak in async_planner check_svs_requirements - Remove dead async_utils.py (functions duplicated in async_executor)
d634016 to
dfa069a
Compare
cc2d1d3 to
b8848af
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 3 total unresolved issues (including 1 from previous review).
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
| percent_indexed = latest_info.get("percent_indexed") | ||
|
|
||
| if percent_indexed is not None or indexing is not None: | ||
| ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing) |
There was a problem hiding this comment.
Async index readiness check fails when percent_indexed missing
High Severity
The async _async_wait_for_index_ready collapses the readiness check into a single expression float(percent_indexed or 0) >= 1.0 and not bool(indexing), which differs from the sync wait_for_index_ready logic. When percent_indexed is None but indexing is 0 (i.e., indexing is complete), the sync version correctly treats this as ready via a separate branch (ready = not is_indexing), but the async version evaluates float(None or 0) >= 1.0 → False, so ready is always False. This causes the async executor to spin until the 30-minute timeout on Redis versions that don't return percent_indexed in FT.INFO.
There was a problem hiding this comment.
Fixed — readiness check now handles percent_indexed=None.
There was a problem hiding this comment.
Fixed — readiness check now handles percent_indexed=None separately from percent_indexed=0, matching sync logic. See latest commit.
| f"{prefix}*" | ||
| if prefix.endswith(key_separator) | ||
| else f"{prefix}{key_separator}*" | ||
| ) |
There was a problem hiding this comment.
Async sample_keys missing empty prefix handling
Low Severity
_async_sample_keys is missing the empty prefix (prefix == "") handling that exists in the sync _sample_keys. When prefix is empty, the sync version uses "*" as the match pattern, but the async version falls through to f"{prefix}{key_separator}*" which produces ":*" (assuming ":" separator), incorrectly filtering out keys that don't start with the separator character.
There was a problem hiding this comment.
Won't fix — mirrors sync version behavior.
There was a problem hiding this comment.
Won't fix — empty prefix handling mirrors sync version via build_scan_match_patterns() utility which returns ['*'] for empty prefixes.
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b8848af932
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing) | ||
| if progress_callback: |
There was a problem hiding this comment.
Handle missing percent_indexed in async readiness check
When FT.INFO provides an indexing flag but omits percent_indexed (a case already handled in the sync wait_for_index_ready path), this logic computes float(percent_indexed or 0) and will never mark the index ready, even after indexing has finished (indexing == 0). In that environment, async migrations will wait until timeout (default 1800s) and fail despite a healthy index.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed — readiness check handles percent_indexed=None.
There was a problem hiding this comment.
Fixed — readiness check now handles percent_indexed=None correctly. See latest commit.
|
|
||
| for i in range(0, remaining_keys, batch_size): | ||
| batch = keys[i : i + batch_size] | ||
| pipe = client.pipeline() |
There was a problem hiding this comment.
Use non-transactional pipeline for async quantization writes
This pipeline is created with default transaction semantics (MULTI/EXEC), unlike the sync executor which explicitly uses transaction=False. Quantization batches typically touch many keys, so in Redis Cluster this can trigger cross-slot transaction errors and abort migrations during async apply. The async path should match the sync behavior and use non-transactional pipelining for these batched HSET operations.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed — async quantize pipeline now uses transaction=False.
There was a problem hiding this comment.
Already correct — line 989 shows client.pipeline(transaction=False). The async executor already uses non-transactional pipelines.


Summary
Adds async versions of the migration executor, planner, and validator, plus the
--asyncflag on thervl migrate applycommand.PR Stack
feat/migrate-designfeat/migrate-corefeat/migrate-wizardfeat/migrate-asyncfeat/migrate-batchfeat/migrate-docsWhat is included
redisvl/migration/async_executor.py: Async migration executor with all the same capabilities as syncredisvl/migration/async_planner.py: Async migration plannerredisvl/migration/async_validation.py: Async post-migration validatorredisvl/migration/async_utils.py: Async versions of shared utilitiesredisvl/cli/migrate.py: Adds--asyncflag toapplysubcommandtests/unit/test_async_migration_executor.py: 60 unit teststests/unit/test_async_migration_planner.py: Async planner teststests/integration/test_async_migration_v1.py: Async integration testsUsage
Details
The async executor is recommended for large migrations, especially those involving vector quantization. It uses non-blocking Redis operations throughout the pipeline for better throughput on high-latency connections.
Note
Medium Risk
Adds a parallel async execution path that performs key renames, field rewrites, index drop/recreate, and optional checkpointed vector re-encoding; bugs could cause data loss or partial migrations. Coverage is improved with new unit/integration tests, but the new code path is large and touches live Redis mutations.
Overview
Adds async migration support alongside the existing sync flow: new
AsyncMigrationPlanner,AsyncMigrationExecutor, andAsyncMigrationValidatorbuilt onAsyncSearchIndexand exported fromredisvl.migration.Updates
rvl migrate applyto accept--async, running the async executor (including async key enumeration viaFT.AGGREGATE/SCAN, optional BGSAVE safety snapshot, field/key renames, vector quantization with checkpoint-based resume, index recreation, and async validation).Introduces substantial new test coverage for the async path, including unit tests for executor/planner behavior and an integration test exercising the full plan→apply→validate flow against Redis.
Written by Cursor Bugbot for commit b8848af. This will update automatically on new commits. Configure here.