From 0092942852b9d628ffb1e1910d0163793ba8e1cc Mon Sep 17 00:00:00 2001 From: Laurent Pellegrino Date: Thu, 13 Jun 2024 21:52:25 +0200 Subject: [PATCH] Add new functions to lookup AS data --- CHANGELOG.md | 1 + README.md | 12 ++++++++ ipregistry/core.py | 37 +++++++++++++++++++---- ipregistry/request.py | 57 ++++++++++++++++++++++++++++++++++-- samples/batch-lookup-asns.py | 35 ++++++++++++++++++++++ samples/batch-lookup-ips.py | 10 +++---- samples/lookup-asn.py | 30 +++++++++++++++++++ samples/origin-lookup-asn.py | 30 +++++++++++++++++++ tests/test_client.py | 39 ++++++++++++++++++++++-- 9 files changed, 236 insertions(+), 15 deletions(-) create mode 100644 samples/batch-lookup-asns.py create mode 100644 samples/lookup-asn.py create mode 100644 samples/origin-lookup-asn.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 06d9ff3..c66d2aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Changed - Require Python 3.8+. +- Add new functions for retrieving Autonomous System data: `batch_lookup_asns`, `lookup_asn`, `origin_lookup_asn`. - Add new functions for user-agent header value parsing: `batch_parse_user_agents`, `parse_user_agent`. - API key is passed as header value and no longer as query parameter. - Client library method are now wrapped in a new _ApiResponse_ object that includes a mean to retrieve metadata diff --git a/README.md b/README.md index 8dbf246..bf59273 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,18 @@ credits_consumed = response.credits.consumed credits_remaining = response.credits.remaining ``` +#### Single ASN Lookup + +```python +from ipregistry import IpregistryClient + +client = IpregistryClient("YOUR_API_KEY") +response = client.lookup_asn(42) +print(response.credits.consumed) +print(response.data.prefixes) +print(response.data.relationships) +``` + #### Batch IP Lookup ```python diff --git a/ipregistry/core.py b/ipregistry/core.py index 629e6c4..407598f 100644 --- a/ipregistry/core.py +++ b/ipregistry/core.py @@ -14,7 +14,7 @@ limitations under the License. """ from .cache import IpregistryCache, NoCache -from .json import IpInfo, RequesterIpInfo +from .json import AutonomousSystem, IpInfo from .model import LookupError, ApiResponse, ApiResponseCredits, ApiResponseThrottling from .request import DefaultRequestHandler, IpregistryRequestHandler @@ -30,6 +30,9 @@ def __init__(self, key_or_config, **kwargs): if not isinstance(self._requestHandler, IpregistryRequestHandler): raise ValueError("Given request handler instance is not of type IpregistryRequestHandler") + def batch_lookup_asns(self, ips, **options): + return self.batch_request(ips, self._requestHandler.batch_lookup_asns, **options) + def batch_lookup_ips(self, ips, **options): return self.batch_request(ips, self._requestHandler.batch_lookup_ips, **options) @@ -77,25 +80,40 @@ def batch_request(self, items, request_handler_func, **options): return response - def lookup_ip(self, ip='', **options): + def lookup_asn(self, asn, **options): + return self.__lookup_asn(asn, options) + + def lookup_ip(self, ip, **options): if isinstance(ip, str): return self.__lookup_ip(ip, options) else: raise ValueError("Invalid value for 'ip' parameter: " + ip) + def origin_lookup_asn(self, **options): + return self.__lookup_asn('AS', options) + def origin_lookup_ip(self, **options): return self.__lookup_ip('', options) def origin_parse_user_agent(self, **options): return self._requestHandler.origin_parse_user_agent(options) + def __lookup_asn(self, asn, options): + return self.__lookup( + 'AS' + str(asn) if IpregistryClient.__is_number(asn) else 'AS', + options, self._requestHandler.lookup_asn, + AutonomousSystem) + def __lookup_ip(self, ip, options): - cache_key = self.__build_cache_key(ip, options) + return self.__lookup(ip, options, self._requestHandler.lookup_ip, IpInfo) + + def __lookup(self, key, options, lookup_func, response_type): + cache_key = self.__build_cache_key(key, options) cache_value = self._cache.get(cache_key) if cache_value is None: - response = self._requestHandler.lookup_ip(ip, options) - if isinstance(response.data, IpInfo): + response = lookup_func(key, options) + if isinstance(response.data, response_type): self._cache.put(cache_key, response.data) return response @@ -125,6 +143,15 @@ def __build_cache_key(key, options): def __is_api_error(data): return 'code' in data + @staticmethod + def __is_number(value): + try: + # Try converting the value to a float + float(value) + return True + except ValueError: + return False + class IpregistryConfig: def __init__(self, key, base_url="https://api.ipregistry.co", timeout=15): diff --git a/ipregistry/request.py b/ipregistry/request.py index ddfba65..a60cb9d 100644 --- a/ipregistry/request.py +++ b/ipregistry/request.py @@ -23,14 +23,19 @@ import requests from .__init__ import __version__ -from .model import (ApiError, ApiResponse, ApiResponseCredits, ApiResponseThrottling, ClientError, IpInfo, - LookupError, RequesterIpInfo, RequesterUserAgent, UserAgent) +from .model import (ApiError, ApiResponse, ApiResponseCredits, ApiResponseThrottling, AutonomousSystem, + ClientError, IpInfo, LookupError, RequesterAutonomousSystem, RequesterIpInfo, + RequesterUserAgent, UserAgent) class IpregistryRequestHandler(ABC): def __init__(self, config): self._config = config + @abstractmethod + def batch_lookup_asns(self, ips, options): + pass + @abstractmethod def batch_lookup_ips(self, ips, options): pass @@ -39,6 +44,10 @@ def batch_lookup_ips(self, ips, options): def batch_parse_user_agents(self, user_agents, options): pass + @abstractmethod + def lookup_asn(self, asn, options): + pass + @abstractmethod def lookup_ip(self, ip, options): pass @@ -65,6 +74,29 @@ def _build_base_url(self, resource, options): class DefaultRequestHandler(IpregistryRequestHandler): + def batch_lookup_asns(self, asns, options): + response = None + try: + response = requests.post( + self._build_base_url('', options), + data=json.dumps(list(map(lambda asn: "AS" + str(asn), asns))), + headers=self.__headers(), + timeout=self._config.timeout + ) + response.raise_for_status() + results = response.json().get('results', []) + + parsed_results = [ + LookupError(data) if 'code' in data else AutonomousSystem(**data) + for data in results + ] + + return self.build_api_response(response, parsed_results) + except requests.HTTPError: + self.__create_api_error(response) + except Exception as e: + raise ClientError(e) + def batch_lookup_ips(self, ips, options): response = None try: @@ -111,6 +143,27 @@ def batch_parse_user_agents(self, user_agents, options): except Exception as e: raise ClientError(e) + def lookup_asn(self, asn, options): + response = None + try: + response = requests.get( + self._build_base_url(asn, options), + headers=self.__headers(), + timeout=self._config.timeout + ) + response.raise_for_status() + json_response = response.json() + + return self.build_api_response( + response, + RequesterAutonomousSystem(**json_response) if asn == 'AS' + else AutonomousSystem(**json_response) + ) + except requests.HTTPError: + self.__create_api_error(response) + except Exception as err: + raise ClientError(err) + def lookup_ip(self, ip, options): response = None try: diff --git a/samples/batch-lookup-asns.py b/samples/batch-lookup-asns.py new file mode 100644 index 0000000..1f6de20 --- /dev/null +++ b/samples/batch-lookup-asns.py @@ -0,0 +1,35 @@ +""" + Copyright 2019 Ipregistry (https://ipregistry.co). + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +from ipregistry import ApiError, AutonomousSystem, ClientError, IpregistryClient + +try: + api_key = "tryout" + client = IpregistryClient(api_key) + response = client.batch_lookup_asns([42, 4441, 51933]) + data_list = response.data + + for entry in data_list: + if isinstance(entry, AutonomousSystem): + print("AutonomousSystem", entry) + else: + print("LookupError", entry) +except ApiError as e: + print("API error", e) +except ClientError as e: + print("Client error", e) +except Exception as e: + print("Unexpected error", e) diff --git a/samples/batch-lookup-ips.py b/samples/batch-lookup-ips.py index 213ba9f..916c6c0 100644 --- a/samples/batch-lookup-ips.py +++ b/samples/batch-lookup-ips.py @@ -20,13 +20,13 @@ api_key = "tryout" client = IpregistryClient(api_key) response = client.batch_lookup_ips(["73.2.2.2", "8.8.8.8", "2001:67c:2e8:22::c100:68b"]) - ip_info_list = response.data + data_list = response.data - for lookup_result in ip_info_list: - if isinstance(lookup_result, IpInfo): - print("IpInfo", lookup_result) + for entry in data_list: + if isinstance(entry, IpInfo): + print("IpInfo", entry) else: - print("LookupError", lookup_result) + print("LookupError", entry) except ApiError as e: print("API error", e) except ClientError as e: diff --git a/samples/lookup-asn.py b/samples/lookup-asn.py new file mode 100644 index 0000000..43cddae --- /dev/null +++ b/samples/lookup-asn.py @@ -0,0 +1,30 @@ +""" + Copyright 2019 Ipregistry (https://ipregistry.co). + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +from ipregistry import ApiError, ClientError, IpregistryClient + +try: + api_key = "tryout" + client = IpregistryClient(api_key) + response = client.lookup_asn(42) + print(response.credits.consumed) + print(response.data.relationships) +except ApiError as e: + print("API error", e) +except ClientError as e: + print("Client error", e) +except Exception as e: + print("Unexpected error", e) diff --git a/samples/origin-lookup-asn.py b/samples/origin-lookup-asn.py new file mode 100644 index 0000000..93a330b --- /dev/null +++ b/samples/origin-lookup-asn.py @@ -0,0 +1,30 @@ +""" + Copyright 2019 Ipregistry (https://ipregistry.co). + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +from ipregistry import ApiError, ClientError, IpregistryClient + +try: + api_key = "tryout" + client = IpregistryClient(api_key) + response = client.origin_lookup_asn() + as_data = response.data + print(as_data) +except ApiError as e: + print("API error", e) +except ClientError as e: + print("Client error", e) +except Exception as e: + print("Unexpected error", e) diff --git a/tests/test_client.py b/tests/test_client.py index 7f3aaf1..e3017f0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -17,13 +17,27 @@ import os import unittest -from ipregistry import ApiError, IpInfo, LookupError, ClientError, UserAgent +from ipregistry import ApiError, AutonomousSystem, IpInfo, LookupError, ClientError, UserAgent from ipregistry.cache import InMemoryCache, NoCache from ipregistry.core import IpregistryClient, IpregistryConfig class TestIpregistryClient(unittest.TestCase): + def test_batch_lookup_asns(self): + """ + Test batch asns lookup with valid and invalid inputs + """ + client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY')) + response = client.batch_lookup_asns([33, 'invalid', -1]) + print(response) + self.assertEqual(3, len(response.data)) + self.assertEqual(True, isinstance(response.data[0], AutonomousSystem)) + self.assertEqual(True, isinstance(response.data[1], LookupError)) + self.assertEqual('INVALID_ASN', response.data[1].code) + self.assertEqual(True, isinstance(response.data[2], LookupError)) + self.assertEqual('INVALID_ASN', response.data[2].code) + def test_batch_lookup_ips(self): """ Test batch ips lookup with valid and invalid inputs @@ -83,6 +97,16 @@ def test_client_cache_inmemory_batch_ips_lookup(self): batch_ips_response2 = client.batch_lookup_ips(['1.1.1.1', '1.1.1.3']) self.assertEqual(0, batch_ips_response2.credits.consumed) + def test_lookup_asn(self): + """ + Test Autonomous System data lookup by ASN + """ + client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY')) + response = client.lookup_asn(400923) + self.assertEqual(400923, response.data.asn) + self.assertEqual(1, response.credits.consumed) + self.assertIsNotNone(response.data.relationships) + def test_lookup_ip(self): """ Test that a simple IP lookup returns data @@ -100,7 +124,6 @@ def test_lookup_ip_invalid_input(self): client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY')) with self.assertRaises(ApiError) as context: response = client.lookup_ip('invalid') - print("test", context.exception) self.assertEqual('INVALID_IP_ADDRESS', context.exception.code) def test_lookup_ip_cache(self): @@ -122,12 +145,22 @@ def test_lookup_timeout(self): with self.assertRaises(ClientError): client.lookup_ip('1.1.1.1') + def test_origin_asn(self): + """ + Test origin Autonomous System data lookup + """ + client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY')) + response = client.origin_lookup_asn() + self.assertIsNotNone(response.data.asn) + self.assertEqual(1, response.credits.consumed) + self.assertIsNotNone(response.data.relationships) + def test_origin_lookup_ip(self): """ Test that a simple origin IP lookup returns data """ client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY')) - response = client.lookup_ip() + response = client.origin_lookup_ip() self.assertIsNotNone(response.data.ip) self.assertIsNotNone(response.data.user_agent)