Skip to content

Commit

Permalink
feat: enable upload history and duplication check (#497)
Browse files Browse the repository at this point in the history
* feat: enable upload history and duplication check

* write cluster_id in history

* fix types

* calculate checksum from original file md5sums

* fix tests

* use appdir for upload history

* add types back

* fix tests

* use del os.environ to unset envvars
  • Loading branch information
ptpt committed Jul 15, 2022
1 parent 44609af commit ee40cfa
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 109 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,11 @@ mapillary_tools upload_zip "path/to/zipped_images/"
class Uploader:
def __init__(self, user_items: UserItem, emitter: EventEmitter = None, dry_run=False): ...

def upload_zipfile(self, zip_path: str) -> Optional[int]: ...
def upload_zipfile(self, zip_path: str) -> Optional[str]: ...

def upload_blackvue(self, blackvue_path: str) -> Optional[int]: ...
def upload_blackvue(self, blackvue_path: str) -> Optional[str]: ...

def upload_images(self, descs: List[ImageDescriptionFile]) -> Dict[str, int]: ...
def upload_images(self, descs: List[ImageDescriptionFile]) -> Dict[str, str]: ...
```

#### Examples
Expand Down
4 changes: 4 additions & 0 deletions mapillary_tools/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

import appdirs

_ENV_PREFIX = "MAPILLARY_TOOLS_"

ANSI_BOLD = "\033[1m"
Expand All @@ -16,3 +18,5 @@
SAMPLED_VIDEO_FRAMES_FILENAME = os.getenv(
_ENV_PREFIX + "SAMPLED_VIDEO_FRAMES_FILENAME", "mapillary_sampled_video_frames"
)

USER_DATA_DIR = appdirs.user_data_dir(appname="mapillary_tools", appauthor="Mapillary")
24 changes: 17 additions & 7 deletions mapillary_tools/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,16 @@

LOG = logging.getLogger(__name__)
MAPILLARY_DISABLE_API_LOGGING = os.getenv("MAPILLARY_DISABLE_API_LOGGING")
MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN = os.getenv(
"MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN"
)
# Disable if it's set to empty
MAPILLARY_UPLOAD_HISTORY_PATH = os.getenv(
"MAPILLARY_UPLOAD_HISTORY_PATH",
# To enable it by default
# os.path.join(config.DEFAULT_MAPILLARY_FOLDER, "upload_history"),
os.path.join(
constants.USER_DATA_DIR,
"upload_history",
),
)


Expand Down Expand Up @@ -143,7 +149,7 @@ def _history_desc_path(md5sum: str) -> str:


def is_uploaded(md5sum: str) -> bool:
if MAPILLARY_UPLOAD_HISTORY_PATH is None:
if not MAPILLARY_UPLOAD_HISTORY_PATH:
return False
return os.path.isfile(_history_desc_path(md5sum))

Expand All @@ -154,12 +160,12 @@ def write_history(
summary: T.Dict,
descs: T.Optional[T.List[types.ImageDescriptionFile]] = None,
) -> None:
if MAPILLARY_UPLOAD_HISTORY_PATH is None:
if not MAPILLARY_UPLOAD_HISTORY_PATH:
return
path = _history_desc_path(md5sum)
LOG.debug(f"Writing upload history: {path}")
os.makedirs(os.path.dirname(path), exist_ok=True)
history: T.Dict = {
history: T.Dict[str, T.Any] = {
"params": params,
"summary": summary,
}
Expand Down Expand Up @@ -577,8 +583,12 @@ def upload(

# Setup the emitter -- the order matters here

enable_history = MAPILLARY_UPLOAD_HISTORY_PATH and (
not dry_run or MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN == "YES"
)

# Put it first one to cancel early
if not dry_run:
if enable_history:
_setup_cancel_due_to_duplication(emitter)

# This one set up tdqm
Expand Down Expand Up @@ -613,7 +623,7 @@ def upload(
else:
descs = []

if not dry_run:
if enable_history:
_setup_write_upload_history(emitter, params, descs)

mly_uploader = uploader.Uploader(user_items, emitter=emitter, dry_run=dry_run)
Expand Down
8 changes: 4 additions & 4 deletions mapillary_tools/upload_api_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def upload(
f"Upload server error: File handle not found in the upload response {resp.text}"
)

def finish(self, file_handle: str) -> int:
def finish(self, file_handle: str) -> str:
headers = {
"Authorization": f"OAuth {self.user_access_token}",
}
Expand Down Expand Up @@ -173,7 +173,7 @@ def finish(self, file_handle: str) -> int:
f"Upload server error: failed to create the cluster {resp.text}"
)

return T.cast(int, cluster_id)
return T.cast(str, cluster_id)


import random
Expand Down Expand Up @@ -219,8 +219,8 @@ def upload(
callback(chunk, None)
return self.session_key

def finish(self, _: str) -> int:
return 0
def finish(self, _: str) -> str:
return "0"

def fetch_offset(self) -> int:
if random.random() <= self._error_ratio:
Expand Down
58 changes: 38 additions & 20 deletions mapillary_tools/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class Progress(types.TypedDict, total=False):
# Path to the Zipfile/BlackVue
import_path: str

# Cluster ID after finishing the upload
cluster_id: str


class UploadCancelled(Exception):
pass
Expand Down Expand Up @@ -112,12 +115,13 @@ def __init__(

def upload_zipfile(
self, zip_path: str, event_payload: T.Optional[Progress] = None
) -> T.Optional[int]:
) -> T.Optional[str]:
with zipfile.ZipFile(zip_path) as ziph:
namelist = ziph.namelist()

if not namelist:
raise RuntimeError(f"The zip file {zip_path} is empty")
if not namelist:
LOG.warning(f"Skipping empty zipfile: %s", zip_path)
return None
upload_md5sum = _hash_zipfile(ziph)

if event_payload is None:
event_payload = {}
Expand All @@ -131,6 +135,7 @@ def upload_zipfile(
try:
return _upload_zipfile_fp(
fp,
upload_md5sum,
len(namelist),
self.user_items,
event_payload=T.cast(
Expand All @@ -144,7 +149,7 @@ def upload_zipfile(

def upload_blackvue(
self, blackvue_path: str, event_payload: T.Optional[Progress] = None
) -> T.Optional[int]:
) -> T.Optional[str]:
try:
return upload_blackvue(
blackvue_path,
Expand All @@ -158,10 +163,10 @@ def upload_blackvue(

def upload_images(
self, descs: T.List[types.ImageDescriptionFile]
) -> T.Dict[str, int]:
) -> T.Dict[str, str]:
_validate_descs(descs)
sequences = _group_sequences_by_uuid(descs)
ret: T.Dict[str, int] = {}
ret: T.Dict[str, str] = {}
for sequence_idx, (sequence_uuid, images) in enumerate(sequences.items()):
event_payload: Progress = {
"sequence_idx": sequence_idx,
Expand All @@ -170,10 +175,11 @@ def upload_images(
"sequence_uuid": sequence_uuid,
}
with tempfile.NamedTemporaryFile() as fp:
_zip_sequence_fp(images, fp)
upload_md5sum = _zip_sequence_fp(images, fp)
try:
cluster_id: T.Optional[int] = _upload_zipfile_fp(
cluster_id: T.Optional[str] = _upload_zipfile_fp(
fp,
upload_md5sum,
len(images),
self.user_items,
emitter=self.emitter,
Expand Down Expand Up @@ -218,16 +224,25 @@ def zip_images(
zip_dir, f"mly_tools_{sequence_uuid}.{os.getpid()}.wip"
)
with open(zip_filename_wip, "wb") as fp:
_zip_sequence_fp(sequence, fp)
upload_md5sum = utils.file_md5sum(zip_filename_wip)
upload_md5sum = _zip_sequence_fp(sequence, fp)
zip_filename = os.path.join(zip_dir, f"mly_tools_{upload_md5sum}.zip")
os.rename(zip_filename_wip, zip_filename)


# Instead of hashing the zip file content, we hash the filename list,
# because the zip content could be changed due to EXIF change
# (e.g. changes in MAPMetaTag in image description)
def _hash_zipfile(ziph: zipfile.ZipFile) -> str:
    """Return an md5 hex digest identifying the archive by its member names.

    Hashes the extension-less basenames of the entries rather than the raw
    archive bytes, because the zip content could be changed due to EXIF
    changes (e.g. changes in MAPMetaTag in the image description).
    """
    stems = []
    for name in ziph.namelist():
        stem, _ext = os.path.splitext(os.path.basename(name))
        stems.append(stem)
    return utils.md5sum_bytes("".join(stems).encode("utf-8"))


def _zip_sequence_fp(
sequence: T.Dict[str, types.ImageDescriptionFile],
fp: T.IO[bytes],
) -> None:
) -> str:
descs = list(sequence.values())
descs.sort(
key=lambda desc: (
Expand All @@ -238,34 +253,36 @@ def _zip_sequence_fp(
with zipfile.ZipFile(fp, "w", zipfile.ZIP_DEFLATED) as ziph:
for desc in descs:
edit = exif_write.ExifEdit(desc["filename"])
with open(desc["filename"], "rb") as fp:
md5sum = utils.md5sum_fp(fp)
exif_desc = desc_file_to_exif(desc)
edit.add_image_description(exif_desc)
image_bytes = edit.dump_image_bytes()
# To make sure the zip file is deterministic, i.e. zipping the same files results in the same content (same hashes),
# we use the md5 as the name, and a constant as the modification time
_, ext = os.path.split(desc["filename"])
arcname = f"{utils.md5sum_bytes(image_bytes)}{ext.lower()}"
_, ext = os.path.splitext(desc["filename"])
arcname = f"{md5sum}{ext.lower()}"
zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
ziph.writestr(zipinfo, image_bytes)

return _hash_zipfile(ziph)


def _upload_zipfile_fp(
fp: T.IO[bytes],
upload_md5sum: str,
image_count: int,
user_items: types.UserItem,
event_payload: T.Optional[Progress] = None,
emitter: T.Optional[EventEmitter] = None,
dry_run=False,
) -> int:
) -> str:
if event_payload is None:
event_payload = {
"sequence_idx": 0,
"total_sequence_count": 1,
}

fp.seek(0, io.SEEK_SET)
upload_md5sum = utils.md5sum_fp(fp)

fp.seek(0, io.SEEK_END)
entity_size = fp.tell()

Expand Down Expand Up @@ -308,7 +325,7 @@ def upload_blackvue(
event_payload: T.Optional[Progress] = None,
emitter: EventEmitter = None,
dry_run=False,
) -> int:
) -> str:
jsonschema.validate(instance=user_items, schema=types.UserItemSchema)

with open(blackvue_path, "rb") as fp:
Expand Down Expand Up @@ -391,7 +408,7 @@ def _upload_fp(
chunk_size: int,
event_payload: Progress = None,
emitter: EventEmitter = None,
) -> int:
) -> str:
retries = 0

if event_payload is None:
Expand Down Expand Up @@ -462,6 +479,7 @@ def _reset_retries(_, __):
raise upload_api_v4.wrap_http_exception(ex) from ex

if emitter:
mutable_payload["cluster_id"] = cluster_id
emitter.emit("upload_finished", mutable_payload)

return cluster_id
3 changes: 0 additions & 3 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ ignore_missing_imports = True
[mypy-pynmea2.*]
ignore_missing_imports = True

[mypy-tzwhere.*]
ignore_missing_imports = True

[mypy-gpxpy.*]
ignore_missing_imports = True

Expand Down
7 changes: 3 additions & 4 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
Pillow>=9.0.0,<10.0.0; python_version >= '3.7'
Pillow>=8.0.0,<9.0.0; python_version < '3.7'
Pillow>=9.0.0,<10.0.0; python_version>='3.7'
Pillow>=8.0.0,<9.0.0; python_version<'3.7'
pytest
black
mypy
pyinstaller
flake8
types-python-dateutil
types-pytz
types-requests
types-appdirs
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
appdirs>=1.4.4,<2.0.0
exifread==2.3.2
piexif==1.1.3
gpxpy==1.4.2
Expand Down
13 changes: 4 additions & 9 deletions tests/integration/test_gopro.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
import py.path

from .test_process import setup_config, EXECUTABLE, is_ffmpeg_installed
from .test_process import setup_config, setup_upload, EXECUTABLE, is_ffmpeg_installed


IMPORT_PATH = "tests/integration/mapillary_tools_process_images_provider/gopro_data"
Expand Down Expand Up @@ -91,13 +91,12 @@ def setup_data(tmpdir: py.path.local):


def test_process_gopro_hero8(
tmpdir: py.path.local, setup_data: py.path.local, setup_config: py.path.local
setup_data: py.path.local,
setup_config: py.path.local,
setup_upload: py.path.local,
):
if not is_ffmpeg_installed:
pytest.skip("skip because ffmpeg not installed")
os.environ["MAPILLARY_CONFIG_PATH"] = str(setup_config)
upload_dir = tmpdir.mkdir("mapillary_public_uploads")
os.environ["MAPILLARY_UPLOAD_PATH"] = str(upload_dir)
video_path = setup_data.join("hero8.mp4")
x = subprocess.run(
f"{EXECUTABLE} video_process --geotag_source=gopro_videos {str(video_path)} --interpolation_use_gpx_start_time",
Expand All @@ -109,7 +108,6 @@ def test_process_gopro_hero8(
)
assert desc_path.exists()
with open(desc_path) as fp:
# print(fp.read())
descs = json.load(fp)
for expected, actual in zip(expected_descs, descs):
assert abs(expected["MAPLatitude"] - actual["MAPLatitude"]) < 0.0001
Expand All @@ -131,7 +129,4 @@ def test_process_gopro_hero8(
< 0.0001
)
assert expected["filename"] == actual["filename"]
assert {
"strings": [{"key": "mapillary_tools_version", "value": "0.8.2"}]
} == actual["MAPMetaTags"]
assert "MAPSequenceUUID" in actual

0 comments on commit ee40cfa

Please sign in to comment.