Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IO-1444] LongVideo Support #642

Merged
merged 17 commits into from
Aug 17, 2023
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,5 @@ scripts/

.ruff_cache/

!darwin/future/tests/data_objects/workflow/data
!darwin/future/tests/data_objects/workflow/data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always like a data based test

!tests/darwin/dataset/data
5 changes: 4 additions & 1 deletion darwin/cli_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
IncompatibleOptions,
InvalidLogin,
MissingConfig,
MissingDependency,
MissingSchema,
NameTaken,
NotFound,
Expand Down Expand Up @@ -376,7 +377,7 @@ def export_dataset(
)
except ValidationError:
_error("Nothing to export")
else:
else:
identifier.version = name
print(f"Dataset {dataset_slug} successfully exported to {identifier}")
print_new_version_info(client)
Expand Down Expand Up @@ -443,6 +444,8 @@ def pull_dataset(
f"Version '{dataset.identifier}:{version}' is of format '{uef.format}', "
f"only the darwin formats ('json', 'darwin_json_2') are supported for `darwin dataset pull`"
)
except MissingDependency as e:
_error(str(e))

print(f"Dataset {release.identifier} downloaded at {dataset.local_path} .")

Expand Down
177 changes: 166 additions & 11 deletions darwin/dataset/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
import time
import urllib
from pathlib import Path
from typing import Any, Callable, Iterable, List, Optional, Tuple
from tempfile import TemporaryDirectory
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

import deprecation
import numpy as np
import orjson as json
import requests
from PIL import Image
from requests.adapters import HTTPAdapter, Retry
from rich.console import Console

import darwin.datatypes as dt
from darwin.dataset.utils import sanitize_filename
from darwin.datatypes import AnnotationFile
from darwin.exceptions import MissingDependency
from darwin.utils import (
attempt_decode,
get_response_content,
Expand Down Expand Up @@ -233,13 +236,12 @@ def lazy_download_image_from_annotation(
If the format of the annotation is not supported.
"""

console = Console()

if annotation_format == "json":
return _download_image_from_json_annotation(
api_key, annotation_path, images_path, use_folders, video_frames, force_slots, ignore_slots
)
else:
console = Console()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but I feel like we need a Logging style console getter/factory to maintain the console(s) in use.

console.print("[bold red]Unsupported file format. Please use 'json'.")
raise NotImplementedError

Expand Down Expand Up @@ -278,18 +280,33 @@ def _download_image_from_json_annotation(
return []


def _download_all_slots_from_json_annotation(annotation, api_key, parent_path, video_frames):
def _download_all_slots_from_json_annotation(
annotation: dt.AnnotationFile, api_key: str, parent_path: Path, video_frames: bool
) -> Iterable[Callable[[], None]]:
generator = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your code, but this feels like a name we should change, given that it's a list, and really not even acting as a generator. At best it could feed a generator's output.

Anyway, I digress slightly...

for slot in annotation.slots:
if not slot.name:
raise ValueError("Slot name is required to download all slots")
slot_path = parent_path / sanitize_filename(annotation.filename) / sanitize_filename(slot.name)
slot_path.mkdir(exist_ok=True, parents=True)

if video_frames and slot.type != "image":
video_path: Path = slot_path / "sections"
video_path.mkdir(exist_ok=True, parents=True)
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))
if not slot.frame_urls:
segment_manifests = get_segment_manifests(slot, slot_path, api_key)
for index, manifest in enumerate(segment_manifests):
if slot.segments is None:
raise ValueError("No segments found")
segment_url = slot.segments[index]["url"]
path = video_path / f".{index:07d}.ts"
generator.append(
functools.partial(_download_and_extract_video_segment, segment_url, api_key, path, manifest)
)
else:
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this takes a while?

else:
for upload in slot.source_files:
file_path = slot_path / sanitize_filename(upload["file_name"])
Expand All @@ -306,16 +323,29 @@ def _download_single_slot_from_json_annotation(
annotation_path: Path,
video_frames: bool,
use_folders: bool = False,
):
) -> Iterable[Callable[[], None]]:
slot = annotation.slots[0]
generator = []

if video_frames and slot.type != "image":
video_path: Path = parent_path / annotation_path.stem
video_path.mkdir(exist_ok=True, parents=True)
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))

# Indicates it's a long video and uses the segment and manifest
if not slot.frame_urls:
segment_manifests = get_segment_manifests(slot, video_path, api_key)
for index, manifest in enumerate(segment_manifests):
if slot.segments is None:
raise ValueError("No segments found")
segment_url = slot.segments[index]["url"]
path = video_path / f".{index:07d}.ts"
generator.append(
functools.partial(_download_and_extract_video_segment, segment_url, api_key, path, manifest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes too long right now. `poetry run python -m darwin.cli dataset pull --video-frames` reported `23.98s user 6.25s system 201% cpu 14.995 total`, i.e. about 24 seconds of CPU time for 76 frames on an M2 CPU.

What if we have at least a few thousand frames? Maybe at least show an ETA with a proper progress bar?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retested. For identical videos, the "old" video download takes 1 second and the new download path takes 16 seconds on a 39-frame video. It should not be that bad.

)
else:
for i, frame_url in enumerate(slot.frame_urls):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))
else:
if len(slot.source_files) > 0:
image = slot.source_files[0]
Expand Down Expand Up @@ -525,3 +555,128 @@ def _rg16_to_grayscale(path):

new_image = Image.fromarray(np.uint8(image_2d_gray), mode="L")
new_image.save(path)


def _download_and_extract_video_segment(url: str, api_key: str, path: Path, manifest: dt.SegmentManifest) -> None:
    """
    Downloads one video segment, extracts its frames and removes the segment file.

    Parameters
    ----------
    url : str
        Location of the segment file.
    api_key : str
        API key forwarded to the segment download.
    path : Path
        Where the segment file is stored while its frames are extracted.
    manifest : dt.SegmentManifest
        Manifest of the segment, forwarded to the frame extraction.
    """
    try:
        _download_video_segment_file(url, api_key, path)
        _extract_frames_from_segment(path, manifest)
    finally:
        # The segment file is only an intermediate artefact: remove it even when the
        # download or the extraction fails, so a failed pull does not leave it behind.
        if path.exists():
            path.unlink()


def _extract_frames_from_segment(path: Path, manifest: dt.SegmentManifest) -> None:
# import cv2 here to avoid dependency on OpenCV when not needed if not installed as optional extra
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've surprised myself, because I like this being in function scope, so that it raises when the function is called, not when the module is imported. I expected I would feel like it should be in the file header, but I like this.

import cv2 # pylint: disable=import-outside-toplevel
except ImportError as e:
raise MissingDependency(
"Missing Dependency: OpenCV required for Video Extraction. Install with `pip install darwin\[ocv]`"
) from e
cap = cv2.VideoCapture(str(path))

# Read and save frames. Iterates over every frame because frame seeking in OCV is not reliable or guaranteed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can only ever achieve half frame rate, essentially? Why is all video software rubbish? (I know the answer, but still!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turns out that get frame count wasn't guaranteed either, had to do a while loop on the read function instead

frames_to_extract = dict([(item.frame, item.visible_frame) for item in manifest.items if item.visibility])
frame_index = 0
while cap.isOpened():
success, frame = cap.read()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a shame that OpenCV hasn't made this a context manager. I may put in a PR for one.

if frame is None:
break
if not success:
raise ValueError(f"Failed to read frame {frame_index} from video segment {path}")
if frame_index in frames_to_extract:
visible_frame = frames_to_extract.pop(frame_index)
frame_path = path.parent / f"{visible_frame:07d}.png"
cv2.imwrite(str(frame_path), frame)
if not frames_to_extract:
break
frame_index += 1
cap.release()


def _download_video_segment_file(url: str, api_key: str, path: Path) -> None:
    """
    Downloads a single video segment and writes it to ``path``.

    Parameters
    ----------
    url : str
        Location of the segment file.
    api_key : str
        API key sent in the ``Authorization`` header when ``url`` does not contain ``token``.
    path : Path
        Destination file; it is created or overwritten.

    Raises
    ------
    Exception
        If the response status is an error (``response.ok`` is false).
    """
    # URLs containing "token" are requested without the API key; every other URL is
    # authenticated with it. The header is passed per request so the session keeps
    # its default headers instead of having them replaced wholesale.
    headers = {} if "token" in url else {"Authorization": f"ApiKey {api_key}"}
    with requests.Session() as session:
        retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
        session.mount("https://", HTTPAdapter(max_retries=retries))
        # stream=True avoids holding the whole segment in memory before it is written.
        with session.get(url, headers=headers, stream=True) as response:
            # ``ok`` is already false for every 4xx and 5xx status.
            if not response.ok:
                raise Exception(
                    f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
                )
            with open(str(path), "wb") as file:
                # Iterating the response object directly yields 128-byte chunks;
                # ask for large chunks explicitly instead.
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    file.write(chunk)


def download_manifest_txts(urls: List[str], api_key: str, folder: Path) -> List[Path]:
paths = []
with requests.Session() as session:
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
session.mount("https://", HTTPAdapter(max_retries=retries))
for index, url in enumerate(urls):
if "token" in url:
response = session.get(url)
else:
session.headers = {"Authorization": f"ApiKey {api_key}"}
response = session.get(url)
if not response.ok or (400 <= response.status_code <= 499):
raise Exception(
f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
)
if not response.content:
raise Exception(f"Manifest file ({url}) is empty.")
path = folder / f"manifest_{index + 1}.txt"
with open(str(path), "wb") as file:
file.write(response.content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like the bubbled exceptions from these might be a bit non-specific if writes fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah true, we can work that out in post though

paths.append(path)
return paths


def get_segment_manifests(slot: dt.Slot, parent_path: Path, api_key: str) -> List[dt.SegmentManifest]:
    """
    Downloads the frame manifests of a slot and parses them into segment manifests.

    Parameters
    ----------
    slot : dt.Slot
        Slot whose ``frame_manifest`` entries (each with a ``"url"`` key) are downloaded.
    parent_path : Path
        Directory in which a temporary folder for the manifest files is created.
    api_key : str
        API key forwarded to the manifest download.

    Returns
    -------
    List[dt.SegmentManifest]
        The parsed manifests, as returned by ``_parse_manifests``.

    Raises
    ------
    ValueError
        If the slot has no frame manifest.
    """
    # Validate before touching the filesystem, so a slot without a manifest always
    # fails with this error and never creates a temporary directory.
    if slot.frame_manifest is None:
        raise ValueError("No frame manifest found")
    manifest_urls = [item["url"] for item in slot.frame_manifest]

    with TemporaryDirectory(dir=parent_path) as tmpdirname:
        manifest_paths = download_manifest_txts(manifest_urls, api_key, Path(tmpdirname))
        # Parse inside the context: the downloaded files are deleted when it exits.
        return _parse_manifests(manifest_paths, slot.name or "0")


def _parse_manifests(paths: List[Path], slot: str) -> List[dt.SegmentManifest]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is the hardest to read, but I don't think I'd change it, because the obstacle is really domain knowledge I think.

all_manifests: Dict[int, List[dt.ManifestItem]] = {}
visible_frame_index = 0
for path in paths:
with open(path) as infile:
for line in infile:
frame, segment_str, visibility, timestamp = line.strip("\n").split(":")
segment_int = int(segment_str)
if segment_int not in all_manifests:
all_manifests[segment_int] = []
if bool(int(visibility)):
all_manifests[segment_int].append(
dt.ManifestItem(int(frame), None, segment_int, True, float(timestamp), visible_frame_index)
)
visible_frame_index += 1
else:
all_manifests[segment_int].append(
dt.ManifestItem(int(frame), None, segment_int, False, float(timestamp), None)
)
# Create a list of segments, sorted by segment number and all items sorted by frame number
segments = []
for segment_int, seg_manifests in all_manifests.items():
seg_manifests.sort(key=lambda x: x.frame)
segments.append(
dt.SegmentManifest(slot=slot, segment=segment_int, total_frames=len(seg_manifests), items=seg_manifests)
)

# Calculate the absolute frame number for each item, as manifests are per segment
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not strictly required for this approach as we do it per segment anyway, but if we do ever want to try a full file download approach then it's mapped back from the manifest

absolute_frame = 0
for segment in segments:
for item in segment.items:
item.absolute_frame = absolute_frame
absolute_frame += 1
return segments
10 changes: 9 additions & 1 deletion darwin/dataset/remote_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
make_class_lists,
)
from darwin.datatypes import AnnotationClass, AnnotationFile, ItemId, PathLike, Team
from darwin.exceptions import NotFound, UnsupportedExportFormat
from darwin.exceptions import MissingDependency, NotFound, UnsupportedExportFormat
from darwin.exporter.formats.darwin import build_image_annotation
from darwin.item import DatasetItem
from darwin.item_sorter import ItemSorter
Expand Down Expand Up @@ -277,6 +277,14 @@ def pull(
if annotation is None:
continue

if video_frames and any([not slot.frame_urls for slot in annotation.slots]):
# will raise if not installed via pip install darwin[ocv]
try:
import cv2 # pylint: disable=import-outside-toplevel
except ImportError as e:
raise MissingDependency(
"Missing Dependency: OpenCV required for Video Extraction. Install with `pip install darwin\[ocv]`"
) from e
filename = Path(annotation.filename).stem
if filename in stems:
stems[filename] += 1
Expand Down
24 changes: 24 additions & 0 deletions darwin/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ class Slot:
#: Metadata of the slot
metadata: Optional[Dict[str, UnknownType]] = None

#: Frame Manifest for video slots
frame_manifest: Optional[List[Dict[str, UnknownType]]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely optional change, but Dict[str, UnknownType] is interchangeable with JSONType


#: Segments for video slots
segments: Optional[List[Dict[str, UnknownType]]] = None


@dataclass
class AnnotationFileVersion:
Expand Down Expand Up @@ -1155,3 +1161,21 @@ def validate(self) -> None:
raise ValueError("RasterLayer must be associated with at least one slot")
if not self.total_pixels and not self.total_pixels > 0:
raise ValueError("RasterLayer total_pixels cannot be empty")


@dataclass
class ManifestItem:
    """
    One line of a video frame manifest, describing a single frame of a segment.
    """

    # Frame number read from the manifest line; items of a segment are ordered by it.
    frame: int
    # Position of the frame counted across all segments; ``None`` until it is computed.
    absolute_frame: Optional[int]
    # Number of the segment the frame belongs to.
    segment: int
    # Whether the frame is marked as visible in the manifest.
    visibility: bool
    # Timestamp read from the manifest line (unit not stated here -- TODO confirm).
    timestamp: float
    # Running index of the frame among visible frames; ``None`` for non-visible frames.
    visible_frame: Optional[int]


@dataclass
class SegmentManifest:
    """
    The manifest items of one video segment of a slot.
    """

    # Name of the slot the segment belongs to.
    slot: str
    # Number of the segment.
    segment: int
    # Number of manifest items in the segment.
    total_frames: int
    # The manifest items of the segment.
    items: List[ManifestItem]
items: List[ManifestItem]
6 changes: 6 additions & 0 deletions darwin/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ class IncompatibleOptions(Exception):
"""


class MissingDependency(Exception):
    """
    Used when one of the optional install dependencies is missing.
    """


class UnrecognizableFileEncoding(Exception):
"""
Used when a we try to decode a file and all decoding algorithms fail.
Expand Down
2 changes: 2 additions & 0 deletions darwin/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ def _parse_darwin_slot(data: Dict[str, Any]) -> dt.Slot:
frame_urls=data.get("frame_urls"),
fps=data.get("fps"),
metadata=data.get("metadata"),
segments=data.get("segments", []),
frame_manifest=data.get("frame_manifests"),
)


Expand Down
Loading
Loading