1 | 1 | import concurrent.futures |
2 | 2 | import uuid |
| 3 | +import json |
| 4 | +import copy |
3 | 5 | from collections import defaultdict |
4 | 6 | from functools import partial |
5 | 7 | from logging import getLogger |
20 | 22 |     Union, |
21 | 23 | ) |
22 | 24 | |
| 25 | + |
23 | 26 | from darwin.datatypes import ( |
24 | 27 |     AnnotationFile, |
25 | 28 |     Property, |
@@ -1864,6 +1867,17 @@ def _import_annotations( |
1864 | 1867 | |
1865 | 1868 |     try: |
1866 | 1869 |         dataset.import_annotation(id, payload=payload) |
| 1870 | +    except RequestEntitySizeExceeded: |
| 1871 | +        logger.warning( |
| 1872 | +            "Annotation payload exceeds the request entity size limit. Splitting payload into smaller chunks for import." |
| 1873 | +        ) |
| 1874 | +        payloads = _split_payloads(payload) |
| 1875 | +        for chunked_payload in payloads: |
| 1876 | +            try: |
| 1877 | +                dataset.import_annotation(id, payload=chunked_payload) |
| 1878 | +            except Exception as e: |
| 1879 | +                errors.append(e) |
| 1880 | +                success = dt.Success.FAILURE |
1867 | 1881 |     except Exception as e: |
1868 | 1882 |         errors.append(e) |
1869 | 1883 |         success = dt.Success.FAILURE |
@@ -2185,3 +2199,65 @@ def _warn_for_annotations_with_multiple_instance_ids( |
2185 | 2199 |         console.print( |
2186 | 2200 |             f"- File: {file} has {files_with_multi_instance_id_annotations[file]} annotation(s) with multiple instance IDs" |
2187 | 2201 |         ) |
| 2202 | + |
| 2203 | + |
| 2204 | +def _split_payloads( |
| 2205 | +    payload: Dict[str, Any], max_payload_size: int = 32_000_000 |
| 2206 | +) -> List[Dict[str, Any]]: |
| 2207 | +    """ |
| 2208 | +    Splits a payload into smaller chunks to avoid HTTP 413 errors due to large request entity sizes. |
| 2209 | + |
| 2210 | +    Takes an input payload and splits it into smaller payloads, ensuring no chunk exceeds the specified maximum size. This is particularly useful when importing annotations, where an oversized request body would otherwise trigger an HTTP 413 (`RequestEntitySizeExceeded`) error. |
| 2211 | + |
| 2212 | +    Parameters |
| 2213 | +    ---------- |
| 2214 | +    payload : Dict[str, Any] |
| 2215 | +        The input payload to be split. |
| 2216 | +    max_payload_size : int, optional |
| 2217 | +        The maximum size in bytes of each split payload. Defaults to 32,000,000 (32 MB). |
| 2218 | + |
| 2219 | +    Returns |
| 2220 | +    ------- |
| 2221 | +    List[Dict[str, Any]] |
| 2222 | +        A list of split payloads, each below the specified maximum size. |
| 2223 | + |
| 2224 | +    Raises |
| 2225 | +    ------ |
| 2226 | +    ValueError |
| 2227 | +        If any single annotation exceeds the `max_payload_size` limit. |
| 2228 | +    """ |
| 2229 | +    payloads = [] |
| 2230 | +    base_payload = {"annotations": [], "overwrite": payload["overwrite"]} |
| 2231 | +    current_payload = copy.deepcopy(base_payload) |
| 2232 | +    current_payload_size = 0 |
| 2233 | + |
| 2234 | +    for annotation in payload["annotations"]: |
| 2235 | +        annotation_size = len(json.dumps({"annotations": [annotation]}).encode("utf-8"))  # serialized size (slight overestimate: includes the envelope) |
| 2236 | +        if annotation_size > max_payload_size: |
| 2237 | +            raise ValueError( |
| 2238 | +                f"One or more annotations exceed the maximum allowed payload size of {max_payload_size} bytes" |
| 2239 | +            ) |
| 2240 | +        if current_payload_size + annotation_size < max_payload_size: |
| 2241 | +            current_payload["annotations"].append(annotation) |
| 2242 | +            current_payload_size += annotation_size |
| 2243 | +        else: |
| 2244 | +            payloads.append(current_payload) |
| 2245 | +            current_payload = copy.deepcopy(base_payload) |
| 2246 | +            # Subsequent payloads must not overwrite the annotations |
| 2247 | +            # imported by previous chunks |
| 2248 | +            current_payload["overwrite"] = False |
| 2249 | +            current_payload["annotations"].append(annotation) |
| 2250 | +            current_payload_size = annotation_size |
| 2251 | + |
| 2252 | +    if current_payload["annotations"]: |
| 2253 | +        payloads.append(current_payload) |
| 2254 | + |
| 2255 | +    return payloads |
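| 2256 | + |
| 2257 | + |
| 2258 | +# A minimal usage sketch of _split_payloads on its own; `dataset`, `item_id`, |
| 2259 | +# and `annotations` are hypothetical names assumed to be in the caller's scope |
| 2260 | +# (the real call site is the RequestEntitySizeExceeded handler in |
| 2261 | +# _import_annotations above): |
| 2262 | +# for chunk in _split_payloads({"annotations": annotations, "overwrite": True}): |
| 2263 | +#     dataset.import_annotation(item_id, payload=chunk) |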