Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions examples/api_request_parallel_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
Inputs:
- requests_filepath : str
- path to the file containing the requests to be processed
- file should be a jsonl file, where each line is a json object with API parameters
- e.g., {"model": "text-embedding-ada-002", "input": "embed me"}
- file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field
- e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}}
- as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically)
- an example file is provided at examples/data/example_requests_to_parallel_process.jsonl
- the code to generate the example file is appended to the bottom of this script
Expand Down Expand Up @@ -164,6 +164,7 @@ async def process_api_requests_from_file(
request_json=request_json,
token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, token_encoding_name),
attempts_left=max_attempts,
metadata=request_json.pop("metadata", None)
)
status_tracker.num_tasks_started += 1
status_tracker.num_tasks_in_progress += 1
Expand Down Expand Up @@ -258,6 +259,7 @@ class APIRequest:
request_json: dict
token_consumption: int
attempts_left: int
metadata: dict
result: list = field(default_factory=list)

async def call_api(
Expand Down Expand Up @@ -298,11 +300,21 @@ async def call_api(
retry_queue.put_nowait(self)
else:
logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}")
append_to_jsonl([self.request_json, [str(e) for e in self.result]], save_filepath)
data = (
[self.request_json, [str(e) for e in self.result], self.metadata]
if self.metadata
else [self.request_json, [str(e) for e in self.result]]
)
append_to_jsonl(data, save_filepath)
status_tracker.num_tasks_in_progress -= 1
status_tracker.num_tasks_failed += 1
else:
append_to_jsonl([self.request_json, response], save_filepath)
data = (
[self.request_json, response, self.metadata]
if self.metadata
else [self.request_json, response]
)
append_to_jsonl(data, save_filepath)
status_tracker.num_tasks_in_progress -= 1
status_tracker.num_tasks_succeeded += 1
logging.debug(f"Request {self.task_id} saved to {save_filepath}")
Expand Down
2 changes: 1 addition & 1 deletion examples/data/example_requests_to_parallel_process.jsonl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"model": "text-embedding-ada-002", "input": "0\n"}
{"model": "text-embedding-ada-002", "input": "0\n", "metadata": {"row_id": 1}}
{"model": "text-embedding-ada-002", "input": "1\n"}
{"model": "text-embedding-ada-002", "input": "2\n"}
{"model": "text-embedding-ada-002", "input": "3\n"}
Expand Down