## Imports and globals

In [None]:
from typing import Any, Dict, List, Tuple

from msticpy.sectools.tilookup import TILookup
from msticpy.sectools.tiproviders.ti_provider_base import LookupResult, TISeverity
from msticpy.sectools.tiproviders.http_base import HttpProvider, IoCLookupParams
from msticpy.common.utility import export
from msticpy._version import VERSION

import time

# Version reported for this code mirrors msticpy's VERSION constant
# (imported above from msticpy._version).
__version__ = VERSION
# Author credit for this code.
__author__ = "Martijn Veken"

## Shodan TI Provider

In [None]:
"""
Shodan Provider.

Input can be a single IoC observable or a pandas DataFrame containing
multiple observables. Processing may require an API key and
processing performance may be limited to a specific number of
requests per minute for the account type that you have.

"""

@export
class Shodan(HttpProvider):
    """
    Shodan Lookup.

    TI provider that queries the Shodan host endpoint for IPv4
    observables and summarises the host record that is returned.

    """

    _BASE_URL = "https://api.shodan.io/"

    _IOC_QUERIES = {
        "ipv4": IoCLookupParams(path="shodan/host/{observable}?key={API_KEY}"),
    }

    _REQUIRED_PARAMS = ["API_KEY"]

    # Default pause (in seconds) between API calls.
    _DEFAULT_SLEEP_TIME = 0.5

    # Fields copied from the raw host record into the Details summary.
    # Everything else remains available in the RawResult field.
    _DETAIL_FIELDS = ("hostnames", "org", "asn", "isp", "country_code", "ports")

    def __init__(self, **kwargs):
        """
        Initialize a new instance of the class.

        Other Parameters
        ----------------
        SleepTime : float, optional
            Seconds to wait after each lookup to avoid API time-outs
            (default is 0.5).
        **kwargs
            All keyword arguments (including `SleepTime`) are forwarded
            unchanged to the base class initializer.

        """
        super().__init__(**kwargs)

        # Wait time between API calls to prevent time-outs.
        self._sleep_time = kwargs.get("SleepTime", self._DEFAULT_SLEEP_TIME)

    def parse_results(self, response: LookupResult) -> Tuple[bool, TISeverity, Any]:
        """
        Return the details of the response.

        Parameters
        ----------
        response : LookupResult
            The returned data response. Its `reference` attribute is
            modified in place to mask the API key.

        Returns
        -------
        Tuple[bool, TISeverity, Any]
            bool = positive or negative hit
            TISeverity = enumeration of severity
            Object with match details - a dictionary of the most used
            host fields on a hit (fields missing from the raw result
            are reported as None), or the string "Not found." otherwise.

        """
        # Strip the sensitive API key from the reference. This is done before
        # the failure check so that failed lookups do not expose the key either.
        # NOTE(review): assumes self._request_params is the dict populated by
        # the HttpProvider base class - confirm against http_base.
        api_key = self._request_params.get("API_KEY")
        if (
            isinstance(api_key, str)
            and api_key  # replacing "" would mangle the whole reference
            and isinstance(response.reference, str)
        ):
            response.reference = response.reference.replace(api_key, "[API_KEY]")

        # Add a small wait time to prevent API time-outs. Failed lookups
        # were still API calls, so the pause applies to them as well.
        if self._sleep_time:
            time.sleep(self._sleep_time)

        if self._failed_response(response) or not isinstance(response.raw_result, dict):
            return False, TISeverity.information, "Not found."

        # Get the most used information from the results for easy reference.
        # This ends up in the Details field. `.get` is used so that a host
        # record lacking one of the fields does not abort the lookup.
        raw_result = response.raw_result
        results_dict = {field: raw_result.get(field) for field in self._DETAIL_FIELDS}

        # Shodan data is informational
        return True, TISeverity.information, results_dict

## Functions

In [None]:
def load_ti_providers(provider_configs: Dict[str, Any]) -> "TILookup":
    """
    Return the default configured TI providers and the extra specified ones.

    Parameters
    ----------
    provider_configs: Dict
        Dictionary containing configuration options for the additional TI
        providers. A "Shodan" entry must contain "shodan_api_key" and may
        contain "shodan_sleep_time"; when the latter is absent the Shodan
        provider's own default wait time is used. An empty dictionary or
        None adds no extra providers.

    Returns
    -------
    TILookup
        Reference to the TI providers

    Raises
    ------
    KeyError
        If a "Shodan" entry is present but has no "shodan_api_key".

    """
    ti_lookup = TILookup()

    if provider_configs and "Shodan" in provider_configs:
        shodan_config = provider_configs["Shodan"]

        # The API key is mandatory; the sleep time is only forwarded when
        # it has actually been configured.
        shodan_args = {"AuthKey": shodan_config["shodan_api_key"]}
        if "shodan_sleep_time" in shodan_config:
            shodan_args["SleepTime"] = shodan_config["shodan_sleep_time"]

        ti_lookup.add_provider(
            provider=Shodan(**shodan_args),
            name="Shodan",
            primary=True,
        )

    return ti_lookup