Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
sk-zk committed Apr 29, 2024
2 parents f40fd47 + 1ad33e9 commit 16d5ed1
Show file tree
Hide file tree
Showing 26 changed files with 1,012 additions and 1,056 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# streetlevel
**streetlevel** is a library for downloading panoramas and metadata from street-level imagery services including Google Street View, Apple Look Around, and several others.
**streetlevel** is a library for downloading panoramas and metadata from street-level imagery services
such as Google Street View, Apple Look Around, and several others. It provides a high-level abstraction
over the internal APIs of the supported services – this means that no API keys are required, but the
library may break unexpectedly.

Since it relies on calls to internal APIs, it may break unexpectedly.

(Nearly) all functions are available as both a sync function using `requests` or an async function using `aiohttp`, requiring a `ClientSession`.
(Nearly) all functions are available as either a sync function using `requests` or an async function
using `aiohttp`, requiring a `ClientSession`.

## Installation
```sh
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
project = 'streetlevel'
copyright = '2024, skzk'
author = 'skzk'
release = '0.8.0'
release = '0.8.1'

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "streetlevel"
version = "0.8.0"
version = "0.8.1"
license = "MIT"
authors = [
{ name="sk-zk", email="sk-zk@users.noreply.github.com" },
Expand Down
2 changes: 1 addition & 1 deletion streetlevel/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.8.0"
__version__ = "0.8.1"
93 changes: 15 additions & 78 deletions streetlevel/ja/ja.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import math
from typing import Optional, List, Tuple, Union

from PIL import Image
from aiohttp import ClientSession
from requests import Session

from . import api
from .panorama import JaPanorama, CaptureDate, Address, StreetLabel
from .panorama import JaPanorama
from .parse import parse_panorama_radius_response, parse_panorama_id_response
from ..dataclasses import Tile
from ..util import download_tiles, download_tiles_async, CubemapStitchingMethod, stitch_cubemap_faces, \
save_cubemap_panorama, stitch_cubemap_face, try_get
save_cubemap_panorama, stitch_cubemap_face


def find_panorama(lat: float, lon: float, radius: int = 100, session: Session = None) -> Optional[JaPanorama]:
Expand All @@ -23,21 +23,13 @@ def find_panorama(lat: float, lon: float, radius: int = 100, session: Session =
:return: A JaPanorama if a panorama was found, or None.
"""
response = api.find_panorama(lat, lon, radius, session)

if "message" in response:
return None

return _parse_panorama(response)
return parse_panorama_radius_response(response)


async def find_panorama_async(lat: float, lon: float, session: ClientSession,
radius: int = 100) -> Optional[JaPanorama]:
radius: int = 100) -> Optional[JaPanorama]:
response = await api.find_panorama_async(lat, lon, session, radius)

if "message" in response:
return None

return _parse_panorama(response)
return parse_panorama_radius_response(response)


def find_panorama_by_id(panoid: int, session: Session = None) -> Optional[JaPanorama]:
Expand All @@ -49,12 +41,12 @@ def find_panorama_by_id(panoid: int, session: Session = None) -> Optional[JaPano
:return: A JaPanorama object if a panorama with this ID was found, or None.
"""
response = api.find_panorama_by_id(panoid, session)
return _parse_panorama_by_id(response)
return parse_panorama_id_response(response)


async def find_panorama_by_id_async(panoid: int, session: ClientSession) -> Optional[JaPanorama]:
response = await api.find_panorama_by_id_async(panoid, session)
return _parse_panorama_by_id(response)
return parse_panorama_id_response(response)


def get_panorama(pano: JaPanorama, zoom: int = 0,
Expand All @@ -69,7 +61,7 @@ def get_panorama(pano: JaPanorama, zoom: int = 0,
image. Defaults to ``ROW``.
:return: A PIL image or a list of six PIL images depending on ``stitching_method``.
"""
zoom = min(1, max(zoom, 0))
zoom = _validate_get_panorama_params(pano, zoom)
face_tiles, cols, rows = _generate_tile_list(pano, zoom)
tile_images = _download_tiles(face_tiles)
return _stitch_panorama(tile_images, cols, rows, stitching_method=stitching_method)
Expand All @@ -78,7 +70,7 @@ def get_panorama(pano: JaPanorama, zoom: int = 0,
async def get_panorama_async(pano: JaPanorama, session: ClientSession, zoom: int = 0,
stitching_method: CubemapStitchingMethod = CubemapStitchingMethod.ROW) \
-> Union[List[Image.Image], Image.Image]:
zoom = min(1, max(zoom, 0))
zoom = _validate_get_panorama_params(pano, zoom)
face_tiles, cols, rows = _generate_tile_list(pano, zoom)
tile_images = await _download_tiles_async(face_tiles, session)
return _stitch_panorama(tile_images, cols, rows, stitching_method=stitching_method)
Expand Down Expand Up @@ -116,66 +108,11 @@ async def download_panorama_async(pano: JaPanorama, path: str, session: ClientSe
save_cubemap_panorama(output, path, pil_args)


def _parse_panorama_by_id(pano_dict: dict) -> JaPanorama:
address = try_get(lambda: pano_dict["streets"]["nearestAddress"])
if address:
address = Address(*address.values())

return JaPanorama(
id=pano_dict["image"]["id"],
lat=pano_dict["image"]["lat"],
lon=pano_dict["image"]["lng"],
heading=math.radians(pano_dict["image"]["heading"]),
date=_parse_date(pano_dict["image"]["month"]),
pano_url="https:" + pano_dict["image"]["pano_url"],
blur_key=pano_dict["image"]["blur_key"],
street_names=_parse_streets(pano_dict["streets"]),
address=address,
neighbors=_parse_hotspots(pano_dict["hotspots"]),
)


def _parse_streets(streets: dict) -> List[StreetLabel]:
main = StreetLabel(name=streets["street"]["name"],
angles=[math.radians(a) for a in streets["street"]["azimuths"]])
connections = []
for connection_dict in streets["connections"]:
connection = StreetLabel(name=connection_dict["name"],
angles=[math.radians(connection_dict["angle"])],
distance=connection_dict["distance"])
connections.append(connection)

return [main] + connections


def _parse_hotspots(hotspots: list) -> List[JaPanorama]:
neighbors = []
for hotspot in hotspots:
neighbors.append(JaPanorama(
id=hotspot["image"]["id"],
lat=hotspot["image"]["lat"],
lon=hotspot["image"]["lng"],
heading=math.radians(hotspot["image"]["heading"]),
date=_parse_date(hotspot["image"]["month"]),
pano_url="https:" + hotspot["image"]["pano_url"],
blur_key=hotspot["image"]["blur_key"],
))
return neighbors


def _parse_date(date_str: str) -> CaptureDate:
year, month = date_str.split("-")
date = CaptureDate(int(year), int(month))
return date


def _parse_panorama(pano_dict: dict) -> JaPanorama:
return JaPanorama(
id=pano_dict["id"],
lat=pano_dict["lat"],
lon=pano_dict["lng"],
heading=math.radians(pano_dict["image_heading"]),
)
def _validate_get_panorama_params(pano, zoom):
if not pano.pano_url:
raise ValueError("pano_url is None; please call find_panorama_by_id to fetch this info")
zoom = min(1, max(zoom, 0))
return zoom


def _generate_tile_list(pano: JaPanorama, zoom: int) -> Tuple[List[List[Tile]], int, int]:
Expand Down
71 changes: 71 additions & 0 deletions streetlevel/ja/parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import math
from typing import List, Optional

from streetlevel.ja.panorama import JaPanorama, Address, StreetLabel, CaptureDate
from streetlevel.util import try_get


def parse_panorama_radius_response(pano_dict: dict) -> Optional[JaPanorama]:
if "message" in pano_dict:
return None

return JaPanorama(
id=pano_dict["id"],
lat=pano_dict["lat"],
lon=pano_dict["lng"],
heading=math.radians(pano_dict["image_heading"]),
)


def parse_panorama_id_response(pano_dict: dict) -> JaPanorama:
address = try_get(lambda: pano_dict["streets"]["nearestAddress"])
if address:
address = Address(*address.values())

return JaPanorama(
id=pano_dict["image"]["id"],
lat=pano_dict["image"]["lat"],
lon=pano_dict["image"]["lng"],
heading=math.radians(pano_dict["image"]["heading"]),
date=_parse_date(pano_dict["image"]["month"]),
pano_url="https:" + pano_dict["image"]["pano_url"],
blur_key=pano_dict["image"]["blur_key"],
street_names=_parse_streets(pano_dict["streets"]),
address=address,
neighbors=_parse_hotspots(pano_dict["hotspots"]),
)


def _parse_streets(streets: dict) -> List[StreetLabel]:
main = StreetLabel(name=streets["street"]["name"],
angles=[math.radians(a) for a in streets["street"]["azimuths"]])
connections = []
for connection_dict in streets["connections"]:
connection = StreetLabel(name=connection_dict["name"],
angles=[math.radians(connection_dict["angle"])],
distance=connection_dict["distance"])
connections.append(connection)

return [main] + connections


def _parse_hotspots(hotspots: list) -> List[JaPanorama]:
neighbors = []
for hotspot in hotspots:
neighbors.append(JaPanorama(
id=hotspot["image"]["id"],
lat=hotspot["image"]["lat"],
lon=hotspot["image"]["lng"],
heading=math.radians(hotspot["image"]["heading"]),
date=_parse_date(hotspot["image"]["month"]),
pano_url="https:" + hotspot["image"]["pano_url"],
blur_key=hotspot["image"]["blur_key"],
))
return neighbors


def _parse_date(date_str: str) -> CaptureDate:
year, month = date_str.split("-")
date = CaptureDate(int(year), int(month))
return date

61 changes: 8 additions & 53 deletions streetlevel/kakao/kakao.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import itertools
import math
from typing import List, Optional
from datetime import datetime

import requests
from PIL import Image
from aiohttp import ClientSession
from requests import Session

from . import api
from .panorama import KakaoPanorama, PanoramaType
from ..dataclasses import Tile, Size, Link
from ..util import try_get, get_equirectangular_panorama, get_equirectangular_panorama_async, get_image, \
from .panorama import KakaoPanorama
from .parse import parse_panoramas, parse_panorama
from ..dataclasses import Tile, Size
from ..util import get_equirectangular_panorama, get_equirectangular_panorama_async, get_image, \
get_image_async, download_file, download_file_async

PANO_COLS = [1, 8, 16]
Expand Down Expand Up @@ -43,7 +42,7 @@ def find_panoramas(lat: float, lon: float, radius: int = 35,
if response["street_view"]["cnt"] == 0:
return []

return _parse_panoramas(response)
return parse_panoramas(response)


async def find_panoramas_async(lat: float, lon: float, session: ClientSession,
Expand All @@ -53,7 +52,7 @@ async def find_panoramas_async(lat: float, lon: float, session: ClientSession,
if response["street_view"]["cnt"] == 0:
return []

return _parse_panoramas(response)
return parse_panoramas(response)


def find_panorama_by_id(panoid: int, neighbors: bool = True, session: Session = None) -> Optional[KakaoPanorama]:
Expand All @@ -74,7 +73,7 @@ def find_panorama_by_id(panoid: int, neighbors: bool = True, session: Session =
if response["street_view"]["cnt"] == 0:
return None

pano = _parse_panorama(response["street_view"]["street"])
pano = parse_panorama(response["street_view"]["street"])
if neighbors:
pano.neighbors = find_panoramas(pano.lat, pano.lon, session=session)
return pano
Expand All @@ -87,7 +86,7 @@ async def find_panorama_by_id_async(panoid: int, session: ClientSession,
if response["street_view"]["cnt"] == 0:
return None

pano = _parse_panorama(response["street_view"]["street"])
pano = parse_panorama(response["street_view"]["street"])
if neighbors:
pano.neighbors = await find_panoramas_async(pano.lat, pano.lon, session)
return pano
Expand Down Expand Up @@ -187,50 +186,6 @@ def _build_depthmap_url(pano):
f"{pano.image_path}_W.png"


def _parse_panoramas(response):
return [_parse_panorama(pano) for pano in response["street_view"]["streetList"]]


def _parse_panorama(pano_json: dict) -> KakaoPanorama:
pano = KakaoPanorama(
id=pano_json["id"],
lat=pano_json["wgsy"],
lon=pano_json["wgsx"],
wcongx=pano_json["wcongx"],
wcongy=pano_json["wcongy"],
heading=math.radians(float(pano_json["angle"])),
image_path=pano_json["img_path"],
# shot_date sometimes returns the time as 00:00:00, but the image url is always correct
date=datetime.strptime(pano_json["img_path"].split("_")[-1], "%Y%m%d%H%M%S"),
street_name=try_get(lambda: pano_json["st_name"]),
address=try_get(lambda: pano_json["addr"]),
street_type=try_get(lambda: pano_json["st_type"]),
panorama_type=PanoramaType(int(pano_json["shot_tool"]))
)

if "past" in pano_json and pano_json["past"] is not None:
pano.historical = [_parse_panorama(past) for past in pano_json["past"]]

if "spot" in pano_json and pano_json["past"] is not None:
pano.links = _parse_links(pano_json["spot"])

return pano


def _parse_links(links_json: List[dict]) -> List[Link]:
links = []
for linked_json in links_json:
linked = KakaoPanorama(
id=linked_json["id"],
lat=linked_json["wgsy"],
lon=linked_json["wgsx"],
street_name=try_get(lambda: linked_json["st_name"]),
)
angle = math.radians(float(linked_json["pan"]))
links.append(Link(linked, angle))
return links


def _generate_tile_list(pano: KakaoPanorama, zoom: int) -> List[Tile]:
"""
Generates a list of a panorama's tiles and the URLs pointing to them.
Expand Down
Loading

0 comments on commit 16d5ed1

Please sign in to comment.