
feat(clp-package): Add support for uploading query result files to S3. #1722

Merged

hoophalab merged 7 commits into y-scope:main from hoophalab:s3 on Dec 6, 2025

Conversation

@hoophalab
Contributor

@hoophalab hoophalab commented Dec 3, 2025

Description

Currently, the query workers can write query results to the local filesystem by setting "write_to_file": true in the API server request, which supports a higher max_num_results than MongoDB.

However, in a multi-node setup without SeaweedFS, the API server cannot read query result files from the query worker's local filesystem.

This PR adds support for uploading those query result files to S3 in the query worker.

An upcoming PR will add support for reading results from S3 in the API server.
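As a rough sketch of the behavior this PR adds (the names below are illustrative, not the actual fs_search_task.py code), the worker uploads only when the stream storage type is S3 and file output was requested, and the destination key is derived from the job and archive IDs:

```python
# Hypothetical sketch of the worker's post-search upload decision; names and
# types are simplified stand-ins for the real clp_config classes.
from dataclasses import dataclass


@dataclass
class StorageConfig:
    type: str            # e.g. "s3" or a filesystem storage type
    key_prefix: str = "" # configured S3 key prefix, e.g. "streams/"


def should_upload(storage: StorageConfig, write_to_file: bool) -> bool:
    """Upload only when stream storage is S3 and file output was requested."""
    return storage.type == "s3" and write_to_file


def dest_key(storage: StorageConfig, job_id: int, archive_id: str) -> str:
    """Destination object key: key_prefix followed by '{job_id}/{archive_id}'."""
    return f"{storage.key_prefix}{job_id}/{archive_id}"
```

With the config from the validation section, a job 1 result for a given archive would land under a key like streams/1/&lt;archive_id&gt;.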

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and this has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  1. Change stream_output storage to S3:
stream_output:
  storage:
    type: "s3"
    staging_directory: "var/data/staged-streams"  # Or a path of your choosing
    s3_config:
      region_code: "us-east-1"
      bucket: "test-bucket"
      key_prefix: "streams/"
      aws_authentication:
        type: "credentials"
        credentials:
          access_key_id: "..."
          secret_access_key: "..."
  2. Send a request to the API server with "write_to_file": true:
curl -X POST http://localhost:3001/query \
 -H "Content-Type: application/json" \
 -d '{
   "query_string": "*log*",
   "dataset": "default",
   "ignore_case": false,
   "max_num_results": 100,
   "write_to_file": true
 }'
  3. Verify that the query result files are under s3://test-bucket/streams/{search_job_id}
     (tested on LocalStack).

Summary by CodeRabbit

  • New Features

    • Query-generated files are automatically uploaded to S3 when S3 storage and file output are enabled.
  • Bug Fixes / Reliability

    • Uploads now include start/finish logging.
    • Upload failures are caught and recorded (the error path is set) and cause the task to be marked as failed; local source files are removed after upload.


@hoophalab hoophalab requested a review from a team as a code owner December 3, 2025 18:16
@coderabbitai
Contributor

coderabbitai bot commented Dec 3, 2025

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: adding S3 upload support for query result files, which is the core functionality introduced in this PR.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a88c76b and 3789c70.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-12-16T21:49:06.483Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 634
File: components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py:0-0
Timestamp: 2024-12-16T21:49:06.483Z
Learning: In `fs_compression_task.py`, when refactoring the archive processing loop in Python, the `src_archive_file.unlink()` operation should remain outside of the `process_archive` function.

Applied to files:

  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
📚 Learning: 2024-11-15T20:07:22.256Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.

Applied to files:

  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (3)
components/clp-py-utils/clp_py_utils/s3_utils.py (3)
  • generate_s3_virtual_hosted_style_url (243-253)
  • get_credential_env_vars (62-100)
  • s3_put (281-309)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • QueryTaskStatus (64-74)
components/clp-py-utils/clp_py_utils/clp_config.py (3)
  • StorageType (135-137)
  • get_directory (639-640)
  • get_directory (650-651)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: build (macos-15)
  • GitHub Check: build (ubuntu-24.04)
🔇 Additional comments (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)

15-19: Imports for S3 utilities and QueryTaskStatus look correct and consistent with usage.

The added imports are used appropriately in _make_core_clp_s_command_and_env_vars and the new post‑processing block, and they match the existing S3 utility interfaces and scheduler status enum. No issues here.

Also applies to: 29-29

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Ruff (0.14.7)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py

ruff failed
Cause: Failed to load extended configuration /tools/yscope-dev-utils/exports/lint-configs/python/ruff.toml (/components/job-orchestration/pyproject.toml extends /tools/yscope-dev-utils/exports/lint-configs/python/ruff.toml)
Cause: Failed to read /tools/yscope-dev-utils/exports/lint-configs/python/ruff.toml
Cause: No such file or directory (os error 2)



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1012bd4 and f477d78.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (2)
components/clp-py-utils/clp_py_utils/s3_utils.py (3)
  • generate_s3_virtual_hosted_style_url (243-253)
  • get_credential_env_vars (62-100)
  • s3_put (281-309)
components/clp-py-utils/clp_py_utils/clp_config.py (3)
  • StorageType (135-137)
  • get_directory (639-640)
  • get_directory (650-651)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: build (ubuntu-24.04)
  • GitHub Check: build (macos-15)
🔇 Additional comments (2)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (2)

15-19: LGTM!

The addition of s3_put to the import group is correct and necessary for the new S3 upload functionality.


257-257: Verify that s3_config is guaranteed non-null when storage type is S3.

The code accesses storage_config.s3_config without an explicit null check. While the condition at line 254 checking StorageType.S3 should ensure the configuration exists, confirm that the configuration schema guarantees s3_config is present when the storage type is S3.

Member

@LinZhihao-723 LinZhihao-723 left a comment


Before we merge, can you confirm that the behavior on a failed upload is expected (e.g., logs and status)?

storage_config = worker_config.stream_output.storage
if StorageType.S3 == storage_config.type and search_config.write_to_file:
    s3_config = storage_config.s3_config
    dest_path = f"{job_id}/{archive_id}"
Member


Shall we add a "root" to the destination path, like how we store the stream output directory?

Contributor Author

@hoophalab hoophalab Dec 4, 2025


The files are uploaded to s3://test-bucket/streams/1/61f39b1c-bc17-409b-8abc-ee7ffb383077 if the following config is used. s3_put adds key_prefix: "streams/" before dest_path, similar to extract_stream_task. I'm not sure whether this is what you're asking for?

stream_output:
  storage:
    type: "s3"
    staging_directory: "var/data/staged-streams"  # Or a path of your choosing
    s3_config:
      region_code: "us-east-1"
      bucket: "test-bucket"
      key_prefix: "streams/"
      aws_authentication:
        type: "credentials"
        credentials:
          access_key_id: "..."
          secret_access_key: "..."
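For illustration, the final object key is the configured key_prefix joined with the destination path, and the resulting object can be addressed with a standard AWS virtual-hosted-style URL. This is a minimal sketch of that composition (assumed behavior; the real logic lives in clp_py_utils/s3_utils.py and may differ in detail):

```python
# Hypothetical helpers mirroring the key/URL composition described above;
# not the actual s3_put / generate_s3_virtual_hosted_style_url implementations.
def full_object_key(key_prefix: str, dest_path: str) -> str:
    # s3_put prepends the configured key_prefix to the destination path.
    return f"{key_prefix}{dest_path}"


def virtual_hosted_style_url(region_code: str, bucket: str, key: str) -> str:
    # Standard AWS virtual-hosted-style URL layout.
    return f"https://{bucket}.s3.{region_code}.amazonaws.com/{key}"
```

With key_prefix "streams/" and dest_path "1/61f39b1c-bc17-409b-8abc-ee7ffb383077", this yields the s3://test-bucket/streams/1/... location shown above.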

Member


Ok I see. Then it's good.

@hoophalab
Contributor Author

I tried uploading to a bucket that doesn't exist. The following logs are produced, and the status is set to FAILED (3) in the query_jobs table.
Because the status is FAILED, the API server will return a 500 Internal Server Error.

[2025-12-04 02:28:41,985: INFO/ForkPoolWorker-1] job_orchestration.executor.query.fs_search_task.search[e0106ca9-75cb-4c7d-994f-3e63637a77b1]: Started search task for job 1
[2025-12-04 02:28:41,993: INFO/ForkPoolWorker-1] job_orchestration.executor.query.fs_search_task.search[e0106ca9-75cb-4c7d-994f-3e63637a77b1]: Running: /opt/clp/bin/clp-s s /var/data/archives/postgres --archive-id b162dba4-1ae6-40cd-bd38-75232cfa35b8 "session_id":"64211afb.1e83" file --path /var/data/staged-streams/1/b162dba4-1ae6-40cd-bd38-75232cfa35b8
[2025-12-04 02:28:41,997: INFO/ForkPoolWorker-1] job_orchestration.executor.query.fs_search_task.search[e0106ca9-75cb-4c7d-994f-3e63637a77b1]: Waiting for search to finish
[2025-12-04 02:28:42,329: INFO/ForkPoolWorker-1] job_orchestration.executor.query.fs_search_task.search[e0106ca9-75cb-4c7d-994f-3e63637a77b1]: search task 1 completed for job 1
[2025-12-04 02:28:42,332: INFO/ForkPoolWorker-1] job_orchestration.executor.query.fs_search_task.search[e0106ca9-75cb-4c7d-994f-3e63637a77b1]: Uploading query results 1/b162dba4-1ae6-40cd-bd38-75232cfa35b8 to S3...
[2025-12-04 02:28:42,446: ERROR/ForkPoolWorker-1] job_orchestration.executor.query.fs_search_task.search[e0106ca9-75cb-4c7d-994f-3e63637a77b1]: Failed to upload query results 1/b162dba4-1ae6-40cd-bd38-75232cfa35b8: An error occurred (NoSuchBucket) when calling the PutObject operation: The specified bucket does not exist
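The failure path shown in these logs can be sketched as follows (an illustrative outline, not the actual fs_search_task.py code; the function name, the put_fn parameter, and the integer return are stand-ins):

```python
# Hypothetical sketch of the upload failure handling described above:
# log the start, catch any upload error, log it, and report FAILED.
import logging

logger = logging.getLogger("fs_search_task")

FAILED = 3  # mirrors QueryTaskStatus.FAILED as stored in the query_jobs table


def upload_results(put_fn, src_path: str, dest_path: str) -> int:
    """Attempt the upload; on any failure, log the error and return FAILED."""
    logger.info("Uploading query results %s to S3...", dest_path)
    try:
        put_fn(src_path, dest_path)
    except Exception as err:
        logger.error("Failed to upload query results %s: %s", dest_path, err)
        return FAILED
    return 0  # success sentinel for this sketch
```

A NoSuchBucket error from the put call would take the except branch, producing an error log like the one above and a FAILED task status.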

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3974070 and a88c76b.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-11-15T20:07:22.256Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.

Applied to files:

  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (3)
components/clp-py-utils/clp_py_utils/s3_utils.py (3)
  • generate_s3_virtual_hosted_style_url (243-253)
  • get_credential_env_vars (62-100)
  • s3_put (281-309)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • QueryTaskStatus (64-74)
components/clp-py-utils/clp_py_utils/clp_config.py (3)
  • StorageType (135-137)
  • get_directory (639-640)
  • get_directory (650-651)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: build (macos-15)
  • GitHub Check: build (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (2)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (2)

15-19: LGTM!

The import of s3_put is correctly added alongside the existing S3 utility imports.


29-29: LGTM!

The import of QueryTaskStatus is correctly added for use in the upload failure handling.

Member

@LinZhihao-723 LinZhihao-723 left a comment


The PR title lgtm.


@hoophalab hoophalab merged commit 2995bcf into y-scope:main Dec 6, 2025
19 checks passed