Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IO-1444] LongVideo Support #642

Merged
merged 17 commits into from
Aug 17, 2023
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,5 @@ scripts/

.ruff_cache/

!darwin/future/tests/data_objects/workflow/data
!darwin/future/tests/data_objects/workflow/data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always like a data-based test

!tests/darwin/dataset/data
167 changes: 156 additions & 11 deletions darwin/dataset/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import time
import urllib
from pathlib import Path
from typing import Any, Callable, Iterable, List, Optional, Tuple
from shutil import rmtree
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

import deprecation
import numpy as np
import orjson as json
import requests
from PIL import Image
from requests.adapters import HTTPAdapter, Retry
from rich.console import Console

import darwin.datatypes as dt
Expand Down Expand Up @@ -233,13 +235,12 @@ def lazy_download_image_from_annotation(
If the format of the annotation is not supported.
"""

console = Console()

if annotation_format == "json":
return _download_image_from_json_annotation(
api_key, annotation_path, images_path, use_folders, video_frames, force_slots, ignore_slots
)
else:
console = Console()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but I feel like we need a Logging style console getter/factory to maintain the console(s) in use.

console.print("[bold red]Unsupported file format. Please use 'json'.")
raise NotImplementedError

Expand Down Expand Up @@ -278,18 +279,33 @@ def _download_image_from_json_annotation(
return []


def _download_all_slots_from_json_annotation(annotation, api_key, parent_path, video_frames):
def _download_all_slots_from_json_annotation(
annotation: dt.AnnotationFile, api_key: str, parent_path: Path, video_frames: bool
) -> Iterable[Callable[[], None]]:
generator = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your code, but this feels like a name we should change, given that it's a list, and really not even acting as a generator. At best it could feed a generator's output.

Anyway, I digress slightly...

for slot in annotation.slots:
if not slot.name:
raise ValueError("Slot name is required to download all slots")
slot_path = parent_path / sanitize_filename(annotation.filename) / sanitize_filename(slot.name)
slot_path.mkdir(exist_ok=True, parents=True)

if video_frames and slot.type != "image":
video_path: Path = slot_path / "sections"
video_path.mkdir(exist_ok=True, parents=True)
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))
if not slot.frame_urls:
segment_manifests = get_segment_manifests(slot, slot_path, api_key)
for index, manifest in enumerate(segment_manifests):
if slot.segments is None:
raise ValueError("No segments found")
segment_url = slot.segments[index]["url"]
path = video_path / f".{index:07d}.ts"
generator.append(
functools.partial(_download_and_extract_video_segment, segment_url, api_key, path, manifest)
)
else:
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this takes a while?

else:
for upload in slot.source_files:
file_path = slot_path / sanitize_filename(upload["file_name"])
Expand All @@ -306,16 +322,29 @@ def _download_single_slot_from_json_annotation(
annotation_path: Path,
video_frames: bool,
use_folders: bool = False,
):
) -> Iterable[Callable[[], None]]:
slot = annotation.slots[0]
generator = []

if video_frames and slot.type != "image":
video_path: Path = parent_path / annotation_path.stem
video_path.mkdir(exist_ok=True, parents=True)
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))

# Indicates it's a long video and uses the segment and manifest
if not slot.frame_urls:
segment_manifests = get_segment_manifests(slot, video_path, api_key)
for index, manifest in enumerate(segment_manifests):
if slot.segments is None:
raise ValueError("No segments found")
segment_url = slot.segments[index]["url"]
path = video_path / f".{index:07d}.ts"
generator.append(
functools.partial(_download_and_extract_video_segment, segment_url, api_key, path, manifest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes too long right now. Running `poetry run python -m darwin.cli dataset pull --video-frames` reports `23.98s user 6.25s system 201% cpu 14.995 total`: 24 seconds for 76 frames on an M2 CPU.

What if we have at least a few thousand frames? Maybe at least show an ETA with a proper progress bar?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retested. For identical videos, "old" video download is 1 second and new way of download is 16 seconds on 39 frame video. It should not be that bad.

)
else:
for i, frame_url in enumerate(slot.frame_urls):
path = video_path / f"{i:07d}.png"
generator.append(functools.partial(_download_image, frame_url, path, api_key, slot))
else:
if len(slot.source_files) > 0:
image = slot.source_files[0]
Expand Down Expand Up @@ -525,3 +554,119 @@ def _rg16_to_grayscale(path):

new_image = Image.fromarray(np.uint8(image_2d_gray), mode="L")
new_image.save(path)


def _download_and_extract_video_segment(url: str, api_key: str, path: Path, manifest: dt.SegmentManifest) -> None:
    """
    Downloads a single video segment, extracts its frames and deletes the segment file.

    Parameters
    ----------
    url : str
        URL of the video segment to download.
    api_key : str
        API key forwarded to the segment download.
    path : Path
        Location the segment file is temporarily written to.
    manifest : dt.SegmentManifest
        Manifest forwarded to the frame extraction step.
    """
    try:
        _download_video_segment_file(url, api_key, path)
        _extract_frames_from_segment(path, manifest)
    finally:
        # Always clean up the temporary segment, even when the download or the
        # extraction fails, so partial/stale segment files are not left behind.
        # The existence check avoids masking the original error when the
        # download failed before the file was ever created.
        if path.exists():
            path.unlink()


def _extract_frames_from_segment(path: Path, manifest: dt.SegmentManifest) -> None:
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've surprised myself, because I like this being in function scope, so that it raises on function call, not file call. I expected I would feel like it should be in header, but I like this.

import cv2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dependency needs to be added to the README with some explanation. Right now, I managed to install it only by upping the Python version requirement and hardcoding the numpy version.

Also,

$ pip install "darwin[ocv]"
ERROR: Could not find a version that satisfies the requirement darwin[ocv] (from versions: none)
ERROR: No matching distribution found for darwin[ocv]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the pip install message is only relevant after this PR gets deployed as it needs to be packaged for pip

except ImportError:
raise Exception("OpenCV is required to extract video frames. Please install with pip install darwin\[ocv]")
cap = cv2.VideoCapture(str(path))

# Read and save frames. Iterates over every frame because frame seeking in OCV is not reliable or guaranteed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can only ever achieve half frame rate, essentially? Why is all video software rubbish? (I know the answer, but still!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turns out that get frame count wasn't guaranteed either, had to do a while loop on the read function instead

frames_to_extract = [item.frame for item in manifest.items if item.visibility]
frame_index = 0
while cap.isOpened():
success, frame = cap.read()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a shame that OpenCV hasn't made this a context manager. I may put in a PR for one.

if frame is None:
break
if not success:
raise Exception(f"Failed to read frame {frame_index} from video segment {path}")
if frame_index in frames_to_extract:
frames_to_extract.remove(frame_index)
frame_path = path.parent / f"{frame_index:07d}.png"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth noting that this way we persist original frame numbers in filenames, while our existing export downloads are sequential (0-1-2-3-4, not 5-10-15-20).

cv2.imwrite(str(frame_path), frame)
if not frames_to_extract:
break
frame_index += 1
cap.release()


def _download_video_segment_file(url: str, api_key: str, path: Path) -> None:
    """
    Downloads the file at ``url`` and writes its bytes to ``path``.

    Parameters
    ----------
    url : str
        URL to download. When it contains ``"token"`` no ``Authorization`` header is sent.
    api_key : str
        API key sent as ``Authorization: ApiKey <api_key>`` when the URL carries no token.
    path : Path
        Destination file; it is opened in binary write mode.

    Raises
    ------
    Exception
        If the response status is not OK (any 4xx or 5xx status).
    """
    # Auth goes in per-request headers so the session's default headers are
    # kept instead of being replaced wholesale.
    headers = {} if "token" in url else {"Authorization": f"ApiKey {api_key}"}
    chunk_size = 1024 * 1024

    with requests.Session() as session:
        retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
        session.mount("https://", HTTPAdapter(max_retries=retries))
        # Stream the body so the segment is written in large chunks rather than
        # buffered fully in memory and then iterated 128 bytes at a time.
        with session.get(url, headers=headers, stream=True) as response:
            # ``response.ok`` is already False for every 4xx and 5xx status.
            if not response.ok:
                raise Exception(
                    f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
                )
            with open(str(path), "wb") as file:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        file.write(chunk)


def download_manifest_txts(urls: List[str], api_key: str, folder: Path) -> List[Path]:
paths = []
with requests.Session() as session:
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
session.mount("https://", HTTPAdapter(max_retries=retries))
for index, url in enumerate(urls):
if "token" in url:
response = session.get(url)
else:
session.headers = {"Authorization": f"ApiKey {api_key}"}
response = session.get(url)
if not response.ok or (400 <= response.status_code <= 499):
raise Exception(
f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
)
if not response.content:
raise Exception(f"Manifest file ({url}) is empty.")
path = folder / f"manifest_{index + 1}.txt"
with open(str(path), "wb") as file:
file.write(response.content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like the bubbled exceptions from these might be a bit non-specific if writes fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah true, we can work that out in post though

paths.append(path)
return paths


def get_segment_manifests(slot: dt.Slot, parent_path: Path, api_key: str) -> List[dt.SegmentManifest]:
temp_dir = parent_path / "temp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you avoid TemporaryDirectory for xplat reasons?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just always hesitant with temps and supporting other platforms

temp_dir.mkdir(exist_ok=True, parents=True)
if slot.frame_manifest is None:
raise ValueError("No frame manifest found")
frame_urls = [item["url"] for item in slot.frame_manifest]
manifest_paths = download_manifest_txts(frame_urls, api_key, temp_dir)
segment_manifests = _parse_manifests(manifest_paths, slot.name or "0")
rmtree(temp_dir)
Nathanjp91 marked this conversation as resolved.
Show resolved Hide resolved
return segment_manifests


def _parse_manifests(paths: List[Path], slot: str) -> List[dt.SegmentManifest]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is the hardest to read, but I don't think I'd change it, because the obstacle is really domain knowledge I think.

all_manifests: Dict[int, List[dt.ManifestItem]] = {}
for path in paths:
with open(path) as infile:
for line in infile:
frame, segment_str, visibility, timestamp = line.strip("\n").split(":")
segment_int = int(segment_str)
if segment_int not in all_manifests:
all_manifests[segment_int] = []
all_manifests[segment_int].append(
dt.ManifestItem(int(frame), None, segment_int, bool(int(visibility)), float(timestamp))
)
# Create a list of segments, sorted by segment number and all items sorted by frame number
segments = []
for segment_int, seg_manifests in all_manifests.items():
seg_manifests.sort(key=lambda x: x.frame)
segments.append(
dt.SegmentManifest(slot=slot, segment=segment_int, total_frames=len(seg_manifests), items=seg_manifests)
)

# Calculate the absolute frame number for each item, as manifests are per segment
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not strictly required for this approach as we do it per segment anyway, but if we do ever want to try a full file download approach then it's mapped back from the manifest

absolute_frame = 0
for segment in segments:
for item in segment.items:
item.absolute_frame = absolute_frame
absolute_frame += 1
return segments
23 changes: 23 additions & 0 deletions darwin/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ class Slot:
#: Metadata of the slot
metadata: Optional[Dict[str, UnknownType]] = None

#: Frame Manifest for video slots
frame_manifest: Optional[List[Dict[str, UnknownType]]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely optional change, but Dict[str, UnknownType] is interchangable with JSONType


#: Segments for video slots
segments: Optional[List[Dict[str, UnknownType]]] = None


@dataclass
class AnnotationFileVersion:
Expand Down Expand Up @@ -1155,3 +1161,20 @@ def validate(self) -> None:
raise ValueError("RasterLayer must be associated with at least one slot")
if not self.total_pixels and not self.total_pixels > 0:
raise ValueError("RasterLayer total_pixels cannot be empty")


@dataclass
class ManifestItem:
    """
    A single frame entry of a video segment manifest.
    """

    #: Frame number as listed in the manifest
    frame: int

    #: Running frame index across all segments; ``None`` until it has been computed
    absolute_frame: Optional[int]

    #: Number of the segment this frame belongs to
    segment: int

    #: Visibility flag of the frame as listed in the manifest
    visibility: bool

    #: Timestamp of the frame as listed in the manifest
    timestamp: float


@dataclass
class SegmentManifest:
    """
    The manifest entries belonging to one video segment of a slot.
    """

    #: Name of the slot the segment belongs to
    slot: str

    #: Number of the segment
    segment: int

    #: Number of manifest items in the segment
    total_frames: int

    #: Manifest items of the segment
    items: List[ManifestItem]
2 changes: 2 additions & 0 deletions darwin/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ def _parse_darwin_slot(data: Dict[str, Any]) -> dt.Slot:
frame_urls=data.get("frame_urls"),
fps=data.get("fps"),
metadata=data.get("metadata"),
segments=data.get("segments", []),
frame_manifest=data.get("frame_manifests"),
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
0:0:0:0
1:0:1:1.0
0:1:0:2.0
1:1:1:3.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
0:2:0:0
1:2:1:1.0
0:3:0:2.0
1:3:1:3.0
69 changes: 68 additions & 1 deletion tests/darwin/dataset/download_manager_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,80 @@
from pathlib import Path
from typing import List
from unittest.mock import MagicMock, patch

import pytest
import responses
from requests import get

from darwin.client import Client
from darwin.config import Config
from darwin.dataset import RemoteDataset
from darwin.dataset import download_manager as dm
from darwin.dataset.identifier import DatasetIdentifier
from darwin.dataset.remote_dataset_v1 import RemoteDatasetV1
from darwin.datatypes import Slot
from tests.fixtures import *


@pytest.fixture
def manifest_paths() -> List[Path]:
    """Paths of the two example manifest files used by the manifest tests."""
    first_manifest = Path("tests/darwin/dataset/data/manifest_examples/manifest_1.txt.test")
    second_manifest = Path("tests/darwin/dataset/data/manifest_examples/manifest_2.txt.test")
    return [first_manifest, second_manifest]


@pytest.fixture
def slot_w_manifests() -> Slot:
    """A video slot without source files that points at two frame-manifest URLs."""
    manifest_entries = [{"url": "http://test.com"}, {"url": "http://test2.com"}]
    return Slot(name="test_slot", type="video", source_files=[], frame_manifest=manifest_entries)


def test_parse_manifests(manifest_paths: List[Path]) -> None:
segment_manifests = dm._parse_manifests(manifest_paths, "0")
assert len(segment_manifests) == 4
Copy link
Contributor Author

@Nathanjp91 Nathanjp91 Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These long assert chains brought to you by copilot

assert len(segment_manifests[0].items) == 2
assert len(segment_manifests[1].items) == 2
assert len(segment_manifests[2].items) == 2
assert len(segment_manifests[3].items) == 2
assert segment_manifests[0].items[0].absolute_frame == 0
assert segment_manifests[0].items[1].absolute_frame == 1
assert segment_manifests[0].items[1].visibility == True
assert segment_manifests[1].items[0].absolute_frame == 2
assert segment_manifests[1].items[1].absolute_frame == 3
assert segment_manifests[1].items[1].visibility == True
assert segment_manifests[2].items[0].absolute_frame == 4
assert segment_manifests[2].items[1].absolute_frame == 5
assert segment_manifests[2].items[1].visibility == True
assert segment_manifests[3].items[0].absolute_frame == 6
assert segment_manifests[3].items[1].absolute_frame == 7
assert segment_manifests[3].items[1].visibility == True


def test_get_segment_manifests(manifest_paths: List[Path], slot_w_manifests: Slot) -> None:
    """
    Serves the example manifests over mocked HTTP and checks that
    ``get_segment_manifests`` returns four segments of two items each, with
    consecutive absolute frame numbers and the second item of each segment visible.
    """
    parent_path = Path("tests/darwin/dataset/data/manifest_examples")
    # read_text() closes each file, unlike a bare open(...).read().
    files = [path.read_text() for path in manifest_paths]
    with responses.RequestsMock() as rsps:
        rsps.add(responses.GET, "http://test.com", body=files[0])
        rsps.add(responses.GET, "http://test2.com", body=files[1])
        segment_manifests = dm.get_segment_manifests(slot_w_manifests, parent_path, "")
        assert len(segment_manifests) == 4
        for index, segment in enumerate(segment_manifests):
            assert len(segment.items) == 2
            # Absolute frame numbers run consecutively across segments: 0,1 | 2,3 | 4,5 | 6,7
            assert segment.items[0].absolute_frame == 2 * index
            assert segment.items[1].absolute_frame == 2 * index + 1
            assert segment.items[1].visibility is True
Loading