| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,274 @@ | ||
| """ | ||
| OpenStreetMaps geocoder, contributed by Alessandro Pasotti of ItOpen. | ||
| """ | ||
|
|
||
| from geopy.geocoders.base import ( | ||
| Geocoder, | ||
| DEFAULT_FORMAT_STRING, | ||
| DEFAULT_TIMEOUT, | ||
| DEFAULT_SCHEME | ||
| ) | ||
| from geopy.compat import urlencode | ||
| from geopy.location import Location | ||
| from geopy.util import logger | ||
| from geopy.exc import GeocoderQueryError | ||
|
|
||
|
|
||
| __all__ = ("Nominatim", ) | ||
|
|
||
|
|
||
| class Nominatim(Geocoder): | ||
| """ | ||
| Nominatim geocoder for OpenStreetMap servers. Documentation at: | ||
| https://wiki.openstreetmap.org/wiki/Nominatim | ||
| Note that Nominatim does not support SSL. | ||
| """ | ||
|
|
||
| structured_query_params = { | ||
| 'street', | ||
| 'city', | ||
| 'county', | ||
| 'state', | ||
| 'country', | ||
| 'postalcode', | ||
| } | ||
|
|
||
| def __init__( | ||
| self, | ||
| format_string=DEFAULT_FORMAT_STRING, | ||
| view_box=None, | ||
| country_bias=None, | ||
| timeout=DEFAULT_TIMEOUT, | ||
| proxies=None, | ||
| domain='nominatim.openstreetmap.org', | ||
| scheme=DEFAULT_SCHEME, | ||
| user_agent=None | ||
| ): # pylint: disable=R0913 | ||
| """ | ||
| :param string format_string: String containing '%s' where the | ||
| string to geocode should be interpolated before querying the | ||
| geocoder. For example: '%s, Mountain View, CA'. The default | ||
| is just '%s'. | ||
| :param tuple view_box: Coordinates to restrict search within. | ||
| :param string country_bias: Bias results to this country. | ||
| :param dict proxies: If specified, routes this geocoder's requests | ||
| through the specified proxy. E.g., {"https": "192.0.2.0"}. For | ||
| more information, see documentation on | ||
| :class:`urllib2.ProxyHandler`. | ||
| .. versionadded:: 0.96 | ||
| :param string domain: Should be the localized Openstreetmap domain to | ||
| connect to. The default is 'nominatim.openstreetmap.org', but you | ||
| can change it to a domain of your own. | ||
| .. versionadded:: 1.8.2 | ||
| :param string scheme: Use 'https' or 'http' as the API URL's scheme. | ||
| Default is https. Note that SSL connections' certificates are not | ||
| verified. | ||
| .. versionadded:: 1.8.2 | ||
| """ | ||
| super(Nominatim, self).__init__( | ||
| format_string, scheme, timeout, proxies, user_agent=user_agent | ||
| ) | ||
| self.country_bias = country_bias | ||
| self.format_string = format_string | ||
| self.view_box = view_box | ||
| self.domain = domain.strip('/') | ||
|
|
||
| self.api = "%s://%s/search" % (self.scheme, self.domain) | ||
| self.reverse_api = "%s://%s/reverse" % (self.scheme, self.domain) | ||
|
|
||
| def geocode( | ||
| self, | ||
| query, | ||
| exactly_one=True, | ||
| timeout=None, | ||
| addressdetails=False, | ||
| language=False, | ||
| geometry=None | ||
| ): # pylint: disable=R0913,W0221 | ||
| """ | ||
| Geocode a location query. | ||
| :param query: The address, query or structured query to geocode | ||
| you wish to geocode. | ||
| For a structured query, provide a dictionary whose keys | ||
| are one of: `street`, `city`, `county`, `state`, `country`, or | ||
| `postalcode`. For more information, see Nominatim's | ||
| documentation for "structured requests": | ||
| https://wiki.openstreetmap.org/wiki/Nominatim | ||
| :type query: dict or string | ||
| .. versionchanged:: 1.0.0 | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. Set this only if you wish to override, on this call | ||
| only, the value set during the geocoder's initialization. | ||
| .. versionadded:: 0.97 | ||
| :param addressdetails: If you want in *Location.raw* to include | ||
| addressdetails such as city_district, etc set it to True | ||
| :type addressdetails: bool | ||
| :param string language: Preferred language in which to return results. | ||
| Either uses standard | ||
| `RFC2616 <http://www.ietf.org/rfc/rfc2616.txt>`_ | ||
| accept-language string or a simple comma-separated | ||
| list of language codes. | ||
| :type addressdetails: string | ||
| .. versionadded:: 1.0.0 | ||
| :param string geometry: If present, specifies whether the geocoding | ||
| service should return the result's geometry in `wkt`, `svg`, | ||
| `kml`, or `geojson` formats. This is available via the | ||
| `raw` attribute on the returned :class:`geopy.location.Location` | ||
| object. | ||
| .. versionadded:: 1.3.0 | ||
| """ | ||
|
|
||
| if isinstance(query, dict): | ||
| params = { | ||
| key: val | ||
| for key, val | ||
| in query.items() | ||
| if key in self.structured_query_params | ||
| } | ||
| else: | ||
| params = {'q': self.format_string % query} | ||
|
|
||
| params.update({ | ||
| 'format': 'json' | ||
| }) | ||
|
|
||
| # `viewbox` apparently replaces `view_box` | ||
| if self.view_box: | ||
| params['viewbox'] = ','.join(self.view_box) | ||
|
|
||
| if self.country_bias: | ||
| params['countrycodes'] = self.country_bias | ||
|
|
||
| if addressdetails: | ||
| params['addressdetails'] = 1 | ||
|
|
||
| if language: | ||
| params['accept-language'] = language | ||
|
|
||
| if geometry is not None: | ||
| geometry = geometry.lower() | ||
| if geometry == 'wkt': | ||
| params['polygon_text'] = 1 | ||
| elif geometry == 'svg': | ||
| params['polygon_svg'] = 1 | ||
| elif geometry == 'kml': | ||
| params['polygon_kml'] = 1 | ||
| elif geometry == 'geojson': | ||
| params['polygon_geojson'] = 1 | ||
| else: | ||
| raise GeocoderQueryError( | ||
| "Invalid geometry format. Must be one of: " | ||
| "wkt, svg, kml, geojson." | ||
| ) | ||
|
|
||
| url = "?".join((self.api, urlencode(params))) | ||
| logger.debug("%s.geocode: %s", self.__class__.__name__, url) | ||
| return self._parse_json( | ||
| self._call_geocoder(url, timeout=timeout), exactly_one | ||
| ) | ||
|
|
||
| def reverse( | ||
| self, | ||
| query, | ||
| exactly_one=True, | ||
| timeout=None, | ||
| language=False, | ||
| ): # pylint: disable=W0221 | ||
| """ | ||
| Returns a reverse geocoded location. | ||
| :param query: The coordinates for which you wish to obtain the | ||
| closest human-readable addresses. | ||
| :type query: :class:`geopy.point.Point`, list or tuple of (latitude, | ||
| longitude), or string as "%(latitude)s, %(longitude)s" | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. Set this only if you wish to override, on this call | ||
| only, the value set during the geocoder's initialization. | ||
| .. versionadded:: 0.97 | ||
| :param string language: Preferred language in which to return results. | ||
| Either uses standard | ||
| `RFC2616 <http://www.ietf.org/rfc/rfc2616.txt>`_ | ||
| accept-language string or a simple comma-separated | ||
| list of language codes. | ||
| :type addressdetails: string | ||
| .. versionadded:: 1.0.0 | ||
| """ | ||
| try: | ||
| lat, lon = [ | ||
| x.strip() for x in | ||
| self._coerce_point_to_string(query).split(',') | ||
| ] # doh | ||
| except ValueError: | ||
| raise ValueError("Must be a coordinate pair or Point") | ||
| params = { | ||
| 'lat': lat, | ||
| 'lon': lon, | ||
| 'format': 'json', | ||
| } | ||
| if language: | ||
| params['accept-language'] = language | ||
| url = "?".join((self.reverse_api, urlencode(params))) | ||
| logger.debug("%s.reverse: %s", self.__class__.__name__, url) | ||
| return self._parse_json( | ||
| self._call_geocoder(url, timeout=timeout), exactly_one | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def parse_code(place): | ||
| """ | ||
| Parse each resource. | ||
| """ | ||
| latitude = place.get('lat', None) | ||
| longitude = place.get('lon', None) | ||
| placename = place.get('display_name', None) | ||
| if latitude and longitude: | ||
| latitude = float(latitude) | ||
| longitude = float(longitude) | ||
| return Location(placename, (latitude, longitude), place) | ||
|
|
||
| def _parse_json(self, places, exactly_one): | ||
| if places is None: | ||
| return None | ||
| if not isinstance(places, list): | ||
| places = [places] | ||
| if not len(places): | ||
| return None | ||
| if exactly_one is True: | ||
| return self.parse_code(places[0]) | ||
| else: | ||
| return [self.parse_code(place) for place in places] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,225 @@ | ||
| """ | ||
| :class:`.Photon` geocoder. | ||
| """ | ||
|
|
||
| from geopy.compat import urlencode, string_compare | ||
| from geopy.geocoders.base import ( | ||
| Geocoder, | ||
| DEFAULT_FORMAT_STRING, | ||
| DEFAULT_TIMEOUT, | ||
| DEFAULT_SCHEME | ||
| ) | ||
| from geopy.location import Location | ||
| from geopy.util import logger | ||
|
|
||
|
|
||
| __all__ = ("Photon", ) | ||
|
|
||
|
|
||
| class Photon(Geocoder): # pylint: disable=W0223 | ||
| """ | ||
| Geocoder using Photon geocoding service (data based on OpenStreetMap and | ||
| service provided by Komoot on https://photon.komoot.de). | ||
| Documentation at https://github.com/komoot/photon | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| format_string=DEFAULT_FORMAT_STRING, | ||
| scheme=DEFAULT_SCHEME, | ||
| timeout=DEFAULT_TIMEOUT, | ||
| proxies=None, | ||
| domain='photon.komoot.de' | ||
| ): # pylint: disable=R0913 | ||
| """ | ||
| Initialize a Photon/Komoot geocoder which aims to let you "search as | ||
| you type with OpenStreetMap". No API Key is needed by this platform. | ||
| :param string format_string: String containing '%s' where | ||
| the string to geocode should be interpolated before querying | ||
| the geocoder. For example: '%s, Mountain View, CA'. The default | ||
| is just '%s'. | ||
| :param string scheme: Use 'https' or 'http' as the API URL's scheme. | ||
| Default is https. Note that SSL connections' certificates are not | ||
| verified. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. | ||
| :param dict proxies: If specified, routes this geocoder's requests | ||
| through the specified proxy. E.g., {"https": "192.0.2.0"}. For | ||
| more information, see documentation on | ||
| :class:`urllib2.ProxyHandler`. | ||
| :param string domain: Should be the localized Photon domain to | ||
| connect to. The default is 'photon.komoot.de', but you | ||
| can change it to a domain of your own. | ||
| """ | ||
| super(Photon, self).__init__( | ||
| format_string, scheme, timeout, proxies | ||
| ) | ||
| self.domain = domain.strip('/') | ||
| self.api = "%s://%s/api" % (self.scheme, self.domain) | ||
| self.reverse_api = "%s://%s/reverse" % (self.scheme, self.domain) | ||
|
|
||
| def geocode( | ||
| self, | ||
| query, | ||
| exactly_one=True, | ||
| timeout=None, | ||
| location_bias=None, | ||
| language=False, | ||
| osm_tag=None | ||
| ): # pylint: disable=W0221 | ||
| """ | ||
| Geocode a location query. | ||
| :param string query: The address or query you wish to geocode. | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. Set this only if you wish to override, on this call | ||
| only, the value set during the geocoder's initialization. | ||
| :param location_bias: The coordinates to used as location bias. | ||
| :type query: :class:`geopy.point.Point`, list or tuple of (latitude, | ||
| longitude), or string as "%(latitude)s, %(longitude)s" | ||
| :param string language: Preferred language in which to return results. | ||
| :param osm_tag: The expression to filter (include/exclude) by key and/ | ||
| or value, str as 'key:value' or list/set of str if multiple filters | ||
| are requiered as ['key:!val', '!key', ':!value'] | ||
| """ | ||
| params = { | ||
| 'q': self.format_string % query | ||
| } | ||
| if exactly_one: | ||
| params['limit'] = 1 | ||
| if language: | ||
| params['lang'] = language | ||
| if location_bias: | ||
| try: | ||
| lat, lon = [x.strip() for x | ||
| in self._coerce_point_to_string(location_bias) | ||
| .split(',')] | ||
| params['lon'] = lon | ||
| params['lat'] = lat | ||
| except ValueError: | ||
| raise ValueError(("Location bias must be a" | ||
| " coordinate pair or Point")) | ||
| if osm_tag: | ||
| if isinstance(osm_tag, string_compare): | ||
| params['osm_tag'] = osm_tag | ||
| else: | ||
| try: | ||
| params['osm_tag'] = '&osm_tag='.join(osm_tag) | ||
| except ValueError: | ||
| raise ValueError( | ||
| "osm_tag must be a string expression or " | ||
| "a set/list of string expressions" | ||
| ) | ||
| url = "?".join((self.api, urlencode(params))) | ||
|
|
||
| logger.debug("%s.geocode: %s", self.__class__.__name__, url) | ||
| return self._parse_json( | ||
| self._call_geocoder(url, timeout=timeout), | ||
| exactly_one | ||
| ) | ||
|
|
||
| def reverse( | ||
| self, | ||
| query, | ||
| exactly_one=True, | ||
| timeout=None, | ||
| language=False, | ||
| osm_tag=None | ||
| ): # pylint: disable=W0221 | ||
| """ | ||
| Returns a reverse geocoded location. | ||
| :param query: The coordinates for which you wish to obtain the | ||
| closest human-readable addresses. | ||
| :type query: :class:`geopy.point.Point`, list or tuple of (latitude, | ||
| longitude), or string as "%(latitude)s, %(longitude)s" | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. Set this only if you wish to override, on this call | ||
| only, the value set during the geocoder's initialization. | ||
| :param string language: Preferred language in which to return results. | ||
| :param osm_tag: The expression to filter (include/exclude) by key and/ | ||
| or value, str as 'key:value' or list/set of str if multiple filters | ||
| are requiered as ['key:!val', '!key', ':!value'] | ||
| """ | ||
| try: | ||
| lat, lon = [x.strip() for x in | ||
| self._coerce_point_to_string(query).split(',')] | ||
| except ValueError: | ||
| raise ValueError("Must be a coordinate pair or Point") | ||
| params = { | ||
| 'lat': lat, | ||
| 'lon': lon, | ||
| } | ||
| if exactly_one: | ||
| params['limit'] = 1 | ||
| if language: | ||
| params['lang'] = language | ||
| if osm_tag: | ||
| if isinstance(osm_tag, string_compare): | ||
| params['osm_tag'] = osm_tag | ||
| else: | ||
| try: | ||
| params['osm_tag'] = '&osm_tag='.join(osm_tag) | ||
| except ValueError: | ||
| raise ValueError(("osm_tag must be a string expression or " | ||
| "a set/list of string expressions")) | ||
| url = "?".join((self.reverse_api, urlencode(params))) | ||
| logger.debug("%s.reverse: %s", self.__class__.__name__, url) | ||
| return self._parse_json( | ||
| self._call_geocoder(url, timeout=timeout), exactly_one | ||
| ) | ||
|
|
||
| @classmethod | ||
| def _parse_json(cls, resources, exactly_one=True): | ||
| """ | ||
| Parse display name, latitude, and longitude from a JSON response. | ||
| """ | ||
| if not len(resources): # pragma: no cover | ||
| return None | ||
| if exactly_one: | ||
| return cls.parse_resource(resources['features'][0]) | ||
| else: | ||
| return [cls.parse_resource(resource) for resource | ||
| in resources['features']] | ||
|
|
||
| @classmethod | ||
| def parse_resource(cls, resource): | ||
| """ | ||
| Return location and coordinates tuple from dict. | ||
| """ | ||
| name_elements = ['name', 'housenumber', 'street', | ||
| 'postcode', 'street', 'city', | ||
| 'state', 'country'] | ||
| name = [resource.get(k) for k | ||
| in name_elements if resource.get(k)] | ||
| location = ', '.join(name) | ||
|
|
||
| latitude = resource['geometry']['coordinates'][1] or None | ||
| longitude = resource['geometry']['coordinates'][0] or None | ||
| if latitude and longitude: | ||
| latitude = float(latitude) | ||
| longitude = float(longitude) | ||
|
|
||
| return Location(location, (latitude, longitude), resource) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,218 @@ | ||
| """ | ||
| :class:`.YahooPlaceFinder` geocoder. | ||
| """ | ||
|
|
||
| from functools import partial | ||
|
|
||
| try: | ||
| from requests import get, Request | ||
| from requests_oauthlib import OAuth1 | ||
| requests_missing = False | ||
| except ImportError: | ||
| requests_missing = True | ||
|
|
||
| from geopy.geocoders.base import Geocoder, DEFAULT_TIMEOUT | ||
| from geopy.exc import GeocoderParseError | ||
| from geopy.location import Location | ||
| from geopy.compat import string_compare, py3k | ||
|
|
||
|
|
||
| __all__ = ("YahooPlaceFinder", ) | ||
|
|
||
|
|
||
| class YahooPlaceFinder(Geocoder): # pylint: disable=W0223 | ||
| """ | ||
| Geocoder that utilizes the Yahoo! BOSS PlaceFinder API. Documentation at: | ||
| https://developer.yahoo.com/boss/geo/docs/ | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| consumer_key, | ||
| consumer_secret, | ||
| timeout=DEFAULT_TIMEOUT, | ||
| proxies=None, | ||
| user_agent=None, | ||
| ): # pylint: disable=R0913 | ||
| """ | ||
| :param string consumer_key: Key provided by Yahoo. | ||
| :param string consumer_secret: Secret corresponding to the key | ||
| provided by Yahoo. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. | ||
| :param dict proxies: If specified, routes this geocoder"s requests | ||
| through the specified proxy. E.g., {"https": "192.0.2.0"}. For | ||
| more information, see documentation on | ||
| :class:`urllib2.ProxyHandler`. | ||
| .. versionadded:: 0.96 | ||
| """ | ||
| if requests_missing: | ||
| raise ImportError( | ||
| 'requests-oauthlib is needed for YahooPlaceFinder.' | ||
| ' Install with `pip install geopy -e ".[placefinder]"`.' | ||
| ) | ||
| super(YahooPlaceFinder, self).__init__( | ||
| timeout=timeout, proxies=proxies, user_agent=user_agent | ||
| ) | ||
| self.consumer_key = ( | ||
| unicode(consumer_key) | ||
| if not py3k | ||
| else str(consumer_key) | ||
| ) | ||
| self.consumer_secret = ( | ||
| unicode(consumer_secret) | ||
| if not py3k | ||
| else str(consumer_secret) | ||
| ) | ||
| self.auth = OAuth1( | ||
| client_key=self.consumer_key, | ||
| client_secret=self.consumer_secret, | ||
| signature_method="HMAC-SHA1", | ||
| signature_type="AUTH_HEADER", | ||
| ) | ||
| self.api = "https://yboss.yahooapis.com/geo/placefinder" | ||
|
|
||
| @staticmethod | ||
| def _filtered_results(results, min_quality, valid_country_codes): | ||
| """ | ||
| Returns only the results that meet the minimum quality threshold | ||
| and are located in expected countries. | ||
| """ | ||
| if min_quality: | ||
| results = [ | ||
| loc | ||
| for loc in results | ||
| if int(loc.raw["quality"]) > min_quality | ||
| ] | ||
|
|
||
| if valid_country_codes: | ||
| results = [ | ||
| loc | ||
| for loc in results | ||
| if loc.raw["countrycode"] in valid_country_codes | ||
| ] | ||
|
|
||
| return results | ||
|
|
||
| def _parse_response(self, content): | ||
| """ | ||
| Returns the parsed result of a PlaceFinder API call. | ||
| """ | ||
| try: | ||
| placefinder = ( | ||
| content["bossresponse"]["placefinder"] | ||
| ) | ||
| if not len(placefinder) or not len(placefinder.get("results", [])): | ||
| return None | ||
| results = [ | ||
| Location( | ||
| self.humanize(place), | ||
| (float(place["latitude"]), float(place["longitude"])), | ||
| raw=place | ||
| ) | ||
| for place in placefinder["results"] | ||
| ] | ||
| except (KeyError, ValueError): | ||
| raise GeocoderParseError("Error parsing PlaceFinder result") | ||
|
|
||
| return results | ||
|
|
||
| @staticmethod | ||
| def humanize(location): | ||
| """ | ||
| Returns a human readable representation of a raw PlaceFinder location | ||
| """ | ||
| return ", ".join([ | ||
| location[line] | ||
| for line in ["line1", "line2", "line3", "line4"] | ||
| if location[line] | ||
| ]) | ||
|
|
||
| def geocode( | ||
| self, | ||
| query, | ||
| exactly_one=True, | ||
| timeout=None, | ||
| min_quality=0, | ||
| reverse=False, | ||
| valid_country_codes=None, | ||
| with_timezone=False, | ||
| ): # pylint: disable=W0221,R0913 | ||
| """ | ||
| Geocode a location query. | ||
| :param string query: The address or query you wish to geocode. | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| :param int min_quality: | ||
| :param bool reverse: | ||
| :param valid_country_codes: | ||
| :type valid_country_codes: list or tuple | ||
| :param bool with_timezone: Include the timezone in the response's | ||
| `raw` dictionary (as `timezone`). | ||
| """ | ||
| params = { | ||
| "location": query, | ||
| "flags": "J", # JSON | ||
| } | ||
|
|
||
| if reverse is True: | ||
| params["gflags"] = "R" | ||
| if exactly_one is True: | ||
| params["count"] = "1" | ||
| if with_timezone is True: | ||
| params['flags'] += 'T' #Return timezone | ||
|
|
||
| response = self._call_geocoder( | ||
| self.api, | ||
| timeout=timeout, | ||
| requester=get, | ||
| params=params, | ||
| auth=self.auth, | ||
| ) | ||
| results = self._parse_response(response) | ||
| if results is None: | ||
| return None | ||
|
|
||
| results = self._filtered_results( | ||
| results, | ||
| min_quality, | ||
| valid_country_codes, | ||
| ) | ||
|
|
||
| if exactly_one: | ||
| return results[0] | ||
| else: | ||
| return results | ||
|
|
||
| def reverse(self, query, exactly_one=True, timeout=None): | ||
| """ | ||
| Returns a reverse geocoded location using Yahoo"s PlaceFinder API. | ||
| :param query: The coordinates for which you wish to obtain the | ||
| closest human-readable addresses. | ||
| :type query: :class:`geopy.point.Point`, list or tuple of (latitude, | ||
| longitude), or string as "%(latitude)s, %(longitude)s" | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| """ | ||
| query = self._coerce_point_to_string(query) | ||
| if isinstance(query, string_compare): | ||
| query = query.replace(" ", "") # oauth signature failure; todo | ||
| return self.geocode( | ||
| query, | ||
| exactly_one=exactly_one, | ||
| timeout=timeout, | ||
| reverse=True | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| """ | ||
| :class:`.LiveAddress` geocoder. | ||
| """ | ||
|
|
||
| from geopy.geocoders.base import Geocoder, DEFAULT_TIMEOUT, DEFAULT_SCHEME | ||
| from geopy.compat import urlencode | ||
| from geopy.location import Location | ||
| from geopy.exc import ConfigurationError, GeocoderQuotaExceeded | ||
| from geopy.util import logger | ||
|
|
||
|
|
||
| __all__ = ("LiveAddress", ) | ||
|
|
||
|
|
||
| class LiveAddress(Geocoder): # pylint: disable=W0223 | ||
| """ | ||
| Initialize a customized LiveAddress geocoder provided by SmartyStreets. | ||
| More information regarding the LiveAddress API can be found here: | ||
| https://smartystreets.com/products/liveaddress-api | ||
| """ | ||
| def __init__( | ||
| self, | ||
| auth_id, | ||
| auth_token, | ||
| candidates=1, | ||
| scheme=DEFAULT_SCHEME, | ||
| timeout=DEFAULT_TIMEOUT, | ||
| proxies=None, | ||
| user_agent=None, | ||
| ): # pylint: disable=R0913 | ||
| """ | ||
| Initialize a customized SmartyStreets LiveAddress geocoder. | ||
| :param string auth_id: Valid `Auth ID` from SmartyStreets. | ||
| .. versionadded:: 1.5.0 | ||
| :param string auth_token: Valid `Auth Token` from SmartyStreets. | ||
| :param int candidates: An integer between 1 and 10 indicating the max | ||
| number of candidate addresses to return if a valid address | ||
| could be found. | ||
| :param string scheme: Use 'https' or 'http' as the API URL's scheme. | ||
| Default is https. Note that SSL connections' certificates are not | ||
| verified. | ||
| .. versionadded:: 0.97 | ||
| .. versionchanged:: 1.8.0 | ||
| LiveAddress now requires `https`. Specifying `scheme=http` will | ||
| result in a :class:`geopy.exc.ConfigurationError`. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising an :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. | ||
| .. versionadded:: 0.97 | ||
| :param dict proxies: If specified, routes this geocoder's requests | ||
| through the specified proxy. E.g., {"https": "192.0.2.0"}. For | ||
| more information, see documentation on | ||
| :class:`urllib2.ProxyHandler`. | ||
| .. versionadded:: 0.96 | ||
| """ | ||
| super(LiveAddress, self).__init__( | ||
| timeout=timeout, proxies=proxies, user_agent=user_agent | ||
| ) | ||
| if scheme == "http": | ||
| raise ConfigurationError("LiveAddress now requires `https`.") | ||
| self.scheme = scheme | ||
| self.auth_id = auth_id | ||
| self.auth_token = auth_token | ||
| if candidates: | ||
| if not 1 <= candidates <= 10: | ||
| raise ValueError('candidates must be between 1 and 10') | ||
| self.candidates = candidates | ||
| self.api = '%s://api.smartystreets.com/street-address' % self.scheme | ||
|
|
||
| def geocode(self, query, exactly_one=True, timeout=None): # pylint: disable=W0221 | ||
| """ | ||
| Geocode a location query. | ||
| :param string query: The address or query you wish to geocode. | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| """ | ||
| url = self._compose_url(query) | ||
| logger.debug("%s.geocode: %s", self.__class__.__name__, url) | ||
| return self._parse_json(self._call_geocoder(url, timeout=timeout), | ||
| exactly_one) | ||
|
|
||
| def _geocoder_exception_handler(self, error, message): # pylint: disable=R0201,W0613 | ||
| """ | ||
| LiveStreets-specific exceptions. | ||
| """ | ||
| if "no active subscriptions found" in message.lower(): | ||
| raise GeocoderQuotaExceeded(message) | ||
|
|
||
| def _compose_url(self, location): | ||
| """ | ||
| Generate API URL. | ||
| """ | ||
| query = { | ||
| 'auth-id': self.auth_id, | ||
| 'auth-token': self.auth_token, | ||
| 'street': location, | ||
| 'candidates': self.candidates | ||
| } | ||
| return '{url}?{query}'.format(url=self.api, query=urlencode(query)) | ||
|
|
||
| def _parse_json(self, response, exactly_one=True): | ||
| """ | ||
| Parse responses as JSON objects. | ||
| """ | ||
| if not len(response): | ||
| return None | ||
| if exactly_one is True: | ||
| return self._format_structured_address(response[0]) | ||
| else: | ||
| return [self._format_structured_address(c) for c in response] | ||
|
|
||
| @staticmethod | ||
| def _format_structured_address(address): | ||
| """ | ||
| Pretty-print address and return lat, lon tuple. | ||
| """ | ||
| latitude = address['metadata'].get('latitude') | ||
| longitude = address['metadata'].get('longitude') | ||
| return Location( | ||
| ", ".join((address['delivery_line_1'], address['last_line'])), | ||
| (latitude, longitude) if latitude and longitude else None, | ||
| address | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,272 @@ | ||
| """ | ||
| :class:`.What3Words` geocoder. | ||
| """ | ||
|
|
||
| import re | ||
| from geopy.compat import urlencode | ||
| from geopy.geocoders.base import ( | ||
| Geocoder, | ||
| DEFAULT_FORMAT_STRING, | ||
| DEFAULT_TIMEOUT, | ||
| DEFAULT_SCHEME | ||
| ) | ||
| from geopy.location import Location | ||
| from geopy.util import logger, join_filter | ||
| from geopy import exc | ||
|
|
||
|
|
||
| __all__ = ("What3Words", ) | ||
|
|
||
|
|
||
| class What3Words(Geocoder): | ||
| """ | ||
| What3Words geocoder, documentation at: | ||
| http://what3words.com/api/reference | ||
| """ | ||
|
|
||
| word_re = re.compile(r"^\*{1,1}[^\W\d\_]+$", re.U) | ||
| multiple_word_re = re.compile( | ||
| r"[^\W\d\_]+\.{1,1}[^\W\d\_]+\.{1,1}[^\W\d\_]+$", re.U | ||
| ) | ||
|
|
||
| def __init__( | ||
| self, | ||
| api_key, | ||
| format_string=DEFAULT_FORMAT_STRING, | ||
| scheme=DEFAULT_SCHEME, | ||
| timeout=DEFAULT_TIMEOUT, | ||
| proxies=None, | ||
| user_agent=None, | ||
| ): | ||
| """ | ||
| Initialize a What3Words geocoder with 3-word or OneWord-address and | ||
| What3Words API key. | ||
| .. versionadded:: 1.5.0 | ||
| :param string api_key: Key provided by What3Words. | ||
| :param string format_string: String containing '%s' where the | ||
| string to geocode should be interpolated before querying the | ||
| geocoder. For example: '%s, piped.gains.jungle'. The default | ||
| is just '%s'. | ||
| :param string scheme: Use 'https' or 'http' as the API URL's scheme. | ||
| Default is https. Note that SSL connections' certificates are not | ||
| verified. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. | ||
| :param dict proxies: If specified, routes this geocoder's requests | ||
| through the specified proxy. E.g., {"https": "192.0.2.0"}. For | ||
| more information, see documentation on | ||
| :class:`urllib2.ProxyHandler`. | ||
| """ | ||
| super(What3Words, self).__init__( | ||
| format_string, | ||
| scheme, | ||
| timeout, | ||
| proxies, | ||
| user_agent=user_agent, | ||
| ) | ||
| self.api_key = api_key | ||
| self.api = ( | ||
| "%s://api.what3words.com/" % self.scheme | ||
| ) | ||
|
|
||
| def _check_query(self, query): | ||
| """ | ||
| Check query validity with regex | ||
| """ | ||
| if not (self.word_re.match(query) or | ||
| self.multiple_word_re.match(query)): | ||
| return False | ||
| else: | ||
| return True | ||
|
|
||
| def geocode(self, | ||
| query, | ||
| lang='en', | ||
| exactly_one=True, | ||
| timeout=None): | ||
|
|
||
| """ | ||
| Geocode a "3 words" or "OneWord" query. | ||
| :param string query: The 3-word or OneWord-address you wish to geocode. | ||
| :param string lang: two character language codes as supported by | ||
| the API (http://what3words.com/api/reference/languages). | ||
| :param bool exactly_one: Parameter has no effect for this geocoder. | ||
| Due to the address scheme there is always exactly one result. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. Set this only if you wish to override, on this call | ||
| only, the value set during the geocoder's initialization. | ||
| .. versionadded:: 0.97 | ||
| """ | ||
|
|
||
| if not self._check_query(query): | ||
| raise exc.GeocoderQueryError( | ||
| "Search string must be either like " | ||
| "'word.word.word' or '*word' " | ||
| ) | ||
|
|
||
| params = { | ||
| 'string': self.format_string % query, | ||
| 'lang': self.format_string % lang.lower() | ||
|
|
||
| } | ||
|
|
||
| url = "?".join(( | ||
| (self.api + "w3w"), | ||
| "&".join(("=".join(('key', self.api_key)), urlencode(params))) | ||
| )) | ||
| logger.debug("%s.geocode: %s", self.__class__.__name__, url) | ||
| return self._parse_json( | ||
| self._call_geocoder(url, timeout=timeout), | ||
| exactly_one | ||
| ) | ||
|
|
||
| def _parse_json(self, resources, exactly_one=True): | ||
| """ | ||
| Parse type, words, latitude, and longitude and language from a | ||
| JSON response. | ||
| """ | ||
| if resources.get('error') == "X1": | ||
| raise exc.GeocoderAuthenticationFailure() | ||
|
|
||
| if resources.get('error') == "11": | ||
| raise exc.GeocoderQueryError( | ||
| "Address (Word(s)) not recognised by What3Words." | ||
| ) | ||
|
|
||
| def parse_resource(resource): | ||
| """ | ||
| Parse record. | ||
| """ | ||
|
|
||
| if resource['type'] == '3 words': | ||
| words = resource['words'] | ||
| words = join_filter(".", [words[0], words[1], words[2]]) | ||
| position = resource['position'] | ||
| latitude, longitude = position[0], position[1] | ||
|
|
||
| if latitude and longitude: | ||
| latitude = float(latitude) | ||
| longitude = float(longitude) | ||
|
|
||
| return Location(words, (latitude, longitude), resource) | ||
| elif resource['type'] == 'OneWord': | ||
| words = resource['words'] | ||
| words = join_filter(".", [words[0], words[1], words[2]]) | ||
| oneword = resource['oneword'] | ||
| info = resource['info'] | ||
|
|
||
| address = join_filter(", ", [ | ||
| oneword, | ||
| words, | ||
| info['name'], | ||
| info['address1'], | ||
| info['address2'], | ||
| info['address3'], | ||
| info['city'], | ||
| info['county'], | ||
| info['postcode'], | ||
| info['country_id'] | ||
| ]) | ||
|
|
||
| position = resource['position'] | ||
| latitude, longitude = position[0], position[1] | ||
|
|
||
| if latitude and longitude: | ||
| latitude = float(latitude) | ||
| longitude = float(longitude) | ||
|
|
||
| return Location(address, (latitude, longitude), resource) | ||
| else: | ||
| raise exc.GeocoderParseError('Error parsing result.') | ||
|
|
||
|
|
||
| return parse_resource(resources) | ||
|
|
||
|
|
||
| def reverse(self, query, lang='en', exactly_one=True, timeout=None): | ||
| """ | ||
| Given a point, find the 3 word address. | ||
| :param query: The coordinates for which you wish to obtain the 3 word | ||
| address. | ||
| :type query: :class:`geopy.point.Point`, list or tuple of (latitude, | ||
| longitude), or string as "%(latitude)s, %(longitude)s" | ||
| :param string lang: two character language codes as supported by the | ||
| API (http://what3words.com/api/reference/languages). | ||
| :param bool exactly_one: Parameter has no effect for this geocoder. | ||
| Due to the address scheme there is always exactly one result. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. Set this only if you wish to override, on this call | ||
| only, the value set during the geocoder's initialization. | ||
| """ | ||
| lang = lang.lower() | ||
|
|
||
| params = { | ||
| 'position': self._coerce_point_to_string(query), | ||
| 'lang': self.format_string % lang | ||
|
|
||
| } | ||
|
|
||
| url = "?".join(( | ||
| (self.api + "position"), | ||
| "&".join(("=".join(('key', self.api_key)), urlencode(params))) | ||
| )) | ||
|
|
||
| logger.debug("%s.reverse: %s", self.__class__.__name__, url) | ||
| return self._parse_reverse_json( | ||
| self._call_geocoder(url, timeout=timeout), | ||
| ) | ||
|
|
||
|
|
||
| @staticmethod | ||
| def _parse_reverse_json(resources): | ||
| """ | ||
| Parses a location from a single-result reverse API call. | ||
| """ | ||
|
|
||
| if resources.get('error') == "21": | ||
| raise exc.GeocoderQueryError("Invalid coordinates") | ||
|
|
||
| def parse_resource(resource): | ||
| """ | ||
| Parse resource to return Geopy Location object | ||
| """ | ||
| words = resource['words'] | ||
| words = join_filter(".", [words[0], words[1], words[2]]) | ||
| position = resource['position'] | ||
| latitude, longitude = position[0], position[1] | ||
|
|
||
| if latitude and longitude: | ||
| latitude = float(latitude) | ||
| longitude = float(longitude) | ||
|
|
||
| return Location(words, (latitude, longitude), resource) | ||
|
|
||
| return parse_resource(resources) | ||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| """ | ||
| :class:`Yandex` geocoder. | ||
| """ | ||
|
|
||
| from geopy.compat import urlencode | ||
|
|
||
| from geopy.geocoders.base import Geocoder, DEFAULT_TIMEOUT | ||
| from geopy.location import Location | ||
| from geopy.exc import ( | ||
| GeocoderServiceError, | ||
| GeocoderParseError | ||
| ) | ||
| from geopy.util import logger | ||
|
|
||
|
|
||
| __all__ = ("Yandex", ) | ||
|
|
||
|
|
||
| class Yandex(Geocoder): # pylint: disable=W0223 | ||
| """ | ||
| Yandex geocoder, documentation at: | ||
| http://api.yandex.com/maps/doc/geocoder/desc/concepts/input_params.xml | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| api_key=None, | ||
| lang=None, | ||
| timeout=DEFAULT_TIMEOUT, | ||
| proxies=None, | ||
| user_agent=None, | ||
| ): | ||
| """ | ||
| Create a Yandex-based geocoder. | ||
| .. versionadded:: 1.5.0 | ||
| :param string api_key: Yandex API key (not obligatory) | ||
| http://api.yandex.ru/maps/form.xml | ||
| :param string lang: response locale, the following locales are | ||
| supported: "ru_RU" (default), "uk_UA", "be_BY", "en_US", "tr_TR" | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. | ||
| :param dict proxies: If specified, routes this geocoder's requests | ||
| through the specified proxy. E.g., {"https": "192.0.2.0"}. For | ||
| more information, see documentation on | ||
| :class:`urllib2.ProxyHandler`. | ||
| """ | ||
| super(Yandex, self).__init__( | ||
| scheme='http', timeout=timeout, proxies=proxies, user_agent=user_agent | ||
| ) | ||
| self.api_key = api_key | ||
| self.lang = lang | ||
| self.api = 'http://geocode-maps.yandex.ru/1.x/' | ||
|
|
||
| def geocode(self, query, exactly_one=True, timeout=None): # pylint: disable=W0221 | ||
| """ | ||
| Geocode a location query. | ||
| :param string query: The address or query you wish to geocode. | ||
| :param bool exactly_one: Return one result or a list of results, if | ||
| available. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. Set this only if you wish to override, on this call | ||
| only, the value set during the geocoder's initialization. | ||
| """ | ||
| params = { | ||
| 'geocode': query, | ||
| 'format': 'json' | ||
| } | ||
| if not self.api_key is None: | ||
| params['key'] = self.api_key | ||
| if not self.lang is None: | ||
| params['lang'] = self.lang | ||
| if exactly_one is True: | ||
| params['results'] = 1 | ||
| url = "?".join((self.api, urlencode(params))) | ||
| logger.debug("%s.geocode: %s", self.__class__.__name__, url) | ||
| return self._parse_json( | ||
| self._call_geocoder(url, timeout=timeout), | ||
| exactly_one, | ||
| ) | ||
|
|
||
| def reverse( | ||
| self, | ||
| query, | ||
| exactly_one=False, | ||
| timeout=None, | ||
| ): | ||
| """ | ||
| Given a point, find an address. | ||
| :param string query: The coordinates for which you wish to obtain the | ||
| closest human-readable addresses. | ||
| :type query: :class:`geopy.point.Point`, list or tuple of (latitude, | ||
| longitude), or string as "%(latitude)s, %(longitude)s" | ||
| :param boolean exactly_one: Return one result or a list of results, if | ||
| available. | ||
| :param int timeout: Time, in seconds, to wait for the geocoding service | ||
| to respond before raising a :class:`geopy.exc.GeocoderTimedOut` | ||
| exception. | ||
| """ | ||
| try: | ||
| lat, lng = [ | ||
| x.strip() for x in | ||
| self._coerce_point_to_string(query).split(',') | ||
| ] | ||
| except ValueError: | ||
| raise ValueError("Must be a coordinate pair or Point") | ||
| params = { | ||
| 'geocode': '{0},{1}'.format(lng, lat), | ||
| 'format': 'json' | ||
| } | ||
| if self.api_key is not None: | ||
| params['key'] = self.api_key | ||
| if self.lang is not None: | ||
| params['lang'] = self.lang | ||
| url = "?".join((self.api, urlencode(params))) | ||
| logger.debug("%s.reverse: %s", self.__class__.__name__, url) | ||
| return self._parse_json( | ||
| self._call_geocoder(url, timeout=timeout), | ||
| exactly_one | ||
| ) | ||
|
|
||
| def _parse_json(self, doc, exactly_one): | ||
| """ | ||
| Parse JSON response body. | ||
| """ | ||
| if doc.get('error'): | ||
| raise GeocoderServiceError(doc['error']['message']) | ||
|
|
||
| try: | ||
| places = doc['response']['GeoObjectCollection']['featureMember'] | ||
| except KeyError: | ||
| raise GeocoderParseError('Failed to parse server response') | ||
|
|
||
| def parse_code(place): | ||
| """ | ||
| Parse each record. | ||
| """ | ||
| try: | ||
| place = place['GeoObject'] | ||
| except KeyError: | ||
| raise GeocoderParseError('Failed to parse server response') | ||
|
|
||
| longitude, latitude = [ | ||
| float(_) for _ in place['Point']['pos'].split(' ') | ||
| ] | ||
|
|
||
| location = place.get('description') | ||
|
|
||
| return Location(location, (latitude, longitude), place) | ||
|
|
||
| if exactly_one: | ||
| try: | ||
| return parse_code(places[0]) | ||
| except IndexError: | ||
| return None | ||
| else: | ||
| return [parse_code(place) for place in places] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| """ | ||
| :class:`.Location` returns geocoder results. | ||
| """ | ||
|
|
||
| from geopy.point import Point | ||
| from geopy.compat import string_compare, py3k | ||
|
|
||
|
|
||
| class Location(object): # pylint: disable=R0903,R0921 | ||
| """ | ||
| Contains a parsed geocoder response. Can be iterated over as | ||
| (location<String>, (latitude<float>, longitude<Float)). Or one can access | ||
| the properties `address`, `latitude`, `longitude`, or `raw`. The last | ||
| is a dictionary of the geocoder's response for this item. | ||
| .. versionadded:: 0.98 | ||
| """ | ||
|
|
||
| __slots__ = ("_address", "_point", "_tuple", "_raw") | ||
|
|
||
| def __init__(self, address="", point=None, raw=None): | ||
| self._address = address | ||
| if point is None: | ||
| self._point = (None, None, None) | ||
| elif isinstance(point, Point): | ||
| self._point = point | ||
| elif isinstance(point, string_compare): | ||
| self._point = Point(point) | ||
| elif isinstance(point, (tuple, list)): | ||
| self._point = Point(point) | ||
| else: | ||
| raise TypeError( | ||
| "point an unsupported type: %r; use %r or Point", | ||
| type(point), type(string_compare) | ||
| ) | ||
| self._tuple = (self._address, (self._point[0], self._point[1])) | ||
| self._raw = raw | ||
|
|
||
| @property | ||
| def address(self): | ||
| """ | ||
| Location as a formatted string returned by the geocoder or constructed | ||
| by geopy, depending on the service. | ||
| :rtype: unicode | ||
| """ | ||
| return self._address | ||
|
|
||
| @property | ||
| def latitude(self): | ||
| """ | ||
| Location's latitude. | ||
| :rtype: float or None | ||
| """ | ||
| return self._point[0] | ||
|
|
||
| @property | ||
| def longitude(self): | ||
| """ | ||
| Location's longitude. | ||
| :rtype: float or None | ||
| """ | ||
| return self._point[1] | ||
|
|
||
| @property | ||
| def altitude(self): | ||
| """ | ||
| Location's altitude. | ||
| :rtype: float or None | ||
| """ | ||
| return self._point[2] | ||
|
|
||
| @property | ||
| def point(self): | ||
| """ | ||
| :class:`geopy.point.Point` instance representing the location's | ||
| latitude, longitude, and altitude. | ||
| :rtype: :class:`geopy.point.Point` or None | ||
| """ | ||
| return self._point if self._point != (None, None, None) else None | ||
|
|
||
| @property | ||
| def raw(self): | ||
| """ | ||
| Location's raw, unparsed geocoder response. For details on this, | ||
| consult the service's documentation. | ||
| :rtype: dict or None | ||
| """ | ||
| return self._raw | ||
|
|
||
| def __getitem__(self, index): | ||
| """ | ||
| Backwards compatibility with geopy<0.98 tuples. | ||
| """ | ||
| return self._tuple[index] | ||
|
|
||
| def __unicode__(self): | ||
| return self._address | ||
|
|
||
| __str__ = __unicode__ | ||
|
|
||
| def __repr__(self): | ||
| if py3k: | ||
| return "Location(%s, (%s, %s, %s))" % ( | ||
| self._address, self.latitude, self.longitude, self.altitude | ||
| ) | ||
| else: | ||
| # Python 2 should not return unicode in __repr__: | ||
| # http://bugs.python.org/issue5876 | ||
| return "Location((%s, %s, %s))" % ( | ||
| self.latitude, self.longitude, self.altitude | ||
| ) | ||
|
|
||
|
|
||
| def __iter__(self): | ||
| return iter(self._tuple) | ||
|
|
||
| def __eq__(self, other): | ||
| return ( | ||
| isinstance(other, Location) and | ||
| self._address == other._address and # pylint: disable=W0212 | ||
| self._point == other._point and # pylint: disable=W0212 | ||
| self.raw == other.raw | ||
| ) | ||
|
|
||
| def __ne__(self, other): | ||
| return not self.__eq__(other) | ||
|
|
||
| def __len__(self): # pragma: no cover | ||
| return len(self._tuple) | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,361 @@ | ||
| # encoding: utf-8 | ||
| """ | ||
| :class:`.Point` data structure. | ||
| """ | ||
|
|
||
| import re | ||
| from itertools import islice | ||
| from geopy import util, units | ||
| from geopy.format import ( | ||
| DEGREE, | ||
| PRIME, | ||
| DOUBLE_PRIME, | ||
| format_degrees, | ||
| format_distance, | ||
| ) | ||
| from geopy.compat import string_compare | ||
|
|
||
|
|
||
| POINT_PATTERN = re.compile(r""" | ||
| .*? | ||
| (?P<latitude> | ||
| (?P<latitude_direction_front>[NS])?[ ]* | ||
| (?P<latitude_degrees>-?%(FLOAT)s)(?:[%(DEGREE)sD\*\u00B0\s][ ]* | ||
| (?:(?P<latitude_arcminutes>%(FLOAT)s)[%(PRIME)s'm][ ]*)? | ||
| (?:(?P<latitude_arcseconds>%(FLOAT)s)[%(DOUBLE_PRIME)s"s][ ]*)? | ||
| )?(?P<latitude_direction_back>[NS])?) | ||
| %(SEP)s | ||
| (?P<longitude> | ||
| (?P<longitude_direction_front>[EW])?[ ]* | ||
| (?P<longitude_degrees>-?%(FLOAT)s)(?:[%(DEGREE)sD\*\u00B0\s][ ]* | ||
| (?:(?P<longitude_arcminutes>%(FLOAT)s)[%(PRIME)s'm][ ]*)? | ||
| (?:(?P<longitude_arcseconds>%(FLOAT)s)[%(DOUBLE_PRIME)s"s][ ]*)? | ||
| )?(?P<longitude_direction_back>[EW])?)(?: | ||
| %(SEP)s | ||
| (?P<altitude> | ||
| (?P<altitude_distance>-?%(FLOAT)s)[ ]* | ||
| (?P<altitude_units>km|m|mi|ft|nm|nmi)))? | ||
| .*?$ | ||
| """ % { | ||
| "FLOAT": r'\d+(?:\.\d+)?', | ||
| "DEGREE": DEGREE, | ||
| "PRIME": PRIME, | ||
| "DOUBLE_PRIME": DOUBLE_PRIME, | ||
| "SEP": r'\s*[,;/\s]\s*', | ||
| }, re.X) | ||
|
|
||
|
|
||
| class Point(object): | ||
| """ | ||
| A geodetic point with latitude, longitude, and altitude. | ||
| Latitude and longitude are floating point values in degrees. | ||
| Altitude is a floating point value in kilometers. The reference level | ||
| is never considered and is thus application dependent, so be consistent! | ||
| The default for all values is 0. | ||
| Points can be created in a number of ways... | ||
| With longitude, latitude, and altitude:: | ||
| >>> p1 = Point(41.5, -81, 0) | ||
| >>> p2 = Point(latitude=41.5, longitude=-81) | ||
| With a sequence of 0 to 3 values (longitude, latitude, altitude):: | ||
| >>> p1 = Point([41.5, -81, 0]) | ||
| >>> p2 = Point((41.5, -81)) | ||
| Copy another `Point` instance:: | ||
| >>> p2 = Point(p1) | ||
| >>> p2 == p1 | ||
| True | ||
| >>> p2 is p1 | ||
| False | ||
| Give a string containing at least latitude and longitude:: | ||
| >>> p1 = Point('41.5,-81.0') | ||
| >>> p2 = Point('41.5 N -81.0 W') | ||
| >>> p3 = Point('-41.5 S, 81.0 E, 2.5km') | ||
| >>> p4 = Point('23 26m 22s N 23 27m 30s E 21.0mi') | ||
| >>> p5 = Point('''3 26' 22" N 23 27' 30" E''') | ||
| Point values can be accessed by name or by index:: | ||
| >>> p = Point(41.5, -81.0, 0) | ||
| >>> p.latitude == p[0] | ||
| True | ||
| >>> p.longitude == p[1] | ||
| True | ||
| >>> p.altitude == p[2] | ||
| True | ||
| When unpacking (or iterating), a (latitude, longitude, altitude) tuple is | ||
| returned:: | ||
| >>> latitude, longitude, altitude = p | ||
| """ | ||
|
|
||
| __slots__ = ("latitude", "longitude", "altitude", "_items") | ||
|
|
||
| POINT_PATTERN = POINT_PATTERN | ||
|
|
||
| def __new__(cls, latitude=None, longitude=None, altitude=None): | ||
| """ | ||
| :param float latitude: Latitude of point. | ||
| :param float longitude: Longitude of point. | ||
| :param float altitude: Altitude of point. | ||
| """ | ||
| single_arg = longitude is None and altitude is None | ||
| if single_arg and not isinstance(latitude, util.NUMBER_TYPES): | ||
| arg = latitude | ||
| if arg is None: # pragma: no cover | ||
| pass | ||
| elif isinstance(arg, Point): | ||
| return cls.from_point(arg) | ||
| elif isinstance(arg, string_compare): | ||
| return cls.from_string(arg) | ||
| else: | ||
| try: | ||
| seq = iter(arg) | ||
| except TypeError: # pragma: no cover | ||
| raise TypeError( | ||
| "Failed to create Point instance from %r." % (arg,) | ||
| ) | ||
| else: | ||
| return cls.from_sequence(seq) | ||
|
|
||
| latitude = float(latitude or 0.0) | ||
| if abs(latitude) > 90: | ||
| latitude = ((latitude + 90) % 180) - 90 | ||
|
|
||
| longitude = float(longitude or 0.0) | ||
| if abs(longitude) > 180: | ||
| longitude = ((longitude + 180) % 360) - 180 | ||
|
|
||
| altitude = float(altitude or 0.0) | ||
|
|
||
| self = super(Point, cls).__new__(cls) | ||
| self.latitude = latitude | ||
| self.longitude = longitude | ||
| self.altitude = altitude | ||
| self._items = [self.latitude, self.longitude, self.altitude] | ||
| return self | ||
|
|
||
| def __getitem__(self, index): | ||
| return self._items[index] | ||
|
|
||
| def __setitem__(self, index, value): | ||
| self._items[index] = value | ||
|
|
||
| def __iter__(self): | ||
| return iter((self.latitude, self.longitude, self.altitude)) | ||
|
|
||
| def __repr__(self): | ||
| return "Point(%r, %r, %r)" % tuple(self._items) | ||
|
|
||
| def format(self, altitude=None, deg_char='', min_char='m', sec_char='s'): | ||
| """ | ||
| Format decimal degrees (DD) to degrees minutes seconds (DMS) | ||
| """ | ||
| latitude = "%s %s" % ( | ||
| format_degrees(abs(self.latitude), symbols={ | ||
| 'deg': deg_char, 'arcmin': min_char, 'arcsec': sec_char | ||
| }), | ||
| self.latitude >= 0 and 'N' or 'S' | ||
| ) | ||
| longitude = "%s %s" % ( | ||
| format_degrees(abs(self.longitude), symbols={ | ||
| 'deg': deg_char, 'arcmin': min_char, 'arcsec': sec_char | ||
| }), | ||
| self.longitude >= 0 and 'E' or 'W' | ||
| ) | ||
| coordinates = [latitude, longitude] | ||
|
|
||
| if altitude is None: | ||
| altitude = bool(self.altitude) | ||
| if altitude: | ||
| if not isinstance(altitude, string_compare): | ||
| altitude = 'km' | ||
| coordinates.append(self.format_altitude(altitude)) | ||
|
|
||
| return ", ".join(coordinates) | ||
|
|
||
| def format_decimal(self, altitude=None): | ||
| """ | ||
| Format decimal degrees with altitude | ||
| """ | ||
| coordinates = [str(self.latitude), str(self.longitude)] | ||
|
|
||
| if altitude is None: | ||
| altitude = bool(self.altitude) | ||
| if altitude is True: | ||
| if not isinstance(altitude, string_compare): | ||
| altitude = 'km' | ||
| coordinates.append(self.format_altitude(altitude)) | ||
|
|
||
| return ", ".join(coordinates) | ||
|
|
||
| def format_altitude(self, unit='km'): | ||
| """ | ||
| Foamt altitude with unit | ||
| """ | ||
| return format_distance(self.altitude, unit=unit) | ||
|
|
||
| def __str__(self): | ||
| return self.format() | ||
|
|
||
| def __unicode__(self): | ||
| return self.format( | ||
| None, DEGREE, PRIME, DOUBLE_PRIME | ||
| ) | ||
|
|
||
| def __eq__(self, other): | ||
| return tuple(self) == tuple(other) | ||
|
|
||
| def __ne__(self, other): | ||
| return tuple(self) != tuple(other) | ||
|
|
||
| @classmethod | ||
| def parse_degrees(cls, degrees, arcminutes, arcseconds, direction=None): | ||
| """ | ||
| Parse degrees minutes seconds including direction (N, S, E, W) | ||
| """ | ||
| degrees = float(degrees) | ||
| negative = degrees < 0 | ||
| arcminutes = float(arcminutes) | ||
| arcseconds = float(arcseconds) | ||
|
|
||
| if arcminutes or arcseconds: | ||
| more = units.degrees(arcminutes=arcminutes, arcseconds=arcseconds) | ||
| if negative: | ||
| degrees -= more | ||
| else: | ||
| degrees += more | ||
|
|
||
| if direction in [None, 'N', 'E']: | ||
| return degrees | ||
| elif direction in ['S', 'W']: | ||
| return -degrees | ||
| else: | ||
| raise ValueError("Invalid direction! Should be one of [NSEW].") | ||
|
|
||
| @classmethod | ||
| def parse_altitude(cls, distance, unit): | ||
| """ | ||
| Parse altitude managing units conversion | ||
| """ | ||
| if distance is not None: | ||
| distance = float(distance) | ||
| CONVERTERS = { | ||
| 'km': lambda d: d, | ||
| 'm': lambda d: units.kilometers(meters=d), | ||
| 'mi': lambda d: units.kilometers(miles=d), | ||
| 'ft': lambda d: units.kilometers(feet=d), | ||
| 'nm': lambda d: units.kilometers(nautical=d), | ||
| 'nmi': lambda d: units.kilometers(nautical=d) | ||
| } | ||
| try: | ||
| return CONVERTERS[unit](distance) | ||
| except KeyError: # pragma: no cover | ||
| raise NotImplementedError( | ||
| 'Bad distance unit specified, valid are: %r' % | ||
| CONVERTERS.keys() | ||
| ) | ||
| else: | ||
| return distance | ||
|
|
||
| @classmethod | ||
| def from_string(cls, string): | ||
| """ | ||
| Create and return a ``Point`` instance from a string containing | ||
| latitude and longitude, and optionally, altitude. | ||
| Latitude and longitude must be in degrees and may be in decimal form | ||
| or indicate arcminutes and arcseconds (labeled with Unicode prime and | ||
| double prime, ASCII quote and double quote or 'm' and 's'). The degree | ||
| symbol is optional and may be included after the decimal places (in | ||
| decimal form) and before the arcminutes and arcseconds otherwise. | ||
| Coordinates given from south and west (indicated by S and W suffixes) | ||
| will be converted to north and east by switching their signs. If no | ||
| (or partial) cardinal directions are given, north and east are the | ||
| assumed directions. Latitude and longitude must be separated by at | ||
| least whitespace, a comma, or a semicolon (each with optional | ||
| surrounding whitespace). | ||
| Altitude, if supplied, must be a decimal number with given units. | ||
| The following unit abbrevations (case-insensitive) are supported: | ||
| - ``km`` (kilometers) | ||
| - ``m`` (meters) | ||
| - ``mi`` (miles) | ||
| - ``ft`` (feet) | ||
| - ``nm``, ``nmi`` (nautical miles) | ||
| Some example strings the will work include: | ||
| - 41.5;-81.0 | ||
| - 41.5,-81.0 | ||
| - 41.5 -81.0 | ||
| - 41.5 N -81.0 W | ||
| - -41.5 S;81.0 E | ||
| - 23 26m 22s N 23 27m 30s E | ||
| - 23 26' 22" N 23 27' 30" E | ||
| - UT: N 39°20' 0'' / W 74°35' 0'' | ||
| """ | ||
| match = re.match(cls.POINT_PATTERN, re.sub(r"''", r'"', string)) | ||
| if match: | ||
| latitude_direction = None | ||
| if match.group("latitude_direction_front"): | ||
| latitude_direction = match.group("latitude_direction_front") | ||
| elif match.group("latitude_direction_back"): | ||
| latitude_direction = match.group("latitude_direction_back") | ||
|
|
||
| longitude_direction = None | ||
| if match.group("longitude_direction_front"): | ||
| longitude_direction = match.group("longitude_direction_front") | ||
| elif match.group("longitude_direction_back"): | ||
| longitude_direction = match.group("longitude_direction_back") | ||
| latitude = cls.parse_degrees( | ||
| match.group('latitude_degrees') or 0.0, | ||
| match.group('latitude_arcminutes') or 0.0, | ||
| match.group('latitude_arcseconds') or 0.0, | ||
| latitude_direction | ||
| ) | ||
| longitude = cls.parse_degrees( | ||
| match.group('longitude_degrees') or 0.0, | ||
| match.group('longitude_arcminutes') or 0.0, | ||
| match.group('longitude_arcseconds') or 0.0, | ||
| longitude_direction | ||
| ) | ||
| altitude = cls.parse_altitude( | ||
| match.group('altitude_distance'), | ||
| match.group('altitude_units') | ||
| ) | ||
| return cls(latitude, longitude, altitude) | ||
| else: | ||
| raise ValueError( | ||
| "Failed to create Point instance from string: unknown format." | ||
| ) | ||
|
|
||
| @classmethod | ||
| def from_sequence(cls, seq): | ||
| """ | ||
| Create and return a new ``Point`` instance from any iterable with 0 to | ||
| 3 elements. The elements, if present, must be latitude, longitude, | ||
| and altitude, respectively. | ||
| """ | ||
| args = tuple(islice(seq, 4)) | ||
| return cls(*args) | ||
|
|
||
| @classmethod | ||
| def from_point(cls, point): | ||
| """ | ||
| Create and return a new ``Point`` instance from another ``Point`` | ||
| instance. | ||
| """ | ||
| return cls(point.latitude, point.longitude, point.altitude) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,128 @@ | ||
| """ | ||
| Convert units. | ||
| """ | ||
|
|
||
| import math | ||
|
|
||
|
|
||
| # Angles | ||
|
|
||
| def degrees(radians=0, arcminutes=0, arcseconds=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| deg = 0. | ||
| if radians: | ||
| deg = math.degrees(radians) | ||
| if arcminutes: | ||
| deg += arcminutes / arcmin(degrees=1.) | ||
| if arcseconds: | ||
| deg += arcseconds / arcsec(degrees=1.) | ||
| return deg | ||
|
|
||
| def radians(degrees=0, arcminutes=0, arcseconds=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| if arcminutes: | ||
| degrees += arcminutes / arcmin(degrees=1.) | ||
| if arcseconds: | ||
| degrees += arcseconds / arcsec(degrees=1.) | ||
| return math.radians(degrees) | ||
|
|
||
| def arcminutes(degrees=0, radians=0, arcseconds=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| if radians: | ||
| degrees += math.degrees(radians) | ||
| if arcseconds: | ||
| degrees += arcseconds / arcsec(degrees=1.) | ||
| return degrees * 60. | ||
|
|
||
| def arcseconds(degrees=0, radians=0, arcminutes=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| if radians: | ||
| degrees += math.degrees(radians) | ||
| if arcminutes: | ||
| degrees += arcminutes / arcmin(degrees=1.) | ||
| return degrees * 3600. | ||
|
|
||
|
|
||
| # Lengths | ||
|
|
||
| def kilometers(meters=0, miles=0, feet=0, nautical=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| ret = 0. | ||
| if meters: | ||
| ret += meters / 1000. | ||
| if feet: | ||
| miles += feet / ft(1.) | ||
| if nautical: | ||
| ret += nautical / nm(1.) | ||
| ret += miles * 1.609344 | ||
| return ret | ||
|
|
||
| def meters(kilometers=0, miles=0, feet=0, nautical=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| return (kilometers + km(nautical=nautical, miles=miles, feet=feet)) * 1000 | ||
|
|
||
| def miles(kilometers=0, meters=0, feet=0, nautical=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| ret = 0. | ||
| if nautical: | ||
| kilometers += nautical / nm(1.) | ||
| if feet: | ||
| ret += feet / ft(1.) | ||
| if meters: | ||
| kilometers += meters / 1000. | ||
| ret += kilometers * 0.621371192 | ||
| return ret | ||
|
|
||
| def feet(kilometers=0, meters=0, miles=0, nautical=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| ret = 0. | ||
| if nautical: | ||
| kilometers += nautical / nm(1.) | ||
| if meters: | ||
| kilometers += meters / 1000. | ||
| if kilometers: | ||
| miles += mi(kilometers=kilometers) | ||
| ret += miles * 5280 | ||
| return ret | ||
|
|
||
| def nautical(kilometers=0, meters=0, miles=0, feet=0): # pylint: disable=W0621 | ||
| """ | ||
| TODO docs. | ||
| """ | ||
| ret = 0. | ||
| if feet: | ||
| miles += feet / ft(1.) | ||
| if miles: | ||
| kilometers += km(miles=miles) | ||
| if meters: | ||
| kilometers += meters / 1000. | ||
| ret += kilometers / 1.852 | ||
| return ret | ||
|
|
||
|
|
||
| # Compatible names | ||
|
|
||
| rad = radians # pylint: disable=C0103 | ||
| arcmin = arcminutes # pylint: disable=C0103 | ||
| arcsec = arcseconds # pylint: disable=C0103 | ||
| km = kilometers # pylint: disable=C0103 | ||
| m = meters # pylint: disable=C0103 | ||
| mi = miles # pylint: disable=C0103 | ||
| ft = feet # pylint: disable=C0103 | ||
| nm = nautical # pylint: disable=C0103 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| """ | ||
| Utils. | ||
| """ | ||
|
|
||
| import logging | ||
| from geopy.compat import py3k | ||
|
|
||
| if not py3k: # pragma: no cover | ||
| NUMBER_TYPES = (int, long, float) | ||
| else: # pragma: no cover | ||
| NUMBER_TYPES = (int, float) # long -> int in Py3k | ||
| try: | ||
| from decimal import Decimal | ||
| NUMBER_TYPES = NUMBER_TYPES + (Decimal, ) | ||
| except ImportError: # pragma: no cover | ||
| pass | ||
|
|
||
|
|
||
| __version__ = "1.11.0" | ||
|
|
||
|
|
||
| class NullHandler(logging.Handler): | ||
| """ | ||
| No output. | ||
| """ | ||
|
|
||
| def emit(self, record): | ||
| pass | ||
|
|
||
| logger = logging.getLogger('geopy') # pylint: disable=C0103 | ||
| logger.setLevel(logging.CRITICAL) | ||
|
|
||
|
|
||
| def pairwise(seq): | ||
| """ | ||
| Pair an iterable, e.g., (1, 2, 3, 4) -> ((1, 2), (3, 4)) | ||
| """ | ||
| for i in range(0, len(seq) - 1): | ||
| yield (seq[i], seq[i + 1]) | ||
|
|
||
|
|
||
| if not py3k: | ||
| def join_filter(sep, seq, pred=bool): | ||
| """ | ||
| Join with a filter. | ||
| """ | ||
| return sep.join([unicode(i) for i in seq if pred(i)]) | ||
| else: | ||
| def join_filter(sep, seq, pred=bool): | ||
| """ | ||
| Join with a filter. | ||
| """ | ||
| return sep.join([str(i) for i in seq if pred(i)]) | ||
|
|
||
|
|
||
| if not py3k: | ||
| def decode_page(page): | ||
| """ | ||
| Return unicode string of geocoder results. | ||
| Nearly all services use JSON, so assume UTF8 encoding unless the | ||
| response specifies otherwise. | ||
| """ | ||
| if hasattr(page, 'read'): # urllib | ||
| # note getparam in py2 | ||
| encoding = page.headers.getparam("charset") or "utf-8" | ||
| return unicode(page.read(), encoding=encoding) | ||
| else: # requests? | ||
| encoding = page.headers.get("charset", "utf-8") | ||
| return unicode(page.content, encoding=encoding) | ||
| else: | ||
| def decode_page(page): | ||
| """ | ||
| Return unicode string of geocoder results. | ||
| Nearly all services use JSON, so assume UTF8 encoding unless the | ||
| response specifies otherwise. | ||
| """ | ||
| if hasattr(page, 'read'): # urllib | ||
| # note get_param in py3 | ||
| encoding = page.headers.get_param("charset") or "utf-8" | ||
| return str(page.read(), encoding=encoding) | ||
| else: # requests? | ||
| encoding = page.headers.get("charset") or "utf-8" | ||
| return str(page.content, encoding=encoding) | ||
|
|
||
|
|
||
| def get_version(): | ||
| from geopy.version import GEOPY_VERSION | ||
| return str(GEOPY_VERSION) | ||
|
|
||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| from geopy.distance import great_circle as distance | ||
| from geopy.point import Point | ||
| import math | ||
| import polyline | ||
|
|
||
| def dist_to_line(a, b, c): | ||
| """Return distance from point a to line (b, c).""" | ||
| dab = distance(a, b).meters or 0.00000001 | ||
| dac = distance(a, c).meters or 0.00000001 | ||
| dbc = distance(b, c).meters or 0.00000001 | ||
| semiperim = (dab+dac+dbc)/2 | ||
| try: | ||
| A = math.sqrt(semiperim*(semiperim-dab)*(semiperim-dac)*(semiperim-dbc)) | ||
| except ValueError as e: | ||
| # Most likely one of the legs is 0 and that is screwing up the math | ||
| # Points are therefore probably colinear, so return | ||
| if dab>dbc: | ||
| return dac | ||
| elif dac>dbc: | ||
| return dab | ||
| else: | ||
| return min(dab, dac) | ||
| d = 2*A/dab | ||
| if dac > math.sqrt(dab**2 + dbc**2): | ||
| return dbc | ||
| elif dbc > math.sqrt(dab**2 + dac**2): | ||
| return dac | ||
| else: | ||
| return d | ||
|
|
||
| def distance_to_polyline(point, line): | ||
| if isinstance(point, list) or isinstance(point, tuple): | ||
| point = Point(*point) | ||
| geline = [Point(i[0], i[1]) for i in line] | ||
| # return [dist_to_line(line[i],line[i+1], point) for i in range(len(line)-1)] | ||
| return min([dist_to_line(line[i],line[i+1], point) for i in range(len(line)-1)]) | ||
|
|
||
| if __name__=="__main__": |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| from .codec import PolylineCodec | ||
|
|
||
| __version__ = '1.3.1' | ||
|
|
||
|
|
||
| def decode(expression, precision=5): | ||
| """ | ||
| Decode a polyline string into a set of coordinates. | ||
| :param expression: Polyline string, e.g. 'u{~vFvyys@fS]'. | ||
| :param precision: Precision of the encoded coordinates. Google Maps uses 5, OpenStreetMap uses 6. | ||
| The default value is 5. | ||
| :return: List of coordinate tuples | ||
| """ | ||
| return PolylineCodec().decode(expression, precision) | ||
|
|
||
|
|
||
| def encode(coordinates, precision=5): | ||
| """ | ||
| Encode a set of coordinates in a polyline string. | ||
| :param coordinates: List of coordinate tuples, e.g. [(0, 0), (1, 0)]. | ||
| :param precision: Precision of the coordinates to encode. Google Maps uses 5, OpenStreetMap uses 6. | ||
| The default value is 5. | ||
| :return: The encoded polyline string. | ||
| """ | ||
| return PolylineCodec().encode(coordinates, precision) | ||
|
|
||
|
|
||
| __all__ = ['decode', 'encode'] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| import itertools | ||
| import six | ||
| import math | ||
|
|
||
|
|
||
| class PolylineCodec(object): | ||
| def _pcitr(self, iterable): | ||
| return six.moves.zip(iterable, itertools.islice(iterable, 1, None)) | ||
|
|
||
| def _py2_round(self, x): | ||
| # The polyline algorithm uses Python 2's way of rounding | ||
| return int(math.copysign(math.floor(math.fabs(x) + 0.5), x)) | ||
|
|
||
| def _write(self, output, curr_value, prev_value, factor): | ||
| curr_value = self._py2_round(curr_value * factor) | ||
| prev_value = self._py2_round(prev_value * factor) | ||
| coord = curr_value - prev_value | ||
| coord <<= 1 | ||
| coord = coord if coord >= 0 else ~coord | ||
|
|
||
| while coord >= 0x20: | ||
| output.write(six.unichr((0x20 | (coord & 0x1f)) + 63)) | ||
| coord >>= 5 | ||
|
|
||
| output.write(six.unichr(coord + 63)) | ||
|
|
||
| def _trans(self, value, index): | ||
| byte, result, shift = None, 0, 0 | ||
|
|
||
| while byte is None or byte >= 0x20: | ||
| byte = ord(value[index]) - 63 | ||
| index += 1 | ||
| result |= (byte & 0x1f) << shift | ||
| shift += 5 | ||
| comp = result & 1 | ||
|
|
||
| return ~(result >> 1) if comp else (result >> 1), index | ||
|
|
||
| def decode(self, expression, precision=5): | ||
| coordinates, index, lat, lng, length, factor = [], 0, 0, 0, len(expression), float(10 ** precision) | ||
|
|
||
| while index < length: | ||
| lat_change, index = self._trans(expression, index) | ||
| lng_change, index = self._trans(expression, index) | ||
| lat += lat_change | ||
| lng += lng_change | ||
| coordinates.append((lat / factor, lng / factor)) | ||
|
|
||
| return coordinates | ||
|
|
||
| def encode(self, coordinates, precision=5): | ||
| output, factor = six.StringIO(), int(10 ** precision) | ||
|
|
||
| self._write(output, coordinates[0][0], 0, factor) | ||
| self._write(output, coordinates[0][1], 0, factor) | ||
|
|
||
| for prev, curr in self._pcitr(coordinates): | ||
| self._write(output, curr[0], prev[0], factor) | ||
| self._write(output, curr[1], prev[1], factor) | ||
|
|
||
| return output.getvalue() |