# Shodan reference data download

This notebook downloads reference host data from Shodan and extracts the operating system information it contains.

In [None]:
from shodan import Shodan, APIError
import json
from tqdm import notebook
import pandas as pd

In [1]:
import os

# Cached Shodan host profiles keyed by IP address; this notebook reads the file
# and later writes the updated profiles back to the same path.
source_shodan_profiles_path = 'data/source/shodan_profiles.json'
# Destination CSV for the per-IP operating system strings extracted from the profiles.
output_extracted_oses_path = 'data/source/shodan_os_extracted.csv'
# Prefer the SHODAN_API_KEY environment variable so the real key never has to be
# saved inside the notebook; the placeholder stays as the fallback value.
shodan_api_key = os.environ.get('SHODAN_API_KEY', 'YOUR_API_KEY')

In [None]:
# Load the cached Shodan profiles (a JSON object mapping IP address -> host profile).
# The encoding is given explicitly so the result does not depend on the
# platform's default locale encoding.
with open(source_shodan_profiles_path, 'r', encoding='utf-8') as f:
    profiles = json.load(f)

In [None]:
# Shodan API client created with the configured key; used below for the host lookups.
api = Shodan(shodan_api_key)

In [None]:
# Fill in every profile that is still an empty dict with the host data returned
# by Shodan. Any APIError is recorded as {"os": None}, which means the
# IP is no longer empty and will not be queried again by this cell.
for ip, profile in notebook.tqdm(profiles.items()):
    if profile != {}:
        continue  # already has data (or was already marked as failed)
    try:
        info = api.host(ip)
    except APIError:
        profiles[ip] = {"os": None}
    else:
        profiles[ip] = info

In [None]:
# Persist the (possibly updated) profiles back to the source file.
# Serialise before opening: mode 'w' truncates the file immediately, so an error
# raised while encoding would otherwise leave the cache empty or half-written.
serialized_profiles = json.dumps(profiles, indent=2)
with open(source_shodan_profiles_path, 'w', encoding='utf-8') as f:
    f.write(serialized_profiles)

In [None]:
# IPs whose stored profile is anything other than the empty placeholder dict.
filled = [address for address in profiles if profiles[address] != {}]
len(filled)

In [None]:
# IPs whose stored profile is still the empty placeholder dict.
unfilled = [address for address in profiles if profiles[address] == {}]
len(unfilled)

In [None]:
# IPs whose profile differs from the {"os": None} marker written when a lookup failed.
have_info = [address for address in profiles if profiles[address] != {"os": None}]
len(have_info)

In [None]:
# Subset of have_info whose top-level 'os' field is set.
# .get() is used because have_info can also contain profiles without an 'os' key
# (for example a still-empty {} placeholder), where direct indexing raises KeyError.
have_os_info = [ip for ip in have_info if profiles[ip].get("os") is not None]
len(have_os_info)

In [None]:
def get_os_entries(ip_profile, path = None, verbose = False):
    """Collect every non-None value stored under an 'os' key anywhere in a profile.

    The profile is walked recursively: nested dicts are descended into, and so
    are dicts found directly inside list values. Matched values are returned as
    found; they are neither converted nor de-duplicated.

    Args:
        ip_profile: dict to search.
        path: keys / list indices leading to ``ip_profile``. Only used for the
            verbose output; the list passed in is not modified.
        verbose: when true, print the path, key and value of every match,
            including matches inside nested structures.

    Returns:
        list of the matched values in traversal order (empty if there are none).
    """
    path = [] if path is None else path
    entries = []
    for key, item in ip_profile.items():
        # A fresh list per key keeps sibling keys out of each other's path and
        # leaves the caller's list untouched.
        item_path = path + [key]
        if key == 'os' and item is not None:
            if verbose:
                print(item_path, key, item)
            entries.append(item)
        if isinstance(item, list):
            for i, data_item in enumerate(item):
                if isinstance(data_item, dict):
                    entries.extend(get_os_entries(data_item, item_path + [i], verbose))
        elif isinstance(item, dict):
            entries.extend(get_os_entries(item, item_path, verbose))
    return entries

def find_common_substring(strings: list):
    """Return the longest leading part of any input that occurs in every input.

    Each string is tried in turn as the source: it is shortened from the right
    until what remains is a substring of all strings. The longest such result
    wins, so the answer does not depend on the order of ``strings``. (Trying
    only the first string, for instance, gives '' for
    ['Ubuntu Linux', 'Linux 3.x'] but 'Linux' for the reversed list.)

    Args:
        strings: list of strings to compare.

    Returns:
        The common substring ('' when nothing is shared), or None when
        ``strings`` is empty.
    """
    if not strings:
        return None
    longest_substring = ""
    # Sorted, de-duplicated sources make tie-breaking between equally long
    # candidates reproducible.
    for source in sorted(set(strings)):
        candidate = source
        # The empty string is contained in every string, so this always terminates.
        while not all(candidate in string for string in strings):
            candidate = candidate[:-1]
        if len(candidate) > len(longest_substring):
            longest_substring = candidate
    return longest_substring


# Map each IP to a single OS string derived from all 'os' values found in its profile.
extracted_os_infos = {}
for ip in have_info:
    entries = get_os_entries(profiles[ip])
    # Only plain strings and lists are handled; a profile with any other kind
    # of 'os' value is skipped entirely.
    if not entries or not all(isinstance(item, (str, list)) for item in entries):
        continue
    flattened = []
    for item in entries:
        if isinstance(item, list):
            flattened.extend(item)
        else:
            flattened.append(item)
    # De-duplicate, then sort: set iteration order can differ between runs, and
    # sorting makes the input handed to find_common_substring reproducible.
    entries = sorted(set(flattened))
    if len(entries) == 1:
        extracted_os_infos[ip] = entries[0]
        continue
    # Several distinct values: keep what they have in common, computed once.
    common = find_common_substring(entries)
    common = common.strip() if common else common
    if common:  # skip IPs whose values share nothing but whitespace
        extracted_os_infos[ip] = common
extracted_os_infos

In [None]:
# Build a one-row frame (row label 'os', one column per IP) from the {ip: os}
# mapping, transpose it so each IP becomes a row with a single 'os' column,
# and write the result to the output CSV.
pd.DataFrame(extracted_os_infos, index=['os']).T.to_csv(output_extracted_oses_path)