From d767a6ba3fc2a9586098224245cf08ee97c7134f Mon Sep 17 00:00:00 2001 From: sk-zk Date: Mon, 29 Apr 2024 16:05:45 +0200 Subject: [PATCH 1/4] Update readme --- README.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 017fb60..3231cbb 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ # streetlevel -**streetlevel** is a library for downloading panoramas and metadata from street-level imagery services including Google Street View, Apple Look Around, and several others. +**streetlevel** is a library for downloading panoramas and metadata from street-level imagery services +such as Google Street View, Apple Look Around, and several others. It provides a high-level abstraction +over the internal APIs of the supported services – this means that no API keys are required, but the +library may break unexpectedly. -Since it relies on calls to internal APIs, it may break unexpectedly. - -(Nearly) all functions are available as both a sync function using `requests` or an async function using `aiohttp`, requiring a `ClientSession`. +(Nearly) all functions are available as either a sync function using `requests` or an async function +using `aiohttp`, requiring a `ClientSession`. ## Installation ```sh From 0320746f749aed0d48c137d99ad58a9201deabbf Mon Sep 17 00:00:00 2001 From: sk-zk Date: Mon, 29 Apr 2024 16:06:15 +0200 Subject: [PATCH 2/4] Refactor parsing code --- streetlevel/ja/ja.py | 93 ++------ streetlevel/ja/parse.py | 71 ++++++ streetlevel/kakao/kakao.py | 61 +---- streetlevel/kakao/parse.py | 51 +++++ streetlevel/lookaround/lookaround.py | 62 +---- streetlevel/lookaround/parse.py | 60 +++++ streetlevel/lookaround/reproject.py | 2 +- streetlevel/mapy/mapy.py | 134 +++-------- streetlevel/mapy/parse.py | 75 ++++++ streetlevel/naver/naver.py | 122 +--------- streetlevel/naver/parse.py | 105 +++++++++ streetlevel/streetside/parse.py | 46 ++++ streetlevel/streetside/streetside.py | 55 +---- streetlevel/streetview/parse.py | 269 ++++++++++++++++++++++ streetlevel/streetview/streetview.py | 326 ++------------------------- streetlevel/yandex/parse.py | 161 +++++++++++++ streetlevel/yandex/yandex.py | 194 ++-------------- tests/lookaround/lookaround_test.py | 19 +- tests/mapy/mapy_test.py | 13 +- tests/streetside/streetside_test.py | 13 +- tests/streetview/streetview_test.py | 102 +++------ tests/yandex/yandex_test.py | 16 +- 22 files changed, 1001 insertions(+), 1049 deletions(-) create mode 100644 streetlevel/ja/parse.py create mode 100644 streetlevel/kakao/parse.py create mode 100644 streetlevel/lookaround/parse.py create mode 100644 streetlevel/mapy/parse.py create mode 100644 streetlevel/naver/parse.py create mode 100644 streetlevel/streetside/parse.py create mode 100644 streetlevel/streetview/parse.py create mode 100644 streetlevel/yandex/parse.py diff --git a/streetlevel/ja/ja.py b/streetlevel/ja/ja.py index 8b08b9e..b232c1c 100644 --- a/streetlevel/ja/ja.py +++ b/streetlevel/ja/ja.py @@ -1,4 +1,3 @@ -import math from typing import Optional, List, Tuple, Union from PIL import Image @@ -6,10 +5,11 @@ from requests import Session from . import api -from .panorama import JaPanorama, CaptureDate, Address, StreetLabel +from .panorama import JaPanorama +from .parse import parse_panorama_radius_response, parse_panorama_id_response from ..dataclasses import Tile from ..util import download_tiles, download_tiles_async, CubemapStitchingMethod, stitch_cubemap_faces, \ - save_cubemap_panorama, stitch_cubemap_face, try_get + save_cubemap_panorama, stitch_cubemap_face def find_panorama(lat: float, lon: float, radius: int = 100, session: Session = None) -> Optional[JaPanorama]: @@ -23,21 +23,13 @@ def find_panorama(lat: float, lon: float, radius: int = 100, session: Session = :return: A JaPanorama if a panorama was found, or None. """ response = api.find_panorama(lat, lon, radius, session) - - if "message" in response: - return None - - return _parse_panorama(response) + return parse_panorama_radius_response(response) async def find_panorama_async(lat: float, lon: float, session: ClientSession, - radius: int = 100) -> Optional[JaPanorama]: + radius: int = 100) -> Optional[JaPanorama]: response = await api.find_panorama_async(lat, lon, session, radius) - - if "message" in response: - return None - - return _parse_panorama(response) + return parse_panorama_radius_response(response) def find_panorama_by_id(panoid: int, session: Session = None) -> Optional[JaPanorama]: @@ -49,12 +41,12 @@ def find_panorama_by_id(panoid: int, session: Session = None) -> Optional[JaPano :return: A JaPanorama object if a panorama with this ID was found, or None. """ response = api.find_panorama_by_id(panoid, session) - return _parse_panorama_by_id(response) + return parse_panorama_id_response(response) async def find_panorama_by_id_async(panoid: int, session: ClientSession) -> Optional[JaPanorama]: response = await api.find_panorama_by_id_async(panoid, session) - return _parse_panorama_by_id(response) + return parse_panorama_id_response(response) def get_panorama(pano: JaPanorama, zoom: int = 0, @@ -69,7 +61,7 @@ def get_panorama(pano: JaPanorama, zoom: int = 0, image. Defaults to ``ROW``. :return: A PIL image or a list of six PIL images depending on ``stitching_method``. """ - zoom = min(1, max(zoom, 0)) + zoom = _validate_get_panorama_params(pano, zoom) face_tiles, cols, rows = _generate_tile_list(pano, zoom) tile_images = _download_tiles(face_tiles) return _stitch_panorama(tile_images, cols, rows, stitching_method=stitching_method) @@ -78,7 +70,7 @@ def get_panorama(pano: JaPanorama, zoom: int = 0, async def get_panorama_async(pano: JaPanorama, session: ClientSession, zoom: int = 0, stitching_method: CubemapStitchingMethod = CubemapStitchingMethod.ROW) \ -> Union[List[Image.Image], Image.Image]: - zoom = min(1, max(zoom, 0)) + zoom = _validate_get_panorama_params(pano, zoom) face_tiles, cols, rows = _generate_tile_list(pano, zoom) tile_images = await _download_tiles_async(face_tiles, session) return _stitch_panorama(tile_images, cols, rows, stitching_method=stitching_method) @@ -116,66 +108,11 @@ async def download_panorama_async(pano: JaPanorama, path: str, session: ClientSe save_cubemap_panorama(output, path, pil_args) -def _parse_panorama_by_id(pano_dict: dict) -> JaPanorama: - address = try_get(lambda: pano_dict["streets"]["nearestAddress"]) - if address: - address = Address(*address.values()) - - return JaPanorama( - id=pano_dict["image"]["id"], - lat=pano_dict["image"]["lat"], - lon=pano_dict["image"]["lng"], - heading=math.radians(pano_dict["image"]["heading"]), - date=_parse_date(pano_dict["image"]["month"]), - pano_url="https:" + pano_dict["image"]["pano_url"], - blur_key=pano_dict["image"]["blur_key"], - street_names=_parse_streets(pano_dict["streets"]), - address=address, - neighbors=_parse_hotspots(pano_dict["hotspots"]), - ) - - -def _parse_streets(streets: dict) -> List[StreetLabel]: - main = StreetLabel(name=streets["street"]["name"], - angles=[math.radians(a) for a in streets["street"]["azimuths"]]) - connections = [] - for connection_dict in streets["connections"]: - connection = StreetLabel(name=connection_dict["name"], - angles=[math.radians(connection_dict["angle"])], - distance=connection_dict["distance"]) - connections.append(connection) - - return [main] + connections - - -def _parse_hotspots(hotspots: list) -> List[JaPanorama]: - neighbors = [] - for hotspot in hotspots: - neighbors.append(JaPanorama( - id=hotspot["image"]["id"], - lat=hotspot["image"]["lat"], - lon=hotspot["image"]["lng"], - heading=math.radians(hotspot["image"]["heading"]), - date=_parse_date(hotspot["image"]["month"]), - pano_url="https:" + hotspot["image"]["pano_url"], - blur_key=hotspot["image"]["blur_key"], - )) - return neighbors - - -def _parse_date(date_str: str) -> CaptureDate: - year, month = date_str.split("-") - date = CaptureDate(int(year), int(month)) - return date - - -def _parse_panorama(pano_dict: dict) -> JaPanorama: - return JaPanorama( - id=pano_dict["id"], - lat=pano_dict["lat"], - lon=pano_dict["lng"], - heading=math.radians(pano_dict["image_heading"]), - ) +def _validate_get_panorama_params(pano, zoom): + if not pano.pano_url: + raise ValueError("pano_url is None; please call find_panorama_by_id to fetch this info") + zoom = min(1, max(zoom, 0)) + return zoom def _generate_tile_list(pano: JaPanorama, zoom: int) -> Tuple[List[List[Tile]], int, int]: diff --git a/streetlevel/ja/parse.py b/streetlevel/ja/parse.py new file mode 100644 index 0000000..019b2ce --- /dev/null +++ b/streetlevel/ja/parse.py @@ -0,0 +1,71 @@ +import math +from typing import List, Optional + +from streetlevel.ja.panorama import JaPanorama, Address, StreetLabel, CaptureDate +from streetlevel.util import try_get + + +def parse_panorama_radius_response(pano_dict: dict) -> Optional[JaPanorama]: + if "message" in pano_dict: + return None + + return JaPanorama( + id=pano_dict["id"], + lat=pano_dict["lat"], + lon=pano_dict["lng"], + heading=math.radians(pano_dict["image_heading"]), + ) + + +def parse_panorama_id_response(pano_dict: dict) -> JaPanorama: + address = try_get(lambda: pano_dict["streets"]["nearestAddress"]) + if address: + address = Address(*address.values()) + + return JaPanorama( + id=pano_dict["image"]["id"], + lat=pano_dict["image"]["lat"], + lon=pano_dict["image"]["lng"], + heading=math.radians(pano_dict["image"]["heading"]), + date=_parse_date(pano_dict["image"]["month"]), + pano_url="https:" + pano_dict["image"]["pano_url"], + blur_key=pano_dict["image"]["blur_key"], + street_names=_parse_streets(pano_dict["streets"]), + address=address, + neighbors=_parse_hotspots(pano_dict["hotspots"]), + ) + + +def _parse_streets(streets: dict) -> List[StreetLabel]: + main = StreetLabel(name=streets["street"]["name"], + angles=[math.radians(a) for a in streets["street"]["azimuths"]]) + connections = [] + for connection_dict in streets["connections"]: + connection = StreetLabel(name=connection_dict["name"], + angles=[math.radians(connection_dict["angle"])], + distance=connection_dict["distance"]) + connections.append(connection) + + return [main] + connections + + +def _parse_hotspots(hotspots: list) -> List[JaPanorama]: + neighbors = [] + for hotspot in hotspots: + neighbors.append(JaPanorama( + id=hotspot["image"]["id"], + lat=hotspot["image"]["lat"], + lon=hotspot["image"]["lng"], + heading=math.radians(hotspot["image"]["heading"]), + date=_parse_date(hotspot["image"]["month"]), + pano_url="https:" + hotspot["image"]["pano_url"], + blur_key=hotspot["image"]["blur_key"], + )) + return neighbors + + +def _parse_date(date_str: str) -> CaptureDate: + year, month = date_str.split("-") + date = CaptureDate(int(year), int(month)) + return date + diff --git a/streetlevel/kakao/kakao.py b/streetlevel/kakao/kakao.py index cd7ef74..36e6180 100644 --- a/streetlevel/kakao/kakao.py +++ b/streetlevel/kakao/kakao.py @@ -1,7 +1,5 @@ import itertools -import math from typing import List, Optional -from datetime import datetime import requests from PIL import Image @@ -9,9 +7,10 @@ from requests import Session from . import api -from .panorama import KakaoPanorama, PanoramaType -from ..dataclasses import Tile, Size, Link -from ..util import try_get, get_equirectangular_panorama, get_equirectangular_panorama_async, get_image, \ +from .panorama import KakaoPanorama +from .parse import parse_panoramas, parse_panorama +from ..dataclasses import Tile, Size +from ..util import get_equirectangular_panorama, get_equirectangular_panorama_async, get_image, \ get_image_async, download_file, download_file_async PANO_COLS = [1, 8, 16] @@ -43,7 +42,7 @@ def find_panoramas(lat: float, lon: float, radius: int = 35, if response["street_view"]["cnt"] == 0: return [] - return _parse_panoramas(response) + return parse_panoramas(response) async def find_panoramas_async(lat: float, lon: float, session: ClientSession, @@ -53,7 +52,7 @@ async def find_panoramas_async(lat: float, lon: float, session: ClientSession, if response["street_view"]["cnt"] == 0: return [] - return _parse_panoramas(response) + return parse_panoramas(response) def find_panorama_by_id(panoid: int, neighbors: bool = True, session: Session = None) -> Optional[KakaoPanorama]: @@ -74,7 +73,7 @@ def find_panorama_by_id(panoid: int, neighbors: bool = True, session: Session = if response["street_view"]["cnt"] == 0: return None - pano = _parse_panorama(response["street_view"]["street"]) + pano = parse_panorama(response["street_view"]["street"]) if neighbors: pano.neighbors = find_panoramas(pano.lat, pano.lon, session=session) return pano @@ -87,7 +86,7 @@ async def find_panorama_by_id_async(panoid: int, session: ClientSession, if response["street_view"]["cnt"] == 0: return None - pano = _parse_panorama(response["street_view"]["street"]) + pano = parse_panorama(response["street_view"]["street"]) if neighbors: pano.neighbors = await find_panoramas_async(pano.lat, pano.lon, session) return pano @@ -187,50 +186,6 @@ def _build_depthmap_url(pano): f"{pano.image_path}_W.png" -def _parse_panoramas(response): - return [_parse_panorama(pano) for pano in response["street_view"]["streetList"]] - - -def _parse_panorama(pano_json: dict) -> KakaoPanorama: - pano = KakaoPanorama( - id=pano_json["id"], - lat=pano_json["wgsy"], - lon=pano_json["wgsx"], - wcongx=pano_json["wcongx"], - wcongy=pano_json["wcongy"], - heading=math.radians(float(pano_json["angle"])), - image_path=pano_json["img_path"], - # shot_date sometimes returns the time as 00:00:00, but the image url is always correct - date=datetime.strptime(pano_json["img_path"].split("_")[-1], "%Y%m%d%H%M%S"), - street_name=try_get(lambda: pano_json["st_name"]), - address=try_get(lambda: pano_json["addr"]), - street_type=try_get(lambda: pano_json["st_type"]), - panorama_type=PanoramaType(int(pano_json["shot_tool"])) - ) - - if "past" in pano_json and pano_json["past"] is not None: - pano.historical = [_parse_panorama(past) for past in pano_json["past"]] - - if "spot" in pano_json and pano_json["past"] is not None: - pano.links = _parse_links(pano_json["spot"]) - - return pano - - -def _parse_links(links_json: List[dict]) -> List[Link]: - links = [] - for linked_json in links_json: - linked = KakaoPanorama( - id=linked_json["id"], - lat=linked_json["wgsy"], - lon=linked_json["wgsx"], - street_name=try_get(lambda: linked_json["st_name"]), - ) - angle = math.radians(float(linked_json["pan"])) - links.append(Link(linked, angle)) - return links - - def _generate_tile_list(pano: KakaoPanorama, zoom: int) -> List[Tile]: """ Generates a list of a panorama's tiles and the URLs pointing to them. diff --git a/streetlevel/kakao/parse.py b/streetlevel/kakao/parse.py new file mode 100644 index 0000000..b9580fa --- /dev/null +++ b/streetlevel/kakao/parse.py @@ -0,0 +1,51 @@ +import math +from datetime import datetime +from typing import List + +from streetlevel.dataclasses import Link +from streetlevel.kakao.panorama import KakaoPanorama, PanoramaType +from streetlevel.util import try_get + + +def parse_panoramas(response): + return [parse_panorama(pano) for pano in response["street_view"]["streetList"]] + + +def parse_panorama(pano_json: dict) -> KakaoPanorama: + pano = KakaoPanorama( + id=pano_json["id"], + lat=pano_json["wgsy"], + lon=pano_json["wgsx"], + wcongx=pano_json["wcongx"], + wcongy=pano_json["wcongy"], + heading=math.radians(float(pano_json["angle"])), + image_path=pano_json["img_path"], + # shot_date sometimes returns the time as 00:00:00, but the image url is always correct + date=datetime.strptime(pano_json["img_path"].split("_")[-1], "%Y%m%d%H%M%S"), + street_name=try_get(lambda: pano_json["st_name"]), + address=try_get(lambda: pano_json["addr"]), + street_type=try_get(lambda: pano_json["st_type"]), + panorama_type=PanoramaType(int(pano_json["shot_tool"])) + ) + + if "past" in pano_json and pano_json["past"] is not None: + pano.historical = [parse_panorama(past) for past in pano_json["past"]] + + if "spot" in pano_json and pano_json["past"] is not None: + pano.links = _parse_links(pano_json["spot"]) + + return pano + + +def _parse_links(links_json: List[dict]) -> List[Link]: + links = [] + for linked_json in links_json: + linked = KakaoPanorama( + id=linked_json["id"], + lat=linked_json["wgsy"], + lon=linked_json["wgsx"], + street_name=try_get(lambda: linked_json["st_name"]), + ) + angle = math.radians(float(linked_json["pan"])) + links.append(Link(linked, angle)) + return links diff --git a/streetlevel/lookaround/lookaround.py b/streetlevel/lookaround/lookaround.py index 75f3010..108a207 100644 --- a/streetlevel/lookaround/lookaround.py +++ b/streetlevel/lookaround/lookaround.py @@ -1,4 +1,3 @@ -from datetime import datetime from enum import IntEnum from typing import List, Union, Tuple @@ -8,9 +7,8 @@ from . import api from .auth import Authenticator -from .panorama import LookaroundPanorama, CoverageType, CameraMetadata, LensProjection, OrientedPosition -from .proto import GroundMetadataTile_pb2 -from .geo import protobuf_tile_offset_to_wgs84 +from .panorama import LookaroundPanorama +from .parse import parse_coverage_tile from .. import geo @@ -40,12 +38,12 @@ def get_coverage_tile(tile_x: int, tile_y: int, session: Session = None) -> List :return: A list of LookaroundPanoramas. If no coverage was returned by the API, the list is empty. """ tile = api.get_coverage_tile(tile_x, tile_y, session=session) - return _parse_panos(tile, tile_x, tile_y) + return parse_coverage_tile(tile) async def get_coverage_tile_async(tile_x: int, tile_y: int, session: ClientSession) -> List[LookaroundPanorama]: tile = await api.get_coverage_tile_async(tile_x, tile_y, session) - return _parse_panos(tile, tile_x, tile_y) + return parse_coverage_tile(tile) def get_coverage_tile_by_latlon(lat: float, lon: float, session: Session = None) -> List[LookaroundPanorama]: @@ -149,58 +147,6 @@ def _panoid_to_string(pano: Union[LookaroundPanorama, Tuple[int, int]]) -> Tuple return panoid, build_id -def _parse_panos(tile: GroundMetadataTile_pb2.GroundMetadataTile, tile_x: int, tile_y: int) -> List[LookaroundPanorama]: - panos = [] - camera_metadatas = [_camera_metadata_to_dataclass(c) for c in tile.camera_metadata] - for pano_pb in tile.pano: - lat, lon = protobuf_tile_offset_to_wgs84( - pano_pb.tile_position.x, - pano_pb.tile_position.y, - tile_x, - tile_y) - pano = LookaroundPanorama( - id=pano_pb.panoid, - build_id=tile.build_table[pano_pb.build_table_idx].build_id, - lat=lat, - lon=lon, - coverage_type=CoverageType(tile.build_table[pano_pb.build_table_idx].coverage_type), - date=datetime.utcfromtimestamp(pano_pb.timestamp / 1000.0), - has_blurs=tile.build_table[pano_pb.build_table_idx].index != 0, - raw_orientation=(pano_pb.tile_position.yaw, pano_pb.tile_position.pitch, pano_pb.tile_position.roll), - raw_altitude=pano_pb.tile_position.altitude, - tile=(tile_x, tile_y, 17), - camera_metadata=[camera_metadatas[i] for i in pano_pb.camera_metadata_idx] - ) - panos.append(pano) - return panos - - -def _camera_metadata_to_dataclass(camera_metadata_pb: GroundMetadataTile_pb2.CameraMetadata): - lens_projection_pb = camera_metadata_pb.lens_projection - position_pb = camera_metadata_pb.position - return CameraMetadata( - lens_projection=LensProjection( - fov_s=lens_projection_pb.fov_s, - fov_h=lens_projection_pb.fov_h, - k2=lens_projection_pb.k2, - k3=lens_projection_pb.k3, - k4=lens_projection_pb.k4, - cx=lens_projection_pb.cx, - cy=lens_projection_pb.cy, - lx=lens_projection_pb.lx, - ly=lens_projection_pb.ly, - ), - position=OrientedPosition( - x=position_pb.x, - y=position_pb.y, - z=position_pb.z, - yaw=position_pb.yaw, - pitch=position_pb.pitch, - roll=position_pb.roll, - ) - ) - - def _build_panorama_face_url(panoid: str, build_id: str, face: int, zoom: int, auth: Authenticator) -> str: zoom = min(7, zoom) panoid_padded = panoid.zfill(20) diff --git a/streetlevel/lookaround/parse.py b/streetlevel/lookaround/parse.py new file mode 100644 index 0000000..e4e8530 --- /dev/null +++ b/streetlevel/lookaround/parse.py @@ -0,0 +1,60 @@ +from datetime import datetime +from typing import List + +from streetlevel.lookaround.geo import protobuf_tile_offset_to_wgs84 +from streetlevel.lookaround.panorama import LookaroundPanorama, CoverageType, LensProjection, CameraMetadata, \ + OrientedPosition +from streetlevel.lookaround.proto import GroundMetadataTile_pb2 + + +def parse_coverage_tile(tile: GroundMetadataTile_pb2.GroundMetadataTile) \ + -> List[LookaroundPanorama]: + panos = [] + camera_metadatas = [_camera_metadata_to_dataclass(c) for c in tile.camera_metadata] + for pano_pb in tile.pano: + lat, lon = protobuf_tile_offset_to_wgs84( + pano_pb.tile_position.x, + pano_pb.tile_position.y, + tile.tile_coordinate.x, + tile.tile_coordinate.y) + pano = LookaroundPanorama( + id=pano_pb.panoid, + build_id=tile.build_table[pano_pb.build_table_idx].build_id, + lat=lat, + lon=lon, + coverage_type=CoverageType(tile.build_table[pano_pb.build_table_idx].coverage_type), + date=datetime.utcfromtimestamp(pano_pb.timestamp / 1000.0), + has_blurs=tile.build_table[pano_pb.build_table_idx].index != 0, + raw_orientation=(pano_pb.tile_position.yaw, pano_pb.tile_position.pitch, pano_pb.tile_position.roll), + raw_altitude=pano_pb.tile_position.altitude, + tile=(tile.tile_coordinate.x, tile.tile_coordinate.y, tile.tile_coordinate.z), + camera_metadata=[camera_metadatas[i] for i in pano_pb.camera_metadata_idx] + ) + panos.append(pano) + return panos + + +def _camera_metadata_to_dataclass(camera_metadata_pb: GroundMetadataTile_pb2.CameraMetadata): + lens_projection_pb = camera_metadata_pb.lens_projection + position_pb = camera_metadata_pb.position + return CameraMetadata( + lens_projection=LensProjection( + fov_s=lens_projection_pb.fov_s, + fov_h=lens_projection_pb.fov_h, + k2=lens_projection_pb.k2, + k3=lens_projection_pb.k3, + k4=lens_projection_pb.k4, + cx=lens_projection_pb.cx, + cy=lens_projection_pb.cy, + lx=lens_projection_pb.lx, + ly=lens_projection_pb.ly, + ), + position=OrientedPosition( + x=position_pb.x, + y=position_pb.y, + z=position_pb.z, + yaw=position_pb.yaw, + pitch=position_pb.pitch, + roll=position_pb.roll, + ) + ) diff --git a/streetlevel/lookaround/reproject.py b/streetlevel/lookaround/reproject.py index fb7664d..9260d74 100644 --- a/streetlevel/lookaround/reproject.py +++ b/streetlevel/lookaround/reproject.py @@ -6,7 +6,7 @@ import torch from torchvision import transforms -from streetlevel.lookaround import CameraMetadata +from streetlevel.lookaround.panorama import CameraMetadata _equi2equi = Equi2Equi(mode="bilinear", z_down=True) _device = torch.device("cuda") diff --git a/streetlevel/mapy/mapy.py b/streetlevel/mapy/mapy.py index fdd2682..03bcac4 100644 --- a/streetlevel/mapy/mapy.py +++ b/streetlevel/mapy/mapy.py @@ -1,15 +1,14 @@ import itertools -import math from typing import List, Optional -from PIL import Image from aiohttp import ClientSession +from PIL import Image +from requests import Session -from .panorama import MapyPanorama from . import api -from requests import Session -from ..dataclasses import Size, Tile, Link -from ..geo import opk_to_rotation +from .panorama import MapyPanorama +from .parse import parse_pan_info_dict, parse_neighbors_response, parse_getbest_response +from ..dataclasses import Tile, Link from ..util import get_equirectangular_panorama, get_equirectangular_panorama_async, get_image, get_image_async @@ -31,23 +30,14 @@ def find_panorama(lat: float, lon: float, panoramas from other years. Defaults to True. :return: A MapyPanorama object if a panorama was found, or None. """ - radius = float(radius) - if year is None: - options = None - else: - options = {'year': year, 'nopenalties': True} + options, radius = _validate_find_panorama_params(radius, year) response = api.getbest(lat, lon, radius, options=options) - - if response["status"] != 200: - return None - - pan_info = response["result"]["panInfo"] - pano = _parse_pan_info_dict(pan_info) + pano = parse_getbest_response(response) if links: pano.links = get_links(pano.id, year=pano.date.year) if historical: - _append_historical(lat, lon, pan_info, pano) + _append_historical(pano, response["result"]["panInfo"], lat, lon) return pano @@ -57,24 +47,14 @@ async def find_panorama_async(lat: float, lon: float, year: Optional[int] = None, links: bool = True, historical: bool = True) -> Optional[MapyPanorama]: - # TODO reduce duplication - radius = float(radius) - if year is None: - options = None - else: - options = {'year': year, 'nopenalties': True} + options, radius = _validate_find_panorama_params(radius, year) response = await api.getbest_async(lat, lon, radius, options=options) - - if response["status"] != 200: - return None - - pan_info = response["result"]["panInfo"] - pano = _parse_pan_info_dict(pan_info) + pano = parse_getbest_response(response) if links: pano.links = await get_links_async(pano.id, year=pano.date.year) if historical: - await _append_historical_async(lat, lon, pan_info, pano) + await _append_historical_async(pano, response["result"]["panInfo"], lat, lon) return pano @@ -98,12 +78,12 @@ def find_panorama_by_id(panoid: int, return None pan_info = response["result"] - pano = _parse_pan_info_dict(pan_info) + pano = parse_pan_info_dict(pan_info) if links: pano.links = get_links(pano.id, year=pano.date.year) if historical: - _append_historical(pano.lat, pano.lon, pan_info, pano) + _append_historical(pano, pan_info, pano.lat, pano.lon) return pano @@ -117,12 +97,12 @@ async def find_panorama_by_id_async(panoid: int, return None pan_info = response["result"] - pano = _parse_pan_info_dict(pan_info) + pano = parse_pan_info_dict(pan_info) if links: pano.links = await get_links_async(pano.id, year=pano.date.year) if historical: - await _append_historical_async(pano.lat, pano.lon, pan_info, pano) + await _append_historical_async(pano, pan_info, pano.lat, pano.lon) return pano @@ -142,11 +122,7 @@ def get_links(panoid: int, year: Optional[int] = None) -> List[Link]: options = {"year": year} response = api.getneighbours(panoid, options) - - if response["status"] != 200: - return [] - - return _neighbors_response_to_links(response) + return parse_neighbors_response(response) async def get_links_async(panoid: int, year: Optional[int] = None) -> List[Link]: @@ -156,11 +132,7 @@ async def get_links_async(panoid: int, year: Optional[int] = None) -> List[Link] options = {"year": year} response = await api.getneighbours_async(panoid, options) - - if response["status"] != 200: - return [] - - return _neighbors_response_to_links(response) + return parse_neighbors_response(response) def get_panorama(pano: MapyPanorama, zoom: int = 2) -> Image.Image: @@ -222,7 +194,16 @@ async def download_panorama_async(pano: MapyPanorama, path: str, session: Client image.save(path, **pil_args) -def _append_historical(lat, lon, pan_info, pano): +def _validate_find_panorama_params(radius, year): + radius = float(radius) + if year is None: + options = None + else: + options = {'year': year, 'nopenalties': True} + return options, radius + + +def _append_historical(pano, pan_info, lat, lon): for year in pan_info["timeline"]: if pano.date.year == year: continue @@ -231,7 +212,7 @@ def _append_historical(lat, lon, pan_info, pano): pano.historical.append(historical_pano) -async def _append_historical_async(lat, lon, pan_info, pano): +async def _append_historical_async(pano, pan_info, lat, lon): for year in pan_info["timeline"]: if pano.date.year == year: continue @@ -240,65 +221,6 @@ async def _append_historical_async(lat, lon, pan_info, pano): pano.historical.append(historical_pano) -def _neighbors_response_to_links(response: dict) -> List[Link]: - panos = [] - for pan_info in response["result"]["neighbours"]: - pano = _parse_pan_info_dict(pan_info["near"]) - angle = math.radians(float(pan_info["angle"])) - panos.append(Link(pano, angle)) - return panos - - -def _parse_pan_info_dict(pan_info: dict) -> MapyPanorama: - pano = MapyPanorama( - id=pan_info["pid"], - lat=pan_info["mark"]["lat"], - lon=pan_info["mark"]["lon"], - tile_size=Size(pan_info["tileWidth"], pan_info["tileHeight"]), - domain_prefix=pan_info["domainPrefix"], - uri_path=pan_info["uriPath"], - file_mask=pan_info["fileMask"], - max_zoom=pan_info["maxZoom"], - date=pan_info["createdAt"], - elevation=pan_info["mark"]["alt"], - provider=pan_info["provider"], - ) - - _parse_angles(pan_info, pano) - pano.num_tiles = _parse_num_tiles(pan_info) - - return pano - - -def _parse_num_tiles(pan_info: dict) -> List[Size]: - # zoom level 0 - num_tiles = [Size(1, 1)] - # zoom levels 1 and 2 for cyclomedia - if "extra" in pan_info and "tileNumX" in pan_info["extra"]: - for i in range(0, len(pan_info["extra"]["tileNumX"])): - num = Size(int(pan_info["extra"]["tileNumX"][i]), - int(pan_info["extra"]["tileNumY"][i])) - num_tiles.append(num) - # zoom level 1 for other providers - else: - num_tiles.append(Size(pan_info["tileNumX"], pan_info["tileNumY"])) - return num_tiles - - -def _parse_angles(pan_info: dict, pano: MapyPanorama) -> None: - if "extra" in pan_info and "carDirection" in pan_info["extra"]: - pano.heading = math.radians(pan_info["extra"]["carDirection"]) - - pano.omega = math.radians(pan_info["omega"]) - pano.phi = math.radians(pan_info["phi"]) - pano.kappa = math.radians(pan_info["kappa"]) - heading, pitch, roll = opk_to_rotation(pano.omega, pano.phi, pano.kappa).as_euler('yxz') - if not pano.heading: - pano.heading = heading - pano.pitch = pitch - pano.roll = roll - - def _get_zoom_0(pano: MapyPanorama, session: Session = None) -> Image.Image: return get_image(_generate_tile_list(pano, 0)[0].url, session=session) diff --git a/streetlevel/mapy/parse.py b/streetlevel/mapy/parse.py new file mode 100644 index 0000000..a70c89d --- /dev/null +++ b/streetlevel/mapy/parse.py @@ -0,0 +1,75 @@ +import math +from typing import List, Optional + +from streetlevel.dataclasses import Size, Link +from streetlevel.geo import opk_to_rotation +from streetlevel.mapy.panorama import MapyPanorama + + +def parse_getbest_response(response: dict) -> Optional[MapyPanorama]: + if response["status"] != 200: + return None + pan_info = response["result"]["panInfo"] + return parse_pan_info_dict(pan_info) + + +def parse_pan_info_dict(pan_info: dict) -> MapyPanorama: + pano = MapyPanorama( + id=pan_info["pid"], + lat=pan_info["mark"]["lat"], + lon=pan_info["mark"]["lon"], + tile_size=Size(pan_info["tileWidth"], pan_info["tileHeight"]), + domain_prefix=pan_info["domainPrefix"], + uri_path=pan_info["uriPath"], + file_mask=pan_info["fileMask"], + max_zoom=pan_info["maxZoom"], + date=pan_info["createdAt"], + elevation=pan_info["mark"]["alt"], + provider=pan_info["provider"], + ) + + _parse_angles(pan_info, pano) + pano.num_tiles = _parse_num_tiles(pan_info) + + return pano + + +def parse_neighbors_response(response: dict) -> List[Link]: + if response["status"] != 200: + return [] + + panos = [] + for pan_info in response["result"]["neighbours"]: + pano = parse_pan_info_dict(pan_info["near"]) + angle = math.radians(float(pan_info["angle"])) + panos.append(Link(pano, angle)) + return panos + + +def _parse_num_tiles(pan_info: dict) -> List[Size]: + # zoom level 0 + num_tiles = [Size(1, 1)] + # zoom levels 1 and 2 for cyclomedia + if "extra" in pan_info and "tileNumX" in pan_info["extra"]: + for i in range(0, len(pan_info["extra"]["tileNumX"])): + num = Size(int(pan_info["extra"]["tileNumX"][i]), + int(pan_info["extra"]["tileNumY"][i])) + num_tiles.append(num) + # zoom level 1 for other providers + else: + num_tiles.append(Size(pan_info["tileNumX"], pan_info["tileNumY"])) + return num_tiles + + +def _parse_angles(pan_info: dict, pano: MapyPanorama) -> None: + if "extra" in pan_info and "carDirection" in pan_info["extra"]: + pano.heading = math.radians(pan_info["extra"]["carDirection"]) + + pano.omega = math.radians(pan_info["omega"]) + pano.phi = math.radians(pan_info["phi"]) + pano.kappa = math.radians(pan_info["kappa"]) + heading, pitch, roll = opk_to_rotation(pano.omega, pano.phi, pano.kappa).as_euler('yxz') + if not pano.heading: + pano.heading = heading + pano.pitch = pitch + pano.roll = roll diff --git a/streetlevel/naver/naver.py b/streetlevel/naver/naver.py index 9fc4185..846f928 100644 --- a/streetlevel/naver/naver.py +++ b/streetlevel/naver/naver.py @@ -1,5 +1,3 @@ -import math -from datetime import datetime from typing import Optional, List, Union, Tuple import numpy as np @@ -8,8 +6,9 @@ from requests import Session from . import api -from .panorama import NaverPanorama, PanoramaType, Overlay, Neighbors -from ..dataclasses import Tile, Link +from .panorama import NaverPanorama, Neighbors +from .parse import parse_panorama, parse_nearby, parse_historical, parse_neighbors +from ..dataclasses import Tile from ..util import download_tiles, CubemapStitchingMethod, stitch_cubemap_faces, download_tiles_async, \ save_cubemap_panorama, get_image, get_image_async, stitch_cubemap_face @@ -37,7 +36,7 @@ def find_panorama_by_id(panoid: str, language: str = "en", if "errors" in response: return None - pano = _parse_panorama(response) + pano = parse_panorama(response) if neighbors: pano.neighbors = get_neighbors(pano.id, session=session) if historical: @@ -54,7 +53,7 @@ async def find_panorama_by_id_async(panoid: str, session: ClientSession, languag if "errors" in response: return None - pano = _parse_panorama(response) + pano = parse_panorama(response) if neighbors: pano.neighbors = await get_neighbors_async(pano.id, session) if historical: @@ -87,7 +86,7 @@ def find_panorama(lat: float, lon: float, neighbors: bool = True, historical: bo if "error" in response or len(response["features"]) == 0: return None - pano = _parse_nearby(response) + pano = parse_nearby(response) if neighbors: pano.neighbors = get_neighbors(pano.id, session=session) if historical: @@ -104,7 +103,7 @@ async def find_panorama_async(lat: float, lon: float, session: ClientSession, ne if "error" in response or len(response["features"]) == 0: return None - pano = _parse_nearby(response) + pano = parse_nearby(response) if neighbors: pano.neighbors = await get_neighbors_async(pano.id, session) if historical: @@ -129,7 +128,7 @@ def get_historical(panoid: str, session: Session = None) -> List[NaverPanorama]: if "errors" in response: return [] - return _parse_historical(response, panoid) + return parse_historical(response, panoid) async def get_historical_async(panoid: str, session: ClientSession) -> List[NaverPanorama]: @@ -138,7 +137,7 @@ async def get_historical_async(panoid: str, session: ClientSession) -> List[Nave if "errors" in response: return [] - return _parse_historical(response, panoid) + return parse_historical(response, panoid) def get_neighbors(panoid: str, session: Session = None) -> Neighbors: @@ -154,7 +153,7 @@ def get_neighbors(panoid: str, session: Session = None) -> Neighbors: if "errors" in response: return Neighbors([], []) - return _parse_neighbors(response, panoid) + return parse_neighbors(response, panoid) async def get_neighbors_async(panoid: str, session: ClientSession) -> Neighbors: @@ -163,7 +162,7 @@ async def get_neighbors_async(panoid: str, session: ClientSession) -> Neighbors: if "errors" in response: return Neighbors([], []) - return _parse_neighbors(response, panoid) + return parse_neighbors(response, panoid) def get_panorama(pano: NaverPanorama, zoom: int = 2, @@ -323,102 +322,3 @@ def _stitch_panorama(tile_images: List[dict], cols: int, rows: int, stitched_face = stitch_cubemap_face(tiles, 512, cols, rows) stitched_faces.append(stitched_face) return stitch_cubemap_faces(stitched_faces, stitched_faces[0].size[0], stitching_method) - - -def _parse_neighbors(response: dict, parent_id: str) -> Neighbors: - street = _parse_neighbor_section(response, "street", parent_id) - other = _parse_neighbor_section(response, "air", parent_id) - return Neighbors(street, other) - - -def _parse_neighbor_section(response: dict, section: str, parent_id: str) -> List[NaverPanorama]: - panos = [] - if section in response["around"]["panoramas"]: - for raw_pano in response["around"]["panoramas"][section][1:]: - if raw_pano[0] == parent_id: - continue - elevation = raw_pano[4] * 0.01 - pano = NaverPanorama( - id=raw_pano[0], - lat=raw_pano[2], - lon=raw_pano[1], - elevation=elevation, - camera_height=(raw_pano[3] * 0.01) - elevation) - panos.append(pano) - return panos - - -def _parse_historical(response: dict, parent_id: str) -> List[NaverPanorama]: - panos = response["timeline"]["panoramas"][1:] - return [NaverPanorama( - id=pano[0], - lat=pano[2], - lon=pano[1], - date=datetime.strptime(pano[3], "%Y-%m-%d %H:%M:%S.0") - ) for pano in panos if pano[0] != parent_id] - - -def _parse_nearby(response: dict) -> NaverPanorama: - feature = response["features"][0] - elevation = feature["properties"]["land_altitude"] * 0.01 - return NaverPanorama( - id=feature["properties"]["id"], - lat=feature["geometry"]["coordinates"][1], - lon=feature["geometry"]["coordinates"][0], - heading=math.radians(feature["properties"]["camera_angle"][1]), - date=_parse_date(feature["properties"]["photodate"]), - description=feature["properties"]["description"], - title=feature["properties"]["title"], - elevation=elevation, - camera_height=(feature["properties"]["camera_altitude"] * 0.01) - elevation - ) - - -def _parse_date(date_str: str) -> datetime: - return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S") - - -def _parse_panorama(response: dict) -> NaverPanorama: - basic = response["basic"] - elevation = basic["land_altitude"] * 0.01 - pano = NaverPanorama( - id=basic["id"], - lat=basic["latitude"], - lon=basic["longitude"], - heading=math.radians(basic["camera_angle"][1]), - max_zoom=int(basic["image"]["segment"]) // 2, - timeline_id=basic["timeline_id"], - date=_parse_date(basic["photodate"]), - is_latest=basic["latest"], - description=basic["description"], - title=basic["title"], - panorama_type=PanoramaType(int(basic["dtl_type"])), - elevation=elevation, - camera_height=(basic["camera_altitude"] * 0.01) - elevation - ) - - if len(basic["image"]["overlays"]) > 1: - pano.overlay = Overlay( - "https://panorama.map.naver.com" + basic["image"]["overlays"][1][0], - "https://panorama.map.naver.com" + basic["image"]["overlays"][1][1]) - - pano.links = _parse_links(basic["links"]) - - return pano - - -def _parse_links(links_json: List) -> Optional[List[Link]]: - if len(links_json) < 2: - return None - - links = [] - for linked_json in links_json[1:]: - linked = NaverPanorama( - id=linked_json[0], - title=linked_json[1], - lat=linked_json[5], - lon=linked_json[4], - ) - angle = math.radians(float(linked_json[2])) - links.append(Link(linked, angle)) - return links diff --git a/streetlevel/naver/parse.py b/streetlevel/naver/parse.py new file mode 100644 index 0000000..b0ede3b --- /dev/null +++ b/streetlevel/naver/parse.py @@ -0,0 +1,105 @@ +import math +from datetime import datetime +from typing import List, Optional + +from streetlevel.dataclasses import Link +from streetlevel.naver.panorama import Neighbors, NaverPanorama, PanoramaType, Overlay + + +def parse_panorama(response: dict) -> NaverPanorama: + basic = response["basic"] + elevation = basic["land_altitude"] * 0.01 + pano = NaverPanorama( + id=basic["id"], + lat=basic["latitude"], + lon=basic["longitude"], + heading=math.radians(basic["camera_angle"][1]), + max_zoom=int(basic["image"]["segment"]) // 2, + timeline_id=basic["timeline_id"], + date=_parse_date(basic["photodate"]), + is_latest=basic["latest"], + description=basic["description"], + title=basic["title"], + panorama_type=PanoramaType(int(basic["dtl_type"])), + elevation=elevation, + camera_height=(basic["camera_altitude"] * 0.01) - elevation + ) + + if len(basic["image"]["overlays"]) > 1: + pano.overlay = Overlay( + "https://panorama.map.naver.com" + basic["image"]["overlays"][1][0], + "https://panorama.map.naver.com" + basic["image"]["overlays"][1][1]) + + pano.links = _parse_links(basic["links"]) + + return pano + + +def parse_neighbors(response: dict, parent_id: str) -> Neighbors: + street = _parse_neighbor_section(response, "street", parent_id) + other = _parse_neighbor_section(response, "air", parent_id) + return Neighbors(street, other) + + +def _parse_neighbor_section(response: dict, section: str, parent_id: str) -> List[NaverPanorama]: + panos = [] + if section in response["around"]["panoramas"]: + for raw_pano in response["around"]["panoramas"][section][1:]: + if raw_pano[0] == parent_id: + continue + elevation = raw_pano[4] * 0.01 + pano = NaverPanorama( + id=raw_pano[0], + lat=raw_pano[2], + lon=raw_pano[1], + elevation=elevation, + camera_height=(raw_pano[3] * 0.01) - elevation) + panos.append(pano) + return panos + + +def parse_historical(response: dict, parent_id: str) -> List[NaverPanorama]: + panos = response["timeline"]["panoramas"][1:] + return [NaverPanorama( + id=pano[0], + lat=pano[2], + lon=pano[1], + date=datetime.strptime(pano[3], "%Y-%m-%d %H:%M:%S.0") + ) for pano in panos if pano[0] != parent_id] + + +def parse_nearby(response: dict) -> NaverPanorama: + feature = response["features"][0] + elevation = feature["properties"]["land_altitude"] * 0.01 + return NaverPanorama( + id=feature["properties"]["id"], + lat=feature["geometry"]["coordinates"][1], + lon=feature["geometry"]["coordinates"][0], + heading=math.radians(feature["properties"]["camera_angle"][1]), + date=_parse_date(feature["properties"]["photodate"]), + description=feature["properties"]["description"], + title=feature["properties"]["title"], + elevation=elevation, + camera_height=(feature["properties"]["camera_altitude"] * 0.01) - elevation + ) + + +def _parse_date(date_str: str) -> datetime: + return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S") + + +def _parse_links(links_json: List) -> Optional[List[Link]]: + if len(links_json) < 2: + return None + + links = [] + for linked_json in links_json[1:]: + linked = NaverPanorama( + id=linked_json[0], + title=linked_json[1], + lat=linked_json[5], + lon=linked_json[4], + ) + angle = math.radians(float(linked_json[2])) + links.append(Link(linked, angle)) + return links diff --git a/streetlevel/streetside/parse.py b/streetlevel/streetside/parse.py new file mode 100644 index 0000000..ed2beb4 --- /dev/null +++ b/streetlevel/streetside/parse.py @@ -0,0 +1,46 @@ +import math +from datetime import datetime +from typing import Optional, List + +from streetlevel.streetside.panorama import StreetsidePanorama + + +def parse_panoramas_id_response(response: dict) -> Optional[StreetsidePanorama]: + if len(response) < 2: + return None + return parse_panorama(response[1]) + + +def parse_panoramas(response: dict) -> List[StreetsidePanorama]: + panos = [] + for pano in response[1:]: # first object is elapsed time + pano_obj = parse_panorama(pano) + panos.append(pano_obj) + return panos + + +def parse_panorama(pano: dict) -> StreetsidePanorama: + # TODO: parse bl, nbn, pbn, ad fields + # as it turns out, months/days without leading zeros + # don't have a cross-platform format code in strptime. + # wanna guess what kind of dates bing returns? + datestr = pano["cd"] + datestr = datestr.split("/") + datestr[0] = datestr[0].rjust(2, "0") + datestr[1] = datestr[1].rjust(2, "0") + datestr = "/".join(datestr) + date = datetime.strptime(datestr, "%m/%d/%Y %I:%M:%S %p") + pano_obj = StreetsidePanorama( + id=pano["id"], + lat=pano["la"], + lon=pano["lo"], + date=date, + next=pano["ne"] if "ne" in pano else None, + previous=pano["pr"] if "pr" in pano else None, + elevation=pano["al"] if "al" in pano else None, + heading=math.radians(pano["he"]) if "he" in pano else None, + pitch=math.radians(pano["pi"]) if "pi" in pano else None, + roll=math.radians(pano["ro"]) if "ro" in pano else None, + max_zoom=int(pano["ml"]) + ) + return pano_obj diff --git a/streetlevel/streetside/streetside.py b/streetlevel/streetside/streetside.py index 6bd9801..d865f4e 100644 --- a/streetlevel/streetside/streetside.py +++ b/streetlevel/streetside/streetside.py @@ -1,5 +1,4 @@ import asyncio -from datetime import datetime from io import BytesIO from typing import List, Union, Optional @@ -7,9 +6,10 @@ from aiohttp import ClientSession from requests import Session -from .panorama import StreetsidePanorama from streetlevel.geo import * from . import api +from .panorama import StreetsidePanorama +from .parse import parse_panoramas, parse_panorama, parse_panoramas_id_response from .util import to_base4 from ..util import download_files_async, stitch_cubemap_faces, CubemapStitchingMethod, save_cubemap_panorama @@ -25,18 +25,12 @@ def find_panorama_by_id(panoid: int, session: Session = None) -> Optional[Street :return: A StreetsidePanorama object if a panorama was found, or None. """ response = api.find_panorama_by_id(panoid, session) - if len(response) < 2: - return None - pano = _parse_pano(response[1]) - return pano + return parse_panoramas_id_response(response) async def find_panorama_by_id_async(panoid: int, session: ClientSession) -> Optional[StreetsidePanorama]: response = await api.find_panorama_by_id_async(panoid, session) - if len(response) < 2: - return None - pano = _parse_pano(response[1]) - return pano + return parse_panoramas_id_response(response) def find_panoramas_in_bbox(north: float, west: float, south: float, east: float, @@ -53,15 +47,13 @@ def find_panoramas_in_bbox(north: float, west: float, south: float, east: float, :return: A list of StreetsidePanorama objects. """ response = api.find_panoramas(north, west, south, east, limit, session) - panos = _parse_panos(response) - return panos + return parse_panoramas(response) async def find_panoramas_in_bbox_async(north: float, west: float, south: float, east: float, session: ClientSession, limit: int = 50) -> List[StreetsidePanorama]: response = await api.find_panoramas_async(north, west, south, east, session, limit) - panos = _parse_panos(response) - return panos + return parse_panoramas(response) def find_panoramas(lat: float, lon: float, radius: float = 25, @@ -157,41 +149,6 @@ async def get_panorama_async(pano: StreetsidePanorama, session: ClientSession, z return _stitch_panorama(faces, stitching_method=stitching_method) -def _parse_panos(response): - panos = [] - for pano in response[1:]: # first object is elapsed time - pano_obj = _parse_pano(pano) - panos.append(pano_obj) - return panos - - -def _parse_pano(pano): - # TODO: parse bl, nbn, pbn, ad fields - # as it turns out, months/days without leading zeros - # don't have a cross-platform format code in strptime. - # wanna guess what kind of dates bing returns? - datestr = pano["cd"] - datestr = datestr.split("/") - datestr[0] = datestr[0].rjust(2, "0") - datestr[1] = datestr[1].rjust(2, "0") - datestr = "/".join(datestr) - date = datetime.strptime(datestr, "%m/%d/%Y %I:%M:%S %p") - pano_obj = StreetsidePanorama( - id=pano["id"], - lat=pano["la"], - lon=pano["lo"], - date=date, - next=pano["ne"] if "ne" in pano else None, - previous=pano["pr"] if "pr" in pano else None, - elevation=pano["al"] if "al" in pano else None, - heading=math.radians(pano["he"]) if "he" in pano else None, - pitch=math.radians(pano["pi"]) if "pi" in pano else None, - roll=math.radians(pano["ro"]) if "ro" in pano else None, - max_zoom=int(pano["ml"]) - ) - return pano_obj - - def _generate_tile_list(panoid, zoom): """ Generates a list of a panorama's tiles. diff --git a/streetlevel/streetview/parse.py b/streetlevel/streetview/parse.py new file mode 100644 index 0000000..74bf916 --- /dev/null +++ b/streetlevel/streetview/parse.py @@ -0,0 +1,269 @@ +import math +from typing import List, Tuple, Optional + +from streetlevel.dataclasses import Link, Size +from streetlevel.geo import get_bearing +from streetlevel.streetview.panorama import StreetViewPanorama, LocalizedString, UploadDate, \ + CaptureDate, Place, BusinessStatus, ArtworkLink, Artwork, BuildingLevel, StreetLabel +from streetlevel.streetview.depth import parse as parse_depth +from streetlevel.util import try_get + + +def parse_panorama_id_response(response: dict) -> Optional[StreetViewPanorama]: + response_code = response[1][0][0][0] + # 1: OK + # 2: Not found + # don't know if there are others + if response_code != 1: + return None + return parse_panorama_message(response[1][0]) + + +def parse_panorama_radius_response(response: dict) -> Optional[StreetViewPanorama]: + response_code = response[0][0][0] + # 0: OK + # 5: search returned no images + # don't know if there are others + if response_code != 0: + return None + return parse_panorama_message(response[0][1]) + + +def parse_coverage_tile_response(tile: list) -> List[StreetViewPanorama]: + if tile is None: + return [] + + panos = [] + if tile[1] is not None and len(tile[1]) > 0: + for raw_pano in tile[1][1]: + if raw_pano[0][0] == 1: + continue + panos.append( + StreetViewPanorama(id=raw_pano[0][0][1], + lat=raw_pano[0][2][0][2], + lon=raw_pano[0][2][0][3], + heading=math.radians(raw_pano[0][2][2][0]), + pitch=math.radians(90 - raw_pano[0][2][2][1]), + roll=math.radians(raw_pano[0][2][2][2]), + elevation=raw_pano[0][2][1][0])) + for idx, raw_pano in enumerate(tile[1][1]): + link_indices = raw_pano[1] + panos[idx].links = [Link(panos[link_idx], + get_bearing(panos[idx].lat, panos[idx].lon, + panos[link_idx].lat, panos[link_idx].lon)) + for link_idx in link_indices] + return panos + + +def parse_panorama_message(msg: dict) -> StreetViewPanorama: + img_sizes = msg[2][3][0] + img_sizes = list(map(lambda x: Size(x[0][1], x[0][0]), img_sizes)) + others = try_get(lambda: msg[5][0][3][0]) + date = try_get(lambda: msg[6][7]) + + links, other_bld_levels, other_dates = _parse_other_pano_indices(msg) + + street_names = try_get(lambda: msg[5][0][12]) + if street_names is not None: + street_names = [_parse_street_name(street_name) for street_name in street_names] + + address = try_get(lambda: msg[3][2]) + if address is not None: + address = [LocalizedString(x[0], x[1]) for x in address] + + depth = try_get(lambda: msg[5][0][5][1][2]) + if depth: + depth = parse_depth(depth) + + upload_date = try_get(lambda: msg[6][8]) + if upload_date: + upload_date = UploadDate(*upload_date) + + places_raw = try_get(lambda: msg[5][0][9]) + if places_raw: + artworks, places = _parse_places(places_raw) + else: + artworks, places = None, None + + pano = StreetViewPanorama( + id=msg[1][1], + lat=msg[5][0][1][0][2], + lon=msg[5][0][1][0][3], + heading=try_get(lambda: math.radians(msg[5][0][1][2][0])), + pitch=try_get(lambda: math.radians(90 - msg[5][0][1][2][1])), + roll=try_get(lambda: math.radians(msg[5][0][1][2][2])), + depth=depth, + date=CaptureDate(date[0], + date[1], + date[2] if len(date) > 2 else None) if date else None, + upload_date=upload_date, + elevation=try_get(lambda: msg[5][0][1][1][0]), + tile_size=Size(msg[2][3][1][0], msg[2][3][1][1]), + image_sizes=img_sizes, + source=try_get(lambda: msg[6][5][2].lower()), + country_code=try_get(lambda: msg[5][0][1][4]), + street_names=street_names, + address=address, + copyright_message=try_get(lambda: msg[4][0][0][0][0]), + uploader=try_get(lambda: msg[4][1][0][0][0]), + uploader_icon_url=try_get(lambda: msg[4][1][0][2]), + building_level=_parse_building_level_message(try_get(lambda: msg[5][0][1][3])), + artworks=artworks, + places=places, + ) + + # parse other dates, links and neighbors + if others is not None: + for idx, other in enumerate(others): + other_id = other[0][1] + if pano.id == other_id: + continue + + connected = StreetViewPanorama( + id=other_id, + lat=try_get(lambda: float(other[2][0][2])), + lon=try_get(lambda: float(other[2][0][3])), + elevation=try_get(lambda: other[2][1][0]), + heading=try_get(lambda: math.radians(other[2][2][0])), + pitch=try_get(lambda: math.radians(90 - other[2][2][1])), + roll=try_get(lambda: math.radians(other[2][2][2])), + building_level=_parse_building_level_message(try_get(lambda: other[2][3])), + ) + + if idx in other_dates: + if other_dates[idx]: + connected.date = CaptureDate(other_dates[idx][0], other_dates[idx][1]) + pano.historical.append(connected) + else: + if idx in links: + if links[idx]: + angle = math.radians(links[idx][3]) + else: + angle = get_bearing(pano.lat, pano.lon, connected.lat, connected.lon) + pano.links.append(Link(connected, angle)) + if idx in other_bld_levels: + pano.building_levels.append(connected) + pano.neighbors.append(connected) + + other_address = try_get(lambda: other[3][2]) + if other_address: + connected.address = [LocalizedString(x[0], x[1]) for x in other_address] + pano.historical = sorted(pano.historical, + key=lambda x: (x.date.year, x.date.month) if x.date else None, + reverse=True) + + return pano + + +def _parse_street_name(msg: dict) -> StreetLabel: + # Unknown what [0][0][0][1] is, but it may be something interesting, always 2 numbers as a string + name_raw = msg[0][0][2] + name = LocalizedString(name_raw[0], name_raw[1]) + angles = [math.radians(angle) for angle in msg[1]] + return StreetLabel(name, angles) + + +def _parse_other_pano_indices(msg: dict) -> Tuple[dict, list, dict]: + links_raw = try_get(lambda: msg[5][0][6]) + if links_raw: + links = dict([(x[0], try_get(lambda: x[1])) for x in links_raw]) + else: + links = {} + + other_bld_levels_raw = try_get(lambda: msg[5][0][7]) + if other_bld_levels_raw: + other_bld_levels = [x[0] for x in other_bld_levels_raw] + else: + other_bld_levels = [] + + other_dates_raw = try_get(lambda: msg[5][0][8]) + if other_dates_raw: + other_dates = dict([(x[0], x[1]) for x in other_dates_raw]) + else: + other_dates = {} + return links, other_bld_levels, other_dates + + +def _parse_building_level_message(bld_level: Optional[list]) -> Optional[BuildingLevel]: + if bld_level and len(bld_level) > 1: + return BuildingLevel( + bld_level[1], + try_get(lambda: LocalizedString(*bld_level[2])), + try_get(lambda: LocalizedString(*bld_level[3]))) + return None + + +def _parse_places(places_raw: list) -> Tuple[List[Artwork], List[Place]]: + artworks = [] + places = [] + for place in places_raw: + # There are multiple types of objects that can be returned here, only way to differentiate them is the length + if len(place) == 6: + artwork = _parse_artwork(place) + artworks.append(artwork) + elif len(place) == 8: + places.append(_parse_place(place)) + return artworks, places + + +def _parse_artwork(place: dict) -> Artwork: + marker_yaw = try_get(lambda: place[1][0][0][0]) + if marker_yaw: + marker_yaw = _marker_yaw_to_rad(marker_yaw) + marker_pitch = try_get(lambda: place[1][0][0][1]) + if marker_pitch: + marker_pitch = _marker_pitch_to_rad(marker_pitch) + + if len(place[5]) > 9: + link = ArtworkLink(place[5][9][0][1], LocalizedString(*place[5][9][1])) + else: + link = None + + artwork = Artwork( + id=try_get(lambda: place[0][2][0]), + title=LocalizedString(*place[5][0]), + description=try_get(lambda: LocalizedString(*place[5][1])), + thumbnail=place[5][3], + creator=try_get(lambda: LocalizedString(*place[5][6])), + url=try_get(lambda: place[5][7][1][0]), + attributes={prop[0][0]: LocalizedString(*prop[1]) for prop in place[5][2]} if place[5][2] else {}, + marker_icon_url=place[4], + marker_yaw=marker_yaw, + marker_pitch=marker_pitch, + link=link + ) + return artwork + + +def _parse_place(place: dict) -> Place: + feature_id_parts = place[0][1] + feature_id = ':'.join(hex(int(part)) for part in feature_id_parts) + cid = try_get(lambda: int(place[0][3])) + if cid is None: + cid = int(place[0][1][1]) + + marker_yaw = try_get(lambda: place[1][0][0][0]) + if marker_yaw: + marker_yaw = _marker_yaw_to_rad(marker_yaw) + marker_pitch = try_get(lambda: place[1][0][0][1]) + if marker_pitch: + marker_pitch = _marker_pitch_to_rad(marker_pitch) + marker_distance = try_get(lambda: place[1][0][0][2]) + + return Place(feature_id=feature_id, + cid=cid, + marker_yaw=marker_yaw, + marker_pitch=marker_pitch, + marker_distance=marker_distance, + name=try_get(lambda: LocalizedString(*place[2])), + type=try_get(lambda: LocalizedString(*place[3])), + marker_icon_url=place[4], + status=BusinessStatus(place[7])) + + +def _marker_yaw_to_rad(marker_yaw: float) -> float: + return (marker_yaw - 0.5) * math.tau + + +def _marker_pitch_to_rad(marker_pitch: float) -> float: + return (0.5 - marker_pitch) * math.pi diff --git a/streetlevel/streetview/streetview.py b/streetlevel/streetview/streetview.py index 5ef9b2c..9b48260 100644 --- a/streetlevel/streetview/streetview.py +++ b/streetlevel/streetview/streetview.py @@ -3,16 +3,16 @@ import math from PIL import Image from requests import Session -from typing import List, Optional, Tuple +from typing import List, Optional from . import api -from .depth import parse as parse_depth -from .panorama import Place, BusinessStatus, StreetLabel, StreetViewPanorama, LocalizedString, CaptureDate, \ - BuildingLevel, UploadDate, Artwork, ArtworkLink +from .panorama import StreetViewPanorama +from .parse import parse_coverage_tile_response, parse_panorama_id_response, \ + parse_panorama_radius_response from .util import is_third_party_panoid -from ..dataclasses import Size, Tile, Link -from ..geo import wgs84_to_tile_coord, get_bearing -from ..util import try_get, get_equirectangular_panorama, get_equirectangular_panorama_async +from ..dataclasses import Tile +from ..geo import wgs84_to_tile_coord +from ..util import get_equirectangular_panorama, get_equirectangular_panorama_async def find_panorama(lat: float, lon: float, radius: int = 50, locale: str = "en", @@ -35,18 +35,9 @@ def find_panorama(lat: float, lon: float, radius: int = 50, locale: str = "en", # the `SingleImageSearch` call returns a different kind of depth data # than `photometa`; need to deal with that at some point - resp = api.find_panorama(lat, lon, radius=radius, download_depth=False, - locale=locale, search_third_party=search_third_party, session=session) - - response_code = resp[0][0][0] - # 0: OK - # 5: search returned no images - # don't know if there are others - if response_code != 0: - return None - - pano = _parse_pano_message(resp[0][1]) - return pano + response = api.find_panorama(lat, lon, radius=radius, download_depth=False, + locale=locale, search_third_party=search_third_party, session=session) + return parse_panorama_radius_response(response) async def find_panorama_async(lat: float, lon: float, session: ClientSession, radius: int = 50, @@ -54,18 +45,9 @@ async def find_panorama_async(lat: float, lon: float, session: ClientSession, ra # TODO # the `SingleImageSearch` call returns a different kind of depth data # than `photometa`; need to deal with that at some point - resp = await api.find_panorama_async(lat, lon, session, radius=radius, download_depth=False, - locale=locale, search_third_party=search_third_party) - - response_code = resp[0][0][0] - # 0: OK - # 5: search returned no images - # don't know if there are others - if response_code != 0: - return None - - pano = _parse_pano_message(resp[0][1]) - return pano + response = await api.find_panorama_async(lat, lon, session, radius=radius, download_depth=False, + locale=locale, search_third_party=search_third_party) + return parse_panorama_radius_response(response) def find_panorama_by_id(panoid: str, download_depth: bool = False, locale: str = "en", @@ -83,33 +65,16 @@ def find_panorama_by_id(panoid: str, download_depth: bool = False, locale: str = :param session: *(optional)* A requests session. :return: A StreetViewPanorama object if a panorama with this ID exists, or None. """ - resp = api.find_panorama_by_id(panoid, download_depth=download_depth, - locale=locale, session=session) - - response_code = resp[1][0][0][0] - # 1: OK - # 2: Not found - # don't know if there are others - if response_code != 1: - return None - - pano = _parse_pano_message(resp[1][0]) - return pano + response = api.find_panorama_by_id(panoid, download_depth=download_depth, + locale=locale, session=session) + return parse_panorama_id_response(response) async def find_panorama_by_id_async(panoid: str, session: ClientSession, download_depth: bool = False, locale: str = "en") -> Optional[StreetViewPanorama]: - resp = await api.find_panorama_by_id_async(panoid, session, download_depth=download_depth, locale=locale) - - response_code = resp[1][0][0][0] - # 1: OK - # 2: Not found - # don't know if there are others - if response_code != 1: - return None - - pano = _parse_pano_message(resp[1][0]) - return pano + response = await api.find_panorama_by_id_async(panoid, session, + download_depth=download_depth, locale=locale) + return parse_panorama_id_response(response) def get_coverage_tile(tile_x: int, tile_y: int, session: Session = None) -> List[StreetViewPanorama]: @@ -130,21 +95,13 @@ def get_coverage_tile(tile_x: int, tile_y: int, session: Session = None) -> List :param session: *(optional)* A requests session. :return: A list of StreetViewPanoramas. If no coverage was returned by the API, the list is empty. """ - resp = api.get_coverage_tile(tile_x, tile_y, session) - - if resp is None: - return [] - - return _parse_coverage_tile_response(resp) + response = api.get_coverage_tile(tile_x, tile_y, session) + return parse_coverage_tile_response(response) async def get_coverage_tile_async(tile_x: int, tile_y: int, session: ClientSession) -> List[StreetViewPanorama]: - resp = await api.get_coverage_tile_async(tile_x, tile_y, session) - - if resp is None: - return [] - - return _parse_coverage_tile_response(resp) + response = await api.get_coverage_tile_async(tile_x, tile_y, session) + return parse_coverage_tile_response(response) def get_coverage_tile_by_latlon(lat: float, lon: float, session: Session = None) -> List[StreetViewPanorama]: @@ -224,243 +181,6 @@ def _validate_get_panorama_params(pano: StreetViewPanorama, zoom: int) -> int: return zoom -def _parse_coverage_tile_response(tile: list) -> List[StreetViewPanorama]: - panos = [] - if tile[1] is not None and len(tile[1]) > 0: - for raw_pano in tile[1][1]: - if raw_pano[0][0] == 1: - continue - panos.append( - StreetViewPanorama(id=raw_pano[0][0][1], - lat=raw_pano[0][2][0][2], - lon=raw_pano[0][2][0][3], - heading=math.radians(raw_pano[0][2][2][0]), - pitch=math.radians(90 - raw_pano[0][2][2][1]), - roll=math.radians(raw_pano[0][2][2][2]), - elevation=raw_pano[0][2][1][0])) - for idx, raw_pano in enumerate(tile[1][1]): - link_indices = raw_pano[1] - panos[idx].links = [Link(panos[link_idx], - get_bearing(panos[idx].lat, panos[idx].lon, - panos[link_idx].lat, panos[link_idx].lon)) - for link_idx in link_indices] - return panos - - -def _parse_pano_message(msg: dict) -> StreetViewPanorama: - img_sizes = msg[2][3][0] - img_sizes = list(map(lambda x: Size(x[0][1], x[0][0]), img_sizes)) - others = try_get(lambda: msg[5][0][3][0]) - date = try_get(lambda: msg[6][7]) - - links, other_bld_levels, other_dates = _parse_other_pano_indices(msg) - - street_names = try_get(lambda: msg[5][0][12]) - if street_names is not None: - street_names = [_parse_street_name(street_name) for street_name in street_names] - - address = try_get(lambda: msg[3][2]) - if address is not None: - address = [LocalizedString(x[0], x[1]) for x in address] - - depth = try_get(lambda: msg[5][0][5][1][2]) - if depth: - depth = parse_depth(depth) - - upload_date = try_get(lambda: msg[6][8]) - if upload_date: - upload_date = UploadDate(*upload_date) - - places_raw = try_get(lambda: msg[5][0][9]) - if places_raw: - artworks, places = _parse_places(places_raw) - else: - artworks, places = None, None - - pano = StreetViewPanorama( - id=msg[1][1], - lat=msg[5][0][1][0][2], - lon=msg[5][0][1][0][3], - heading=try_get(lambda: math.radians(msg[5][0][1][2][0])), - pitch=try_get(lambda: math.radians(90 - msg[5][0][1][2][1])), - roll=try_get(lambda: math.radians(msg[5][0][1][2][2])), - depth=depth, - date=CaptureDate(date[0], - date[1], - date[2] if len(date) > 2 else None) if date else None, - upload_date=upload_date, - elevation=try_get(lambda: msg[5][0][1][1][0]), - tile_size=Size(msg[2][3][1][0], msg[2][3][1][1]), - image_sizes=img_sizes, - source=try_get(lambda: msg[6][5][2].lower()), - country_code=try_get(lambda: msg[5][0][1][4]), - street_names=street_names, - address=address, - copyright_message=try_get(lambda: msg[4][0][0][0][0]), - uploader=try_get(lambda: msg[4][1][0][0][0]), - uploader_icon_url=try_get(lambda: msg[4][1][0][2]), - building_level=_parse_building_level_message(try_get(lambda: msg[5][0][1][3])), - artworks=artworks, - places=places, - ) - - # parse other dates, links and neighbors - if others is not None: - for idx, other in enumerate(others): - other_id = other[0][1] - if pano.id == other_id: - continue - - connected = StreetViewPanorama( - id=other_id, - lat=try_get(lambda: float(other[2][0][2])), - lon=try_get(lambda: float(other[2][0][3])), - elevation=try_get(lambda: other[2][1][0]), - heading=try_get(lambda: math.radians(other[2][2][0])), - pitch=try_get(lambda: math.radians(90 - other[2][2][1])), - roll=try_get(lambda: math.radians(other[2][2][2])), - building_level=_parse_building_level_message(try_get(lambda: other[2][3])), - ) - - if idx in other_dates: - if other_dates[idx]: - connected.date = CaptureDate(other_dates[idx][0], other_dates[idx][1]) - pano.historical.append(connected) - else: - if idx in links: - if links[idx]: - angle = math.radians(links[idx][3]) - else: - angle = get_bearing(pano.lat, pano.lon, connected.lat, connected.lon) - pano.links.append(Link(connected, angle)) - if idx in other_bld_levels: - pano.building_levels.append(connected) - pano.neighbors.append(connected) - - other_address = try_get(lambda: other[3][2]) - if other_address: - connected.address = [LocalizedString(x[0], x[1]) for x in other_address] - pano.historical = sorted(pano.historical, - key=lambda x: (x.date.year, x.date.month) if x.date else None, - reverse=True) - - return pano - - -def _parse_street_name(msg: dict) -> StreetLabel: - # Unknown what [0][0][0][1] is, but it may be something interesting, always 2 numbers as a string - name_raw = msg[0][0][2] - name = LocalizedString(name_raw[0], name_raw[1]) - angles = [math.radians(angle) for angle in msg[1]] - return StreetLabel(name, angles) - - -def _parse_other_pano_indices(msg: dict) -> Tuple[dict, list, dict]: - links_raw = try_get(lambda: msg[5][0][6]) - if links_raw: - links = dict([(x[0], try_get(lambda: x[1])) for x in links_raw]) - else: - links = {} - - other_bld_levels_raw = try_get(lambda: msg[5][0][7]) - if other_bld_levels_raw: - other_bld_levels = [x[0] for x in other_bld_levels_raw] - else: - other_bld_levels = [] - - other_dates_raw = try_get(lambda: msg[5][0][8]) - if other_dates_raw: - other_dates = dict([(x[0], x[1]) for x in other_dates_raw]) - else: - other_dates = {} - return links, other_bld_levels, other_dates - - -def _parse_building_level_message(bld_level: Optional[list]) -> Optional[BuildingLevel]: - if bld_level and len(bld_level) > 1: - return BuildingLevel( - bld_level[1], - try_get(lambda: LocalizedString(*bld_level[2])), - try_get(lambda: LocalizedString(*bld_level[3]))) - return None - - -def _parse_places(places_raw: list) -> Tuple[List[Artwork], List[Place]]: - artworks = [] - places = [] - for place in places_raw: - # There are multiple types of objects that can be returned here, only way to differentiate them is the length - if len(place) == 6: - artwork = _parse_artwork(place) - artworks.append(artwork) - elif len(place) == 8: - places.append(_parse_place(place)) - return artworks, places - - -def _parse_artwork(place: dict) -> Artwork: - marker_yaw = try_get(lambda: place[1][0][0][0]) - if marker_yaw: - marker_yaw = _marker_yaw_to_rad(marker_yaw) - marker_pitch = try_get(lambda: place[1][0][0][1]) - if marker_pitch: - marker_pitch = _marker_pitch_to_rad(marker_pitch) - - if len(place[5]) > 9: - link = ArtworkLink(place[5][9][0][1], LocalizedString(*place[5][9][1])) - else: - link = None - - artwork = Artwork( - id=try_get(lambda: place[0][2][0]), - title=LocalizedString(*place[5][0]), - description=try_get(lambda: LocalizedString(*place[5][1])), - thumbnail=place[5][3], - creator=try_get(lambda: LocalizedString(*place[5][6])), - url=try_get(lambda: place[5][7][1][0]), - attributes={prop[0][0]: LocalizedString(*prop[1]) for prop in place[5][2]} if place[5][2] else {}, - marker_icon_url=place[4], - marker_yaw=marker_yaw, - marker_pitch=marker_pitch, - link=link - ) - return artwork - - -def _parse_place(place: dict) -> Place: - feature_id_parts = place[0][1] - feature_id = ':'.join(hex(int(part)) for part in feature_id_parts) - cid = try_get(lambda: int(place[0][3])) - if cid is None: - cid = int(place[0][1][1]) - - marker_yaw = try_get(lambda: place[1][0][0][0]) - if marker_yaw: - marker_yaw = _marker_yaw_to_rad(marker_yaw) - marker_pitch = try_get(lambda: place[1][0][0][1]) - if marker_pitch: - marker_pitch = _marker_pitch_to_rad(marker_pitch) - marker_distance = try_get(lambda: place[1][0][0][2]) - - return Place(feature_id=feature_id, - cid=cid, - marker_yaw=marker_yaw, - marker_pitch=marker_pitch, - marker_distance=marker_distance, - name=try_get(lambda: LocalizedString(*place[2])), - type=try_get(lambda: LocalizedString(*place[3])), - marker_icon_url=place[4], - status=BusinessStatus(place[7])) - - -def _marker_yaw_to_rad(marker_yaw: float) -> float: - return (marker_yaw - 0.5) * math.tau - - -def _marker_pitch_to_rad(marker_pitch: float) -> float: - return (0.5 - marker_pitch) * math.pi - - def _generate_tile_list(pano: StreetViewPanorama, zoom: int) -> List[Tile]: """ Generates a list of a panorama's tiles and the URLs pointing to them. diff --git a/streetlevel/yandex/parse.py b/streetlevel/yandex/parse.py new file mode 100644 index 0000000..37a742c --- /dev/null +++ b/streetlevel/yandex/parse.py @@ -0,0 +1,161 @@ +import math +import re +from datetime import datetime +from typing import List, Tuple, Optional + +from streetlevel.dataclasses import Size, Link +from streetlevel.util import try_get +from streetlevel.yandex.panorama import YandexPanorama, Place, Address, Marker + + +def parse_panorama_response(response: dict) -> Optional[YandexPanorama]: + if response["status"] == "error": + return None + return parse_panorama(response["data"]) + + +def parse_panorama(pano_dict: dict) -> YandexPanorama: + data = pano_dict["Data"] + annotation = pano_dict["Annotation"] + panoid = data["panoramaId"] + + addresses, other_markers = _parse_markers(annotation["Markers"]) + + return YandexPanorama( + id=panoid, + lat=float(data["Point"]["coordinates"][1]), + lon=float(data["Point"]["coordinates"][0]), + + heading=math.radians(float(data["EquirectangularProjection"]["Origin"][0])), + + image_id=data["Images"]["imageId"], + tile_size=Size(int(data["Images"]["Tiles"]["width"]), + int(data["Images"]["Tiles"]["height"])), + image_sizes=_parse_image_sizes(data["Images"]["Zooms"]), + + neighbors=_parse_neighbors(annotation["Graph"]["Nodes"], + annotation["Connections"], + panoid), + links=_parse_links(annotation["Thoroughfares"]), + historical=_parse_historical(annotation["HistoricalPanoramas"], panoid), + + date=_get_date_from_panoid(panoid), + height=int(data["Point"]["coordinates"][2]), + street_name=data["Point"]["name"], + + places=_parse_companies(annotation["Companies"]), + addresses=addresses, + other_markers=other_markers, + + author=try_get(lambda: pano_dict["Author"]["name"]), + author_avatar_url=try_get(lambda: pano_dict["Author"]["avatarUrlTemplate"]), + ) + + +def _parse_companies(companies_json: list) -> List[Place]: + companies = [] + for company in companies_json: + companies.append(Place( + id=int(company["properties"]["id"]), + lat=company["geometry"]["coordinates"][1], + lon=company["geometry"]["coordinates"][0], + name=company["properties"]["name"], + tags=company["properties"]["tags"], + )) + return companies + + +def _parse_markers(markers_json: list) -> Tuple[List[Address], List[Marker]]: + addresses = [] + other_markers = [] + for marker in markers_json: + # Address markers are displayed at a height of 7 m; + # all others, like metro icons, have a height of 2 m. + if marker["geometry"]["coordinates"][2] == 7: + addresses.append(Address( + lat=marker["geometry"]["coordinates"][1], + lon=marker["geometry"]["coordinates"][0], + house_number=marker["properties"]["name"], + street_name_and_house_number=marker["properties"]["description"], + )) + else: + other_markers.append(Marker( + lat=marker["geometry"]["coordinates"][1], + lon=marker["geometry"]["coordinates"][0], + name=marker["properties"]["name"], + description=marker["properties"]["description"], + style=marker["properties"]["style"], + )) + + return addresses, other_markers + + +def _parse_links(links_json): + links = [] + for link_json in links_json: + panoid = _get_panoid_from_url(link_json["Connection"]["href"]) + angle = math.radians(float(link_json["Direction"][0])) + links.append(Link(panoid, angle)) + return links + + +def _get_panoid_from_url(url: str) -> str: + return re.findall(r"oid=(.*?)&", url)[0] + + +def _get_date_from_panoid(panoid: str) -> datetime: + return datetime.utcfromtimestamp(int(panoid.split("_")[-1])) + + +def _parse_image_sizes(zooms: dict) -> List[Size]: + sizes = [None] * len(zooms) + for zoom in zooms: + idx = int(zoom["level"]) + sizes[idx] = Size(int(zoom["width"]), int(zoom["height"])) + return sizes + + +def _parse_neighbors(nodes: List[dict], connections: List[dict], parent_id: str) -> List[YandexPanorama]: + panos = [] + for node in nodes: + panoid = node["panoid"] + if panoid == parent_id: + continue + pano = YandexPanorama( + id=panoid, + lat=float(node["lat"]), + lon=float(node["lon"]), + date=_get_date_from_panoid(panoid), + ) + panos.append(pano) + + for connection in connections: + panoid = _get_panoid_from_url(connection["href"]) + pano = YandexPanorama( + id=panoid, + lat=connection["Point"]["coordinates"][1], + lon=connection["Point"]["coordinates"][0], + height=connection["Point"]["coordinates"][2], + date=_get_date_from_panoid(panoid), + ) + panos.append(pano) + + return panos + + +def _parse_historical(historical: List[dict], parent_id: str) -> List[YandexPanorama]: + panos = [] + for raw_pano in historical: + panoid = raw_pano["Connection"]["oid"] + if panoid == parent_id: + continue + pano = YandexPanorama( + id=panoid, + lat=float(raw_pano["Connection"]["Point"]["coordinates"][1]), + lon=float(raw_pano["Connection"]["Point"]["coordinates"][0]), + height=int(raw_pano["Connection"]["Point"]["coordinates"][2]), + date=_get_date_from_panoid(panoid) + ) + panos.append(pano) + panos = sorted(panos, key=lambda x: x.date, reverse=True) + return panos diff --git a/streetlevel/yandex/yandex.py b/streetlevel/yandex/yandex.py index 2584571..9e6825d 100644 --- a/streetlevel/yandex/yandex.py +++ b/streetlevel/yandex/yandex.py @@ -1,17 +1,16 @@ import itertools import math -from datetime import datetime -from typing import List, Optional, Tuple -import re +from typing import List, Optional from PIL import Image from aiohttp import ClientSession from requests import Session from . import api -from .panorama import YandexPanorama, Place, Address, Marker -from ..dataclasses import Size, Tile, Link -from ..util import try_get, get_equirectangular_panorama, get_equirectangular_panorama_async +from .panorama import YandexPanorama +from .parse import parse_panorama_response +from ..dataclasses import Tile +from ..util import get_equirectangular_panorama, get_equirectangular_panorama_async def find_panorama(lat: float, lon: float, session: Session = None) -> Optional[YandexPanorama]: @@ -26,23 +25,13 @@ def find_panorama(lat: float, lon: float, session: Session = None) -> Optional[Y :return: A YandexPanorama object if a panorama was found, or None. """ - resp = api.find_panorama(lat, lon, session) - - if resp["status"] == "error": - return None - - pano = _parse_panorama(resp["data"]) - return pano + response = api.find_panorama(lat, lon, session) + return parse_panorama_response(response) async def find_panorama_async(lat: float, lon: float, session: ClientSession) -> Optional[YandexPanorama]: - resp = await api.find_panorama_async(lat, lon, session) - - if resp["status"] == "error": - return None - - pano = _parse_panorama(resp["data"]) - return pano + response = await api.find_panorama_async(lat, lon, session) + return parse_panorama_response(response) def find_panorama_by_id(panoid: str, session: Session = None) -> Optional[YandexPanorama]: @@ -53,23 +42,13 @@ def find_panorama_by_id(panoid: str, session: Session = None) -> Optional[Yandex :param session: *(optional)* A requests session. :return: A YandexPanorama object if a panorama with this ID exists, or None. """ - resp = api.find_panorama_by_id(panoid, session) - - if resp["status"] == "error": - return None - - pano = _parse_panorama(resp["data"]) - return pano + response = api.find_panorama_by_id(panoid, session) + return parse_panorama_response(response) async def find_panorama_by_id_async(panoid: str, session: ClientSession) -> Optional[YandexPanorama]: - resp = await api.find_panorama_by_id_async(panoid, session) - - if resp["status"] == "error": - return None - - pano = _parse_panorama(resp["data"]) - return pano + response = await api.find_panorama_by_id_async(panoid, session) + return parse_panorama_response(response) def get_panorama(pano: YandexPanorama, zoom: int = 0) -> Image.Image: @@ -149,150 +128,3 @@ def _validate_get_panorama_params(pano: YandexPanorama, zoom: int) -> int: raise ValueError("pano.image_sizes is None.") zoom = max(0, min(zoom, len(pano.image_sizes) - 1)) return zoom - - -def _parse_panorama(pano_dict: dict) -> YandexPanorama: - data = pano_dict["Data"] - annotation = pano_dict["Annotation"] - panoid = data["panoramaId"] - - addresses, other_markers = _parse_markers(annotation["Markers"]) - - return YandexPanorama( - id=panoid, - lat=float(data["Point"]["coordinates"][1]), - lon=float(data["Point"]["coordinates"][0]), - - heading=math.radians(float(data["EquirectangularProjection"]["Origin"][0])), - - image_id=data["Images"]["imageId"], - tile_size=Size(int(data["Images"]["Tiles"]["width"]), - int(data["Images"]["Tiles"]["height"])), - image_sizes=_parse_image_sizes(data["Images"]["Zooms"]), - - neighbors=_parse_neighbors(annotation["Graph"]["Nodes"], - annotation["Connections"], - panoid), - links=_parse_links(annotation["Thoroughfares"]), - historical=_parse_historical(annotation["HistoricalPanoramas"], panoid), - - date=_get_date_from_panoid(panoid), - height=int(data["Point"]["coordinates"][2]), - street_name=data["Point"]["name"], - - places=_parse_companies(annotation["Companies"]), - addresses=addresses, - other_markers=other_markers, - - author=try_get(lambda: pano_dict["Author"]["name"]), - author_avatar_url=try_get(lambda: pano_dict["Author"]["avatarUrlTemplate"]), - ) - - -def _parse_companies(companies_json: list) -> List[Place]: - companies = [] - for company in companies_json: - companies.append(Place( - id=int(company["properties"]["id"]), - lat=company["geometry"]["coordinates"][1], - lon=company["geometry"]["coordinates"][0], - name=company["properties"]["name"], - tags=company["properties"]["tags"], - )) - return companies - - -def _parse_markers(markers_json: list) -> Tuple[List[Address], List[Marker]]: - addresses = [] - other_markers = [] - for marker in markers_json: - # Address markers are displayed at a height of 7 m; - # all others, like metro icons, have a height of 2 m. - if marker["geometry"]["coordinates"][2] == 7: - addresses.append(Address( - lat=marker["geometry"]["coordinates"][1], - lon=marker["geometry"]["coordinates"][0], - house_number=marker["properties"]["name"], - street_name_and_house_number=marker["properties"]["description"], - )) - else: - other_markers.append(Marker( - lat=marker["geometry"]["coordinates"][1], - lon=marker["geometry"]["coordinates"][0], - name=marker["properties"]["name"], - description=marker["properties"]["description"], - style=marker["properties"]["style"], - )) - - return addresses, other_markers - - -def _parse_links(links_json): - links = [] - for link_json in links_json: - panoid = _get_panoid_from_url(link_json["Connection"]["href"]) - angle = math.radians(float(link_json["Direction"][0])) - links.append(Link(panoid, angle)) - return links - - -def _get_panoid_from_url(url: str) -> str: - return re.findall(r"oid=(.*?)&", url)[0] - - -def _get_date_from_panoid(panoid: str) -> datetime: - return datetime.utcfromtimestamp(int(panoid.split("_")[-1])) - - -def _parse_image_sizes(zooms: dict) -> List[Size]: - sizes = [None] * len(zooms) - for zoom in zooms: - idx = int(zoom["level"]) - sizes[idx] = Size(int(zoom["width"]), int(zoom["height"])) - return sizes - - -def _parse_neighbors(nodes: List[dict], connections: List[dict], parent_id: str) -> List[YandexPanorama]: - panos = [] - for node in nodes: - panoid = node["panoid"] - if panoid == parent_id: - continue - pano = YandexPanorama( - id=panoid, - lat=float(node["lat"]), - lon=float(node["lon"]), - date=_get_date_from_panoid(panoid), - ) - panos.append(pano) - - for connection in connections: - panoid = _get_panoid_from_url(connection["href"]) - pano = YandexPanorama( - id=panoid, - lat=connection["Point"]["coordinates"][1], - lon=connection["Point"]["coordinates"][0], - height=connection["Point"]["coordinates"][2], - date=_get_date_from_panoid(panoid), - ) - panos.append(pano) - - return panos - - -def _parse_historical(historical: List[dict], parent_id: str) -> List[YandexPanorama]: - panos = [] - for raw_pano in historical: - panoid = raw_pano["Connection"]["oid"] - if panoid == parent_id: - continue - pano = YandexPanorama( - id=panoid, - lat=float(raw_pano["Connection"]["Point"]["coordinates"][1]), - lon=float(raw_pano["Connection"]["Point"]["coordinates"][0]), - height=int(raw_pano["Connection"]["Point"]["coordinates"][2]), - date=_get_date_from_panoid(panoid) - ) - panos.append(pano) - panos = sorted(panos, key=lambda x: x.date, reverse=True) - return panos diff --git a/tests/lookaround/lookaround_test.py b/tests/lookaround/lookaround_test.py index e43e536..fcf38b8 100644 --- a/tests/lookaround/lookaround_test.py +++ b/tests/lookaround/lookaround_test.py @@ -3,22 +3,21 @@ from streetlevel import lookaround from streetlevel.lookaround.proto import GroundMetadataTile_pb2 +from streetlevel.lookaround.panorama import CoverageType +import streetlevel.lookaround.parse -def test_get_coverage_tile(): - def mocked_get_coverage_tile(tile_x, tile_y, session=None): - with open("lookaround/data/metadata_tile.pb", "rb") as f: - tile_pb = f.read() - tile = GroundMetadataTile_pb2.GroundMetadataTile() - tile.ParseFromString(tile_pb) - return tile - lookaround.api.get_coverage_tile = mocked_get_coverage_tile +def test_parse_coverage_tile(): + with open("lookaround/data/metadata_tile.pb", "rb") as f: + tile_pb = f.read() + tile = GroundMetadataTile_pb2.GroundMetadataTile() + tile.ParseFromString(tile_pb) - panos = lookaround.get_coverage_tile_by_latlon(37.793871595174096, -122.43653154373168) + panos = lookaround.parse.parse_coverage_tile(tile) assert panos[0].id == 8227292017329697463 assert panos[0].build_id == 1392282981 assert panos[0].date == datetime(2022, 4, 2, 22, 14, 21, 111000) - assert panos[0].coverage_type == lookaround.CoverageType.CAR + assert panos[0].coverage_type == CoverageType.CAR assert panos[0].heading == approx(1.7832050655066374) assert panos[0].pitch == approx(-0.016837476195331824) assert panos[0].roll == approx(-0.030182552495642945) diff --git a/tests/mapy/mapy_test.py b/tests/mapy/mapy_test.py index e72c875..a2b0ecf 100644 --- a/tests/mapy/mapy_test.py +++ b/tests/mapy/mapy_test.py @@ -1,20 +1,17 @@ import pytest from pytest import approx import pickle + from streetlevel import mapy +import streetlevel.mapy.parse from streetlevel.dataclasses import Size -def mocked_getbest(lat, lon, radius, options=None): +def test_parse_getbest_response(): with open("mapy/data/getbest.pkl", "rb") as f: - return pickle.load(f) - - -mapy.api.getbest = mocked_getbest - + response = pickle.load(f) -def test_find_panorama(): - pano = mapy.find_panorama(50.1265193, 17.3762701, 100.0, historical=False, links=False) + pano = mapy.parse.parse_getbest_response(response) assert pano.id == 59418543 assert pano.lat == approx(50.1265193, 0.001) assert pano.lon == approx(17.3762701, 0.001) diff --git a/tests/streetside/streetside_test.py b/tests/streetside/streetside_test.py index e1c6619..395442c 100644 --- a/tests/streetside/streetside_test.py +++ b/tests/streetside/streetside_test.py @@ -1,18 +1,15 @@ from pytest import approx import json + from streetlevel import streetside +import streetlevel.streetside.parse -def mocked_api_find_panoramas(north, west, south, east, limit=50, session=None): +def test_parse_panoramas(): with open("streetside/data/find.json", "r") as f: - return json.load(f) - - -streetside.api.find_panoramas = mocked_api_find_panoramas - + response = json.load(f) -def test_find_panoramas_in_bbox(): - panos = streetside.find_panoramas_in_bbox(-23.860792, 35.343169, -23.863089, 35.347470) + panos = streetside.parse.parse_panoramas(response) assert len(panos) != 0 assert panos[0].id == 362530254 assert panos[0].lat == approx(-23.862083, 0.001) diff --git a/tests/streetview/streetview_test.py b/tests/streetview/streetview_test.py index f9b9a90..e4534ef 100644 --- a/tests/streetview/streetview_test.py +++ b/tests/streetview/streetview_test.py @@ -2,23 +2,20 @@ import json from streetlevel import streetview from streetlevel.dataclasses import Size +import streetlevel.streetview.parse -def test_is_third_party_panoid(request): +def test_is_third_party_panoid(): assert not streetview.is_third_party_panoid("n-Zd6bDDL_XOc_jkNgFsGg") assert streetview.is_third_party_panoid("AF1QipN3bwjvnpTUbfCZ18wsUMrpZ6Ul2mhVfNKl71_X") def test_find_panorama_by_id(): - def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", session=None): - with open("streetview/data/find_by_id.json", "r") as f: - return json.load(f) - - streetview.api.find_panorama_by_id = mocked_api_find_panorama_by_id + with open("streetview/data/find_by_id.json", "r") as f: + response = json.load(f) - panoid = "n-Zd6bDDL_XOc_jkNgFsGg" - pano = streetview.find_panorama_by_id(panoid, download_depth=False, session=None) - assert pano.id == panoid + pano = streetview.parse.parse_panorama_id_response(response) + assert pano.id == "n-Zd6bDDL_XOc_jkNgFsGg" assert pano.lat == approx(47.15048822721601, 0.001) assert pano.lon == approx(11.13385612403307, 0.001) assert pano.date.month == 3 @@ -30,52 +27,28 @@ def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", se def test_find_panorama(): - def mocked_api_find_panorama(lat, lon, radius=50, download_depth=False, locale="en", - search_third_party=False, session=None): - with open("streetview/data/find.json", "r") as f: - return json.load(f) + with open("streetview/data/find.json", "r") as f: + response = json.load(f) - streetview.api.find_panorama = mocked_api_find_panorama - - pano = streetview.find_panorama(47.15048822721601, 11.13385612403307, radius=100, session=None) - assert pano.id == "n-Zd6bDDL_XOc_jkNgFsGg" - assert pano.lat == approx(47.15048822721601, 0.001) - assert pano.lon == approx(11.13385612403307, 0.001) - - -async def test_find_panorama_async(): - async def mocked_api_find_panorama_async(lat, lon, session, radius=50, download_depth=False, locale="en", - search_third_party=False): - with open("streetview/data/find.json", "r") as f: - return json.load(f) - - streetview.api.find_panorama_async = mocked_api_find_panorama_async - - pano = await streetview.find_panorama_async(47.15048822721601, 11.13385612403307, None, radius=100) + pano = streetview.parse.parse_panorama_radius_response(response) assert pano.id == "n-Zd6bDDL_XOc_jkNgFsGg" assert pano.lat == approx(47.15048822721601, 0.001) assert pano.lon == approx(11.13385612403307, 0.001) def test_find_get_coverage_tile_by_latlon(): - def mocked_api_get_coverage_tile(tile_x, tile_y, session=None): - with open("streetview/data/coverage_tile.json", "r") as f: - return json.load(f) + with open("streetview/data/coverage_tile.json", "r") as f: + response = json.load(f) - streetview.api.get_coverage_tile = mocked_api_get_coverage_tile - - panos = streetview.get_coverage_tile_by_latlon(47.15048822721601, 11.13385612403307) + panos = streetview.parse.parse_coverage_tile_response(response) assert any(p.id == "n-Zd6bDDL_XOc_jkNgFsGg" for p in panos) def test_nepal_links(): - def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", session=None): - with open("streetview/data/nepal_links.json", "r") as f: - return json.load(f) - - streetview.api.find_panorama_by_id = mocked_api_find_panorama_by_id + with open("streetview/data/nepal_links.json", "r") as f: + response = json.load(f) - pano = streetview.find_panorama_by_id("75oFcYmZUOnqgvAaX2q-_w") + pano = streetview.parse.parse_panorama_id_response(response) assert pano.links[0].pano.id == "yCIEQ7o37R49IxNPOkhWgw" assert pano.links[0].pano.lat is None assert pano.links[1].pano.id == "C9aepLhvgmFjRJlP2GxjXQ" @@ -83,24 +56,18 @@ def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", se def test_missing_link_direction(): - def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", session=None): - with open("streetview/data/missing_link_direction.json", "r") as f: - return json.load(f) + with open("streetview/data/missing_link_direction.json", "r") as f: + response = json.load(f) - streetview.api.find_panorama_by_id = mocked_api_find_panorama_by_id - - pano = streetview.find_panorama_by_id("VAhJEVyAlZg-QgAnUwcIRA") + pano = streetview.parse.parse_panorama_id_response(response) assert pano.links[0].direction == approx(0.07641416505750565) def test_places(): - def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", session=None): - with open("streetview/data/places.json", "r") as f: - return json.load(f) - - streetview.api.find_panorama_by_id = mocked_api_find_panorama_by_id + with open("streetview/data/places.json", "r") as f: + response = json.load(f) - pano = streetview.find_panorama_by_id("gjDG9WfyVFri9OT0A4LaWw") + pano = streetview.parse.parse_panorama_id_response(response) assert len(pano.places) == 3 assert pano.places[2].name.value == "Old Parliament House" assert pano.places[2].type.value == "Museum" @@ -112,13 +79,10 @@ def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", se def test_missing_level_name(): - def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", session=None): - with open("streetview/data/missing_level_name.json", "r") as f: - return json.load(f) + with open("streetview/data/missing_level_name.json", "r") as f: + response = json.load(f) - streetview.api.find_panorama_by_id = mocked_api_find_panorama_by_id - - pano = streetview.find_panorama_by_id("4pRBISc-WOW0Qw8kB8mC3Q") + pano = streetview.parse.parse_panorama_id_response(response) assert pano.building_level is not None assert pano.building_level.level == 0 assert pano.building_level.name is None @@ -126,24 +90,18 @@ def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", se def test_missing_date(): - def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", session=None): - with open("streetview/data/missing_date.json", "r") as f: - return json.load(f) - - streetview.api.find_panorama_by_id = mocked_api_find_panorama_by_id + with open("streetview/data/missing_date.json", "r") as f: + response = json.load(f) - pano = streetview.find_panorama_by_id("_RqEb7FskACC8WVKWHQ66w") + pano = streetview.parse.parse_panorama_id_response(response) assert pano.date is None def test_missing_historical_date(): - def mocked_api_find_panorama_by_id(panoid, download_depth=False, locale="en", session=None): - with open("streetview/data/missing_historical_date.json", "r") as f: - return json.load(f) - - streetview.api.find_panorama_by_id = mocked_api_find_panorama_by_id + with open("streetview/data/missing_historical_date.json", "r") as f: + response = json.load(f) - pano = streetview.find_panorama_by_id("_Bhrz-OIcZ8AAAQ4hFiG7A") + pano = streetview.parse.parse_panorama_id_response(response) assert pano.date.year == 2017 assert pano.date.month == 7 assert len(pano.historical) == 1 diff --git a/tests/yandex/yandex_test.py b/tests/yandex/yandex_test.py index 245f649..52b4290 100644 --- a/tests/yandex/yandex_test.py +++ b/tests/yandex/yandex_test.py @@ -1,23 +1,17 @@ from datetime import datetime - -from pytest import approx import json -from requests import Session +from pytest import approx from streetlevel import yandex +import streetlevel.yandex.parse -def mocked_api_find_panorama(lat: float, lon: float, session: Session = None) -> dict: +def test_parse_panorama_response(): with open("yandex/data/find.json", "r") as f: - return json.load(f) - - -yandex.api.find_panorama = mocked_api_find_panorama - + response = json.load(f) -def test_find_panorama(): - pano = yandex.find_panorama(53.917633, 27.548128) + pano = yandex.parse.parse_panorama_response(response) assert pano.id == "1238072810_692204477_23_1688105969" assert pano.lat == approx(53.917633, 0.001) assert pano.lon == approx(27.548128, 0.001) From dee0c2bfabcbb36b98874f357e1e1411f89a12c9 Mon Sep 17 00:00:00 2001 From: sk-zk Date: Mon, 29 Apr 2024 16:18:38 +0200 Subject: [PATCH 3/4] [Naver] Fix `parse_historical` and update `parse_nearby` `metadata/timeline` now also contains the panorama type, breaking `parse_historical`. Likewise, `panorama/nearby` returns the panorama type as well (though I don't know for certain whether this is new), so `parse_nearby` has been updated as well. --- streetlevel/naver/parse.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streetlevel/naver/parse.py b/streetlevel/naver/parse.py index b0ede3b..ce3ceae 100644 --- a/streetlevel/naver/parse.py +++ b/streetlevel/naver/parse.py @@ -64,7 +64,8 @@ def parse_historical(response: dict, parent_id: str) -> List[NaverPanorama]: id=pano[0], lat=pano[2], lon=pano[1], - date=datetime.strptime(pano[3], "%Y-%m-%d %H:%M:%S.0") + panorama_type=PanoramaType(int(pano[3])), + date=datetime.strptime(pano[4], "%Y-%m-%d %H:%M:%S.0"), ) for pano in panos if pano[0] != parent_id] @@ -80,7 +81,8 @@ def parse_nearby(response: dict) -> NaverPanorama: description=feature["properties"]["description"], title=feature["properties"]["title"], elevation=elevation, - camera_height=(feature["properties"]["camera_altitude"] * 0.01) - elevation + camera_height=(feature["properties"]["camera_altitude"] * 0.01) - elevation, + panorama_type=PanoramaType(int(feature["properties"]["type"])), ) From 1ad33e9903331445cd3359540cd21e85e3c3f883 Mon Sep 17 00:00:00 2001 From: sk-zk Date: Mon, 29 Apr 2024 16:39:20 +0200 Subject: [PATCH 4/4] Increase version --- docs/conf.py | 2 +- pyproject.toml | 2 +- streetlevel/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 6d5f631..460fc52 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -9,7 +9,7 @@ project = 'streetlevel' copyright = '2024, skzk' author = 'skzk' -release = '0.8.0' +release = '0.8.1' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/pyproject.toml b/pyproject.toml index 49346c8..d6d3e06 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "streetlevel" -version = "0.8.0" +version = "0.8.1" license = "MIT" authors = [ { name="sk-zk", email="sk-zk@users.noreply.github.com" }, diff --git a/streetlevel/__init__.py b/streetlevel/__init__.py index 777f190..8088f75 100644 --- a/streetlevel/__init__.py +++ b/streetlevel/__init__.py @@ -1 +1 @@ -__version__ = "0.8.0" +__version__ = "0.8.1"