In [None]:
# Download geckodriver from https://github.com/mozilla/geckodriver/releases/tag/v0.29.1
# Download the Debian iso-codes country list
# Download the ISO country list from iso.org
# Import the additional custom name mapping

In [None]:
import requests
import json
import pprint
import selenium
import time
import bs4
import os
from selenium import webdriver
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Download geckodriver from https://github.com/mozilla/geckodriver/releases/tag/v0.29.1 and unpack it
# Path to the geckodriver binary handed to webdriver.Firefox in download_iso_table.
GECKODRIVER_PATH = "./geckodriver"
# iso.org search page; it is opened in a real browser via Selenium before being parsed.
ISO_URL = "https://www.iso.org/obp/ui/#search/code/"
# Raw JSON of the Debian iso-codes project; the parser reads its top-level '3166-1' list.
DEBIAN_URL = "https://salsa.debian.org/iso-codes-team/iso-codes/-/raw/main/data/iso_3166-1.json"

# STIX bundle that gets enriched; the result is written next to it with a "_new" suffix.
STIX_PATH = "../data/geography.json"

In [None]:
## Download country list from iso.org

def download_iso_table(url: str) -> "bs4.BeautifulSoup":
    """Open ``url`` in Firefox, switch the result-size dropdown and return the parsed page.

    Args:
        url: Address of the page to load in the browser.

    Returns:
        A BeautifulSoup document (``lxml`` parser) built from the browser's
        page source after the dropdown selection has been applied.

    Note:
        The browser is always shut down, even when loading the page or
        locating the dropdown fails, so no stray Firefox/geckodriver
        processes are left behind.
    """
    driver = webdriver.Firefox(executable_path=GECKODRIVER_PATH)
    try:
        driver.implicitly_wait(10)  # seconds
        driver.get(url)
        # find_element(By.XPATH, ...) works on Selenium 3 and 4; the old
        # find_element_by_xpath helper was removed in Selenium 4.3.
        dropdown = Select(driver.find_element(By.XPATH, "//select[@class='v-select-select']"))
        # NOTE(review): value '8' is presumably the largest "results per page" option - confirm.
        dropdown.select_by_value('8')
        # TODO replace sleep with selenium wait for element
        time.sleep(4)
        return bs4.BeautifulSoup(driver.page_source, 'lxml')
    finally:
        driver.quit()


# Credits to https://stackoverflow.com/a/58274853
def tableDataText(table) -> list:
    """Convert an HTML ``<table>`` element into a list of rows of cell texts.

    The first ``<tr>`` is probed for ``<th>`` cells; when it has any, their
    texts become the first returned row and that ``<tr>`` is not read again.
    Every remaining ``<tr>`` contributes the stripped texts of its ``<td>``
    cells.
    """
    def _cell_texts(row, tag):
        # Stripped text of every `tag` cell (th or td) inside one table row.
        return [cell.get_text(strip=True) for cell in row.find_all(tag)]

    table_rows = table.find_all('tr')
    header = _cell_texts(table_rows[0], 'th')
    if header:
        # Header found: emit it first and skip that row for the data pass.
        return [header] + [_cell_texts(row, 'td') for row in table_rows[1:]]
    return [_cell_texts(row, 'td') for row in table_rows]

def convert_table_to_dict(table_content: list[list[str]]) -> dict:
    """Index the scraped table rows by their alpha-3 code.

    The first row is treated as the header and skipped. For every other row,
    column 0 supplies the name, column 2 the alpha-2 code and column 3 the
    alpha-3 code, which is also used as the dictionary key.
    """
    by_alpha_3 = {}
    data_rows = table_content[1:]
    for row in data_rows:
        # replace because of instances like "Western Sahara*" on iso.org
        clean_name = row[0].replace('*', '')
        alpha_2 = row[2]
        alpha_3 = row[3]
        by_alpha_3[alpha_3] = {
            'names': {clean_name},
            'alpha_2': alpha_2,
            'alpha_3': alpha_3,
        }

    return by_alpha_3

def get_iso_countries() -> dict:
    """Scrape the iso.org country table and return it keyed by alpha-3 code.

    Loads ``ISO_URL`` through ``download_iso_table``, locates the table inside
    the ``v-grid-tablewrapper`` div, extracts its cell texts and converts the
    rows with ``convert_table_to_dict``.
    """
    page = download_iso_table(ISO_URL)
    wrapper = page.find("div", {"class": "v-grid-tablewrapper"})
    rows = tableDataText(wrapper.find('table'))
    return convert_table_to_dict(rows)

In [None]:
def download(url: str, timeout: float = 30) -> dict:
    """Fetch ``url`` over HTTP and return the decoded JSON body.

    Args:
        url: Address of the JSON document.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled server.

    Returns:
        The parsed JSON content.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status, so
            an error page is not mistaken for the payload.
        json.JSONDecodeError: If the body is not valid JSON.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return json.loads(response.content)
    
def convert_dict_to_dict(json_content: dict) -> dict:
    """Re-key the entries of the ``'3166-1'`` list by their alpha-3 code.

    Each resulting entry holds the set of known names (``name`` plus
    ``official_name`` when the record has one) together with the alpha-2 and
    alpha-3 codes.
    """
    by_alpha_3 = {}
    for record in json_content['3166-1']:
        short_name = record['name']
        # Records without an official name fall back to the short name,
        # which the set then collapses into a single element.
        known_names = {short_name, record.get('official_name', short_name)}
        alpha_2 = record['alpha_2']
        alpha_3 = record['alpha_3']
        by_alpha_3[alpha_3] = {
            'names': known_names,
            'alpha_2': alpha_2,
            'alpha_3': alpha_3,
        }

    return by_alpha_3

def get_debian_countries() -> dict:
    """Download the Debian iso-codes JSON from ``DEBIAN_URL`` and return it keyed by alpha-3 code."""
    return convert_dict_to_dict(download(DEBIAN_URL))

In [None]:
# Other relevant names which are not present in either of the other datasets

# Keyed by ISO 3166-1 alpha-3 code; each entry only carries the extra
# common names that should be added as aliases for that country.
custom_country_dict = {
    'BES': {'names': {'Bonaire'}},
    'CIV': {'names': {'Ivory Coast'}},
    'FLK': {'names': {'Falkland Islands'}},
    'FSM': {'names': {'Micronesia'}},
    'IRN': {'names': {'Iran'}},
    'KOR': {'names': {'South Korea'}},
    'LAO': {'names': {'Laos'}},
    'MDA': {'names': {'Moldova'}},
    'PRK': {'names': {'North Korea'}},  # was misspelled 'North Kora'
    'PSE': {'names': {'Palestine'}},
    'RUS': {'names': {'Russia'}},
    'SXM': {'names': {'Sint Maarten'}},
    'MAF': {'names': {'Saint Martin'}},
    'TWN': {'names': {'Taiwan'}},
    'TZA': {'names': {'Tanzania'}},
    'SYR': {'names': {'Syria'}},
}

In [None]:
def merge_dicts(countries: list[dict]) -> dict:
    """Merge several country dictionaries into one, unioning their name sets.

    Args:
        countries: Dictionaries keyed by country code. Every entry must have
            a ``'names'`` collection; other fields (e.g. ``alpha_2``) are
            optional.

    Returns:
        A new dictionary. For each code, the entry from the first input that
        contains it supplies the non-name fields, and ``'names'`` is the union
        of the names from all inputs. An empty input list yields ``{}``.

    Note:
        The inputs are never modified: each merged entry gets its own copy of
        the name set. Codes that only appear in a later dictionary are added
        instead of raising ``KeyError``.
    """
    merged: dict = {}
    for dictionary in countries:
        for code, spec in dictionary.items():
            if code in merged:
                merged[code]['names'].update(spec['names'])
            else:
                # Fresh entry with its own set, so later updates cannot leak
                # back into the caller's data.
                merged[code] = {**spec, 'names': set(spec['names'])}

    return merged

In [None]:
# Fetch both reference datasets (needs network access; the ISO scrape also launches Firefox via Selenium).
debian_country_dict = get_debian_countries()
iso_country_dict = get_iso_countries()

# The Debian table is the base; names from the ISO table and the custom additions are merged into it.
merged_dicts = merge_dicts([debian_country_dict, iso_country_dict, custom_country_dict])

In [None]:
def merge_stix_with_dicts(stix_content: dict, merged_dict: dict) -> dict:
    """Add the known country names as aliases to the country objects of a STIX bundle.

    Args:
        stix_content: Parsed bundle with an ``'objects'`` list. Objects that
            have a ``'country'`` key are looked up in ``merged_dict`` by that
            value; all other objects are left alone.
        merged_dict: Mapping of country code to an entry with a ``'names'`` set.

    Returns:
        The same ``stix_content`` object, modified in place: for every matched
        object, names that are neither its ``'name'`` nor already listed in
        ``'x_opencti_aliases'`` are appended to that alias list.

    Note:
        New aliases are appended in sorted order so repeated runs produce the
        same file (extending a list straight from a set depends on hash
        ordering). Objects without an alias list get one only when there is
        something to add. Unknown country codes and codes from ``merged_dict``
        that never appeared in the bundle are reported via ``print``.
    """
    processed_countries = set()

    for entry_object in stix_content['objects']:
        if 'country' not in entry_object:
            continue

        country_code = entry_object['country']
        country_specs = merged_dict.get(country_code)
        if country_specs is None:
            print(f"A strange country is found. Investigate!! '{country_code}'")
            continue

        aliases = entry_object.get('x_opencti_aliases')
        if aliases is None:
            aliases = []
        new_names = country_specs['names'] - {entry_object.get('name')} - set(aliases)
        if new_names:
            aliases.extend(sorted(new_names))
            entry_object['x_opencti_aliases'] = aliases
        processed_countries.add(country_code)

    diff = merged_dict.keys() - processed_countries
    if diff:
        print(f"Unprocessed countries {diff}")

    return stix_content

def modify_stix(stix_file: str, merged_dict: dict) -> dict:
    """Enrich a STIX JSON file with country aliases and write the result to a sibling file.

    Args:
        stix_file: Path of the STIX JSON file to read.
        merged_dict: Mapping of country code to an entry with a ``'names'`` set,
            passed on to ``merge_stix_with_dicts``.

    Returns:
        The enriched STIX content that was written to disk.

    Note:
        The input file is left untouched; output goes to
        ``<name>_new<ext>`` in the same directory. Both files are opened as
        UTF-8 explicitly so the result does not depend on the platform's
        default encoding.
    """
    with open(stix_file, 'r', encoding='utf-8') as f:
        stix_data = json.load(f)

    # Merging happens after the input file is closed; it needs no file handle.
    content = merge_stix_with_dicts(stix_data, merged_dict)

    base_name, ext = os.path.splitext(stix_file)
    new_file_name = f"{base_name}_new{ext}"
    with open(new_file_name, "w", encoding='utf-8') as f:
        json.dump(content, f, indent=4)

    print("All done here!")
    return content

In [None]:
# Enrich the STIX bundle at STIX_PATH with the merged names; the result is written next to it with a "_new" suffix.
new_stix_content = modify_stix(STIX_PATH, merged_dicts)