Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0d186e6
Refactor dataset related code
haiqi96 Jun 20, 2025
75ac0ff
further refactor
haiqi96 Jun 20, 2025
bb1e5f4
Linter
haiqi96 Jun 20, 2025
ba7cfe1
A few more fixes
haiqi96 Jun 20, 2025
68454c6
Linter fixes
haiqi96 Jun 20, 2025
c1de746
missing fixes
haiqi96 Jun 20, 2025
d797198
Fix mistake
haiqi96 Jun 20, 2025
8c39e77
actually fixing
haiqi96 Jun 20, 2025
d570ab6
Linter again
haiqi96 Jun 20, 2025
398ab5e
Merge branch 'main' into DatasetRefactor
haiqi96 Jun 25, 2025
7759a7a
Merge remote-tracking branch 'origin/main' into DatasetRefactor
haiqi96 Jun 27, 2025
e6b8cc7
Linter
haiqi96 Jun 27, 2025
7a468c3
Merge branch 'main' into DatasetRefactor
Bill-hbrhbr Jun 29, 2025
1dd1cea
Move default dataset metadata table creation to start_clp
Bill-hbrhbr Jun 29, 2025
a0c3c29
Remove unused import
Bill-hbrhbr Jun 29, 2025
a9bf615
Address review comments
Bill-hbrhbr Jun 30, 2025
fe05f5f
Replace the missing SUFFIX
Bill-hbrhbr Jun 30, 2025
39a9278
Move suffix constants from clp_config to clp_metadata_db_utils local …
Bill-hbrhbr Jun 30, 2025
7124828
Refactor archive_manager.py.
kirkrodrigues Jun 30, 2025
eb80992
Refactor s3_utils.py.
kirkrodrigues Jun 30, 2025
5ed44e7
compression_task.py: Fix typing errors and minor refactoring.
kirkrodrigues Jun 30, 2025
af6b508
compression_scheduler.py: Remove exception swallow which will hide un…
kirkrodrigues Jun 30, 2025
67fb01f
Refactor query_scheduler.py.
kirkrodrigues Jun 30, 2025
d6ad4de
clp_metadata_db_utils.py: Minor refactoring.
kirkrodrigues Jun 30, 2025
ff7d700
clp_metadata_db_utils.py: Rename _generic_get_table_name -> _get_tabl…
kirkrodrigues Jun 30, 2025
7ffc77c
clp_metadata_db_utils.py: Alphabetize new public functions.
kirkrodrigues Jun 30, 2025
0255cbd
clp_metadata_db_utils.py: Reorder public and private functions for co…
kirkrodrigues Jun 30, 2025
1076a3f
initialize-clp-metadata-db.py: Remove changes unrelated to PR.
kirkrodrigues Jun 30, 2025
71c4d82
Move default dataset creation into compression_scheduler so that it r…
kirkrodrigues Jun 30, 2025
6bd9372
Apply suggestions from code review
kirkrodrigues Jul 1, 2025
84df2e2
Merge branch 'main' into DatasetRefactor
kirkrodrigues Jul 1, 2025
983bea1
Remove bug fix that's no longer necessary.
kirkrodrigues Jul 1, 2025
bdb7817
Fix bug where dataset has a default value instead of None when using …
Bill-hbrhbr Jul 1, 2025
a82a267
Correctly feed in the input config dataset names
Bill-hbrhbr Jul 1, 2025
f699496
Remove unnecessary changes
Bill-hbrhbr Jul 1, 2025
90ce0a4
Update the webui to pass the dataset name in the clp-json code path (…
kirkrodrigues Jul 2, 2025
d6f9e5a
Move dataset into the user function
haiqi96 Jul 2, 2025
dc6a706
Merge branch 'DatasetRefactor' of https://github.com/haiqi96/clp_fork…
haiqi96 Jul 2, 2025
76bcb4a
Remove unnecessary f string specifier
haiqi96 Jul 2, 2025
a4e6f83
Apply suggestions from code review
haiqi96 Jul 2, 2025
7b42568
Add import type.
kirkrodrigues Jul 2, 2025
afe43ce
Merge branch 'main' into DatasetRefactor
haiqi96 Jul 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
from pathlib import Path

from clp_py_utils.clp_config import (
ARCHIVE_TAGS_TABLE_SUFFIX,
ARCHIVES_TABLE_SUFFIX,
CLP_DEFAULT_DATASET_NAME,
Database,
FILES_TABLE_SUFFIX,
StorageEngine,
)
from clp_py_utils.clp_metadata_db_utils import (
get_archive_tags_table_name,
get_archives_table_name,
get_files_table_name,
)
from clp_py_utils.sql_adapter import SQL_Adapter

from clp_package_utils.general import (
Expand Down Expand Up @@ -191,12 +193,15 @@ def main(argv: typing.List[str]) -> int:
logger.error("`archive_output.directory` doesn't exist.")
return -1

dataset: typing.Optional[str] = None
if StorageEngine.CLP_S == storage_engine:
dataset = CLP_DEFAULT_DATASET_NAME

if FIND_COMMAND == parsed_args.subcommand:
return _find_archives(
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
dataset,
parsed_args.begin_ts,
parsed_args.end_ts,
)
Expand All @@ -207,8 +212,7 @@ def main(argv: typing.List[str]) -> int:
return _delete_archives(
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
dataset,
delete_handler,
parsed_args.dry_run,
)
Expand All @@ -219,8 +223,7 @@ def main(argv: typing.List[str]) -> int:
return _delete_archives(
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
dataset,
delete_handler,
parsed_args.dry_run,
)
Expand All @@ -235,8 +238,7 @@ def main(argv: typing.List[str]) -> int:
def _find_archives(
archives_dir: Path,
database_config: Database,
storage_engine: StorageEngine,
dataset: str,
dataset: typing.Optional[str],
begin_ts: int,
end_ts: int = typing.Optional[int],
) -> int:
Expand All @@ -245,7 +247,6 @@ def _find_archives(
`begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts`.
:param archives_dir:
:param database_config:
:param storage_engine:
:param dataset:
:param begin_ts:
:param end_ts:
Expand All @@ -259,16 +260,14 @@ def _find_archives(
database_config.get_clp_connection_params_and_type(True)
)
table_prefix: str = clp_db_connection_params["table_prefix"]
if StorageEngine.CLP_S == storage_engine:
table_prefix = f"{table_prefix}{dataset}_"

with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
query_params: typing.List[int] = [begin_ts]
query: str = (
f"""
SELECT id FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}`
SELECT id FROM `{get_archives_table_name(table_prefix, dataset)}`
WHERE begin_timestamp >= %s
"""
)
Expand All @@ -285,9 +284,10 @@ def _find_archives(
return 0

logger.info(f"Found {len(archive_ids)} archives within the specified time range.")
archive_output_dir = archives_dir / dataset if dataset is not None else archives_dir
for archive_id in archive_ids:
logger.info(archive_id)
archive_path: Path = archives_dir / dataset / archive_id
archive_path = archive_output_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} in database not found on disk.")

Expand All @@ -302,7 +302,6 @@ def _find_archives(
def _delete_archives(
archives_dir: Path,
database_config: Database,
storage_engine: StorageEngine,
dataset: str,
delete_handler: DeleteHandler,
dry_run: bool = False,
Expand All @@ -312,7 +311,6 @@ def _delete_archives(

:param archives_dir:
:param database_config:
:param storage_engine:
:param dataset:
:param delete_handler: Object to handle differences between by-filter and by-ids delete types.
:param dry_run: If True, no changes will be made to the database or disk.
Expand All @@ -327,8 +325,6 @@ def _delete_archives(
database_config.get_clp_connection_params_and_type(True)
)
table_prefix = clp_db_connection_params["table_prefix"]
if StorageEngine.CLP_S == storage_engine:
table_prefix = f"{table_prefix}{dataset}_"

with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
Expand All @@ -341,7 +337,7 @@ def _delete_archives(

db_cursor.execute(
f"""
DELETE FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}`
DELETE FROM `{get_archives_table_name(table_prefix, dataset)}`
WHERE {query_criteria}
RETURNING id
""",
Expand All @@ -360,14 +356,14 @@ def _delete_archives(

db_cursor.execute(
f"""
DELETE FROM `{table_prefix}{FILES_TABLE_SUFFIX}`
DELETE FROM `{get_files_table_name(table_prefix, dataset)}`
WHERE archive_id in ({ids_list_string})
"""
)

db_cursor.execute(
f"""
DELETE FROM `{table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX}`
DELETE FROM `{get_archive_tags_table_name(table_prefix, dataset)}`
WHERE archive_id in ({ids_list_string})
"""
)
Expand All @@ -387,8 +383,9 @@ def _delete_archives(

logger.info(f"Finished deleting archives from the database.")

archive_output_dir: Path = archives_dir / dataset if dataset is not None else archives_dir
for archive_id in archive_ids:
archive_path: Path = archives_dir / dataset / archive_id
archive_path = archive_output_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import pathlib
import sys
import time
import typing
from contextlib import closing
from typing import List
from typing import List, Optional, Union
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Remove unused import.

The Optional import is not used anywhere in the code and should be removed to keep imports clean.

-from typing import List, Optional, Union
+from typing import List, Union
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from typing import List, Optional, Union
-from typing import List, Optional, Union
+from typing import List, Union
🧰 Tools
🪛 Flake8 (7.2.0)

[error] 8-8: 'typing.Optional' imported but unused

(F401)

🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/native/compress.py at
line 8, the import statement includes Optional which is not used anywhere in the
code. Remove Optional from the import statement to clean up unused imports and
keep the code tidy.


import brotli
import msgpack
from clp_py_utils.clp_config import CLPConfig, COMPRESSION_JOBS_TABLE_NAME
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
COMPRESSION_JOBS_TABLE_NAME,
StorageEngine,
)
from clp_py_utils.pretty_size import pretty_size
from clp_py_utils.s3_utils import parse_s3_url
from clp_py_utils.sql_adapter import SQL_Adapter
Expand Down Expand Up @@ -132,28 +136,37 @@ def handle_job(sql_adapter: SQL_Adapter, clp_io_config: ClpIoConfig, no_progress


def _generate_clp_io_config(
clp_config: CLPConfig, logs_to_compress: List[str], parsed_args: argparse.Namespace
) -> typing.Union[S3InputConfig, FsInputConfig]:
input_type = clp_config.logs_input.type
clp_config: CLPConfig,
logs_to_compress: List[str],
parsed_args: argparse.Namespace,
) -> Union[S3InputConfig, FsInputConfig]:
dataset = (
CLP_DEFAULT_DATASET_NAME
if StorageEngine.CLP_S == clp_config.package.storage_engine
else None
)

input_type = clp_config.logs_input.type
if InputType.FS == input_type:
if len(logs_to_compress) == 0:
raise ValueError(f"No input paths given.")
raise ValueError("No input paths given.")
return FsInputConfig(
dataset=dataset,
paths_to_compress=logs_to_compress,
timestamp_key=parsed_args.timestamp_key,
path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
)
elif InputType.S3 == input_type:
if len(logs_to_compress) == 0:
raise ValueError(f"No URLs given.")
raise ValueError("No URLs given.")
elif len(logs_to_compress) != 1:
raise ValueError(f"Too many URLs: {len(logs_to_compress)} > 1")

s3_url = logs_to_compress[0]
region_code, bucket_name, key_prefix = parse_s3_url(s3_url)
aws_authentication = clp_config.logs_input.aws_authentication
return S3InputConfig(
dataset=dataset,
region_code=region_code,
bucket=bucket_name,
key_prefix=key_prefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
Database,
FILES_TABLE_SUFFIX,
)
from clp_py_utils.clp_metadata_db_utils import get_files_table_name
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import (
Expand Down Expand Up @@ -54,8 +54,9 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
files_table_name = get_files_table_name(table_prefix, None)
db_cursor.execute(
f"SELECT orig_file_id FROM `{table_prefix}{FILES_TABLE_SUFFIX}` WHERE path = (%s)",
f"SELECT orig_file_id FROM `{files_table_name}` WHERE path = (%s)",
(path,),
)
results = db_cursor.fetchall()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

import msgpack
import pymongo
from clp_py_utils.clp_config import Database, ResultsCache
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
Database,
ResultsCache,
StorageEngine,
)
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig
Expand All @@ -32,6 +37,7 @@
def create_and_monitor_job_in_db(
db_config: Database,
results_cache: ResultsCache,
dataset: str | None,
wildcard_query: str,
tags: str | None,
begin_timestamp: int | None,
Expand All @@ -43,6 +49,7 @@ def create_and_monitor_job_in_db(
count_by_time_bucket_size: int | None,
):
search_config = SearchJobConfig(
dataset=dataset,
query_string=wildcard_query,
begin_timestamp=begin_timestamp,
end_timestamp=end_timestamp,
Expand Down Expand Up @@ -113,6 +120,7 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci
async def do_search_without_aggregation(
db_config: Database,
results_cache: ResultsCache,
dataset: str | None,
wildcard_query: str,
tags: str | None,
begin_timestamp: int | None,
Expand Down Expand Up @@ -147,6 +155,7 @@ async def do_search_without_aggregation(
create_and_monitor_job_in_db,
db_config,
results_cache,
dataset,
wildcard_query,
tags,
begin_timestamp,
Expand Down Expand Up @@ -184,6 +193,7 @@ async def do_search_without_aggregation(
async def do_search(
db_config: Database,
results_cache: ResultsCache,
dataset: str | None,
wildcard_query: str,
tags: str | None,
begin_timestamp: int | None,
Expand All @@ -198,6 +208,7 @@ async def do_search(
await do_search_without_aggregation(
db_config,
results_cache,
dataset,
wildcard_query,
tags,
begin_timestamp,
Expand All @@ -211,6 +222,7 @@ async def do_search(
create_and_monitor_job_in_db,
db_config,
results_cache,
dataset,
wildcard_query,
tags,
begin_timestamp,
Expand Down Expand Up @@ -281,11 +293,17 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

dataset = (
CLP_DEFAULT_DATASET_NAME
if StorageEngine.CLP_S == clp_config.package.storage_engine
else None
)
try:
asyncio.run(
do_search(
clp_config.database,
clp_config.results_cache,
dataset,
parsed_args.wildcard_query,
parsed_args.tags,
parsed_args.begin_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import yaml
from clp_py_utils.clp_config import (
ALL_TARGET_NAME,
ARCHIVES_TABLE_SUFFIX,
AwsAuthType,
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
Expand All @@ -23,7 +22,6 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
FILES_TABLE_SUFFIX,
QUERY_JOBS_TABLE_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
Expand All @@ -35,6 +33,10 @@
StorageType,
WEBUI_COMPONENT_NAME,
)
from clp_py_utils.clp_metadata_db_utils import (
get_archives_table_name,
get_files_table_name,
)
from clp_py_utils.s3_utils import generate_container_auth_options
from job_orchestration.scheduler.constants import QueueName
from pydantic import BaseModel
Expand Down Expand Up @@ -868,13 +870,14 @@ def start_webui(
# Read, update, and write back client's and server's settings.json
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
table_prefix = clp_db_connection_params["table_prefix"]
dataset: Optional[str] = None
if StorageEngine.CLP_S == clp_config.package.storage_engine:
table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_"
dataset = CLP_DEFAULT_DATASET_NAME
Comment on lines +873 to +875
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Minor nit: collapse the two-step dataset assignment

The explicit type annotation is helpful, but the two-step pattern is verbose. You could inline it without losing clarity:

-    dataset: Optional[str] = None
-    if StorageEngine.CLP_S == clp_config.package.storage_engine:
-        dataset = CLP_DEFAULT_DATASET_NAME
+    dataset: Optional[str] = (
+        CLP_DEFAULT_DATASET_NAME
+        if StorageEngine.CLP_S == clp_config.package.storage_engine
+        else None
+    )

Reduces cognitive load and keeps the flow tight.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dataset: Optional[str] = None
if StorageEngine.CLP_S == clp_config.package.storage_engine:
table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_"
dataset = CLP_DEFAULT_DATASET_NAME
dataset: Optional[str] = (
CLP_DEFAULT_DATASET_NAME
if StorageEngine.CLP_S == clp_config.package.storage_engine
else None
)
🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/start_clp.py around
lines 873 to 875, the variable 'dataset' is assigned in two steps with an
explicit type annotation followed by a conditional assignment. Simplify this by
combining the declaration and conditional assignment into a single line using a
conditional expression, preserving the Optional[str] type clarity while reducing
verbosity and improving readability.

client_settings_json_updates = {
"ClpStorageEngine": clp_config.package.storage_engine,
"MongoDbSearchResultsMetadataCollectionName": clp_config.webui.results_metadata_collection_name,
"SqlDbClpArchivesTableName": f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}",
"SqlDbClpFilesTableName": f"{table_prefix}{FILES_TABLE_SUFFIX}",
"SqlDbClpArchivesTableName": get_archives_table_name(table_prefix, dataset),
"SqlDbClpFilesTableName": get_files_table_name(table_prefix, dataset),
"SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME,
}
client_settings_json = read_and_update_settings_json(
Expand All @@ -884,6 +887,7 @@ def start_webui(
client_settings_json_file.write(json.dumps(client_settings_json))

server_settings_json_updates = {
"ClpStorageEngine": clp_config.package.storage_engine,
"SqlDbHost": clp_config.database.host,
"SqlDbPort": clp_config.database.port,
"SqlDbName": clp_config.database.name,
Expand Down
Loading