In [2]:
# Import MAESTRI dataset

# Import correspondence tables

# Match ISIC, WZ and SSIC codes to NACE codes

## ISIC doesn't need NLP

## WZ and SSIC have multiple codes for a given NACE code

### Use a LLM to find the best match given extra info

### 0. Utilities

#### Imports

In [3]:
import pandas as pd
import re
import ollama
import json

In [4]:
import os
import sys

module_path = os.path.abspath(os.path.join('..'))

if module_path not in sys.path:
    sys.path.append(module_path)

from utils.constants import STANDARDS
from utils.types import IndustryStandard, IndustryCode
from utils.functions.files import read_inference, read_correspondence, read_maestri
from utils.functions.maestri import new_col, new_roles

#### Constants

In [5]:
# Non-NACE standards
NON_NACE_STDS = STANDARDS[1:]

inference_dfs = read_inference()

#### Helper functions

##### Obtaining column names

In [6]:
# Obtain column name for similarity score, given ICS and company role
similarity_col = lambda std, role: f"{role} {std.value} code sim. score"

### 1. Reading the MAESTRI dataset

#### Importing the spreadsheet

In [7]:
# Read the MAESTRI dataset as a DataFrame
maestri_dfs = read_maestri()
maestri_dfs[0].head()

Unnamed: 0,Donor NACE code,Donor ISIC code,Donor WZ code,Donor SSIC code
0,1920,1920,19200,19201
1,1920,1920,19200,19201
2,2410,2410,24520,24310
3,2410,2410,24520,24310
4,2351,2394,23510;23650;23610,23940


In [8]:
correspondence_dfs = read_correspondence()
inference_dfs = read_inference()

In [9]:
def get_level(code) -> int:
    df = inference_dfs[code.std]
    
    if code.value in df.index:
        return df.loc[code.value, "Level"]

    return None

def get_children(code):
    df = inference_dfs[code.std]
    
    if code.value == "" or code.value in df.index:
        return [IndustryCode(code.std, value) for value in df[df["Parent"] == code.value].index]

    return None

def get_parent(code, level=-1):
    df = inference_dfs[code.std]
    
    if code.value in df.index:        
        v = df.loc[code.value, "Parent"]
        l = df.loc[v, "Level"]
        
        # Level == -1 (immediate parent)
        while level != -1 and l > level:
            v = df.loc[v, "Parent"]
            l = df.loc[v, "Level"]
        
        return IndustryCode(code.std, v)

    return None

# Evaluate the highest common level (HCL)
def get_common_parent(code, std):
    # from_df = inference_dfs[code.std]
    to_df = inference_dfs[std]
    level = get_level(code)
    
    if level == 1:
        return IndustryCode(std, "")
    
    parent = get_parent(code)
    
    if parent.value in to_df.index and len(get_children(parent)) > 0:
        return IndustryCode(std, parent.value)
    
    while parent.value not in to_df.index:
        # print(std.value + ": " + c.value)
        parent = get_parent(parent)
        # print(parent.value)
    
    return IndustryCode(std, parent.value)

In [10]:
def get_detailed_code_str(code):
	df = inference_dfs[code.std].copy()
    
	if code.value in df.index:
		series = df.loc[code.value]
		series = series[series.index[~series.index.isin(["Level", "Parent", "ISIC code"])]]
		string = f"{code.std.value} Code: {series.name}\n"

		for col in series.index:
			if series[col] != "":
				text = re.sub(r"\s*\(\d+\.\d+(?:\,\s*\d+\.\d+)*\)", "", series[col])
				text = text.replace("\n", "")
				
				string += f"{col}: {text}\n"
						
		return string.strip()

	return None

def get_match_prompt(from_code, to_codes):
    from_std = from_code.std
    to_std = to_codes[0].std
    
    prompt = f"Match the given {from_std.value} code to the most similar of the following {to_std.value} codes" \
			  " using their descriptions, examples and exclusions. When returning the result as JSON, the keys" \
       		 f" for the given code should be '{from_std.value}' and for the matched code as '{to_std.value}':" \
    		 f"\n\n{get_detailed_code_str(from_code)}"
    
    for code in to_codes:
        prompt += "\n\n" + get_detailed_code_str(code)
    
    return prompt

def get_match_prompt_from_common_parent(from_code, to_std):
    common_parent = get_common_parent(from_code, to_std)
    return get_match_prompt(from_code, get_children(common_parent))

In [11]:
c = IndustryCode(IndustryStandard.SSIC, "01")
print(get_match_prompt_from_common_parent(c, IndustryStandard.WZ))

Match the given SSIC code to the most similar of the following WZ codes using their descriptions, examples and exclusions. When returning the result as JSON, the keys for the given code should be 'SSIC' and for the matched code as 'WZ':

SSIC Code: 01
Description: AGRICULTURE AND RELATED SERVICE ACTIVITIES

WZ Code: 01
Description: Crop and animal production, hunting and related service activities

WZ Code: 02
Description: Forestry and logging

WZ Code: 03
Description: Fishing and aquaculture


### 2. Obtaining similarity scores for validation

In [12]:
df = maestri_dfs[1].copy()
df = df[df["Intermediary SSIC code"].str.contains(";")]
df = df.head()

df

Unnamed: 0,Intermediary NACE code,Intermediary ISIC code,Intermediary WZ code,Intermediary SSIC code
31,610,610,06100,19201;09001
50,610,610,06100,19201;09001
98,1712,1701,17120;17230;17290;28950,17099;17010
99,1712,1701,17120;17230;17290;28950,17099;17010
100,1920,1920,20140;20130,20113;20119


In [13]:
str_to_codes = lambda text, std: [IndustryCode(std, val) for val in text.split(";")]

def get_match(from_code, text, to_std):
	to_codes = [code for code in str_to_codes(text, to_std) if get_parent(code, level=1).value == get_parent(from_code, level=1).value]

	if len(to_codes) == 1:
		return to_codes[0]

	print(f"Given {from_code.std.value} code: {from_code.value}, given {to_std.value} codes: {text}")

	prompt = get_match_prompt(from_code, to_codes)
	# print("Prompt: " + prompt)

	response = ollama.generate(model='llama3', prompt=prompt, format="json")
	response = json.loads(response["response"])
	print(f"Given {from_code.std.value} code: {from_code.value}, matched {to_std.value} code: {response[to_std.value]}")

	return IndustryCode(to_std, response[to_std.value])

In [14]:
role = new_roles[1]
nace_col = new_col(STANDARDS[0], role)

# Iterate through all standards except NACE as it is to be compared with
for std in NON_NACE_STDS:
	# Standard column, example for ISIC: 'Donor ISIC code'
	std_col = new_col(std, role)
	
	# Zip the NACE and standard columns to iterate through
	tuples = zip(df[nace_col], df[std_col])
	
	### -> INTRODUCE SIMILAR CODES HERE
	df[std_col] = [get_match(    IndustryCode(IndustryStandard.NACE, t[0]), t[1], std    ).value for t in tuples]
    
# List containing new order of columns for readability
cols = [nace_col] + [f(std, role) for std in NON_NACE_STDS for f in (new_col,)]

df = df[cols]
df

Given NACE code: 0610, given ISIC codes: 0610
Given NACE code: 0610, given ISIC codes: 0610
Given NACE code: 1712, given ISIC codes: 1701
Given NACE code: 1712, given ISIC codes: 1701
Given NACE code: 1920, given ISIC codes: 1920
Given NACE code: 0610, given WZ codes: 06100
Given NACE code: 0610, given WZ codes: 06100
Given NACE code: 1712, given WZ codes: 17120;17230;17290;28950
Given NACE code: 1712, matched WZ code: 17120
Given NACE code: 1712, given WZ codes: 17120;17230;17290;28950
Given NACE code: 1712, matched WZ code: 17120
Given NACE code: 1920, given WZ codes: 20140;20130
Given NACE code: 1920, matched WZ code: 20130
Given NACE code: 0610, given SSIC codes: 19201;09001
Given NACE code: 0610, given SSIC codes: 19201;09001
Given NACE code: 1712, given SSIC codes: 17099;17010
Given NACE code: 1712, matched SSIC code: 17010
Given NACE code: 1712, given SSIC codes: 17099;17010
Given NACE code: 1712, matched SSIC code: 17010
Given NACE code: 1920, given SSIC codes: 20113;20119
Give

Unnamed: 0,Intermediary NACE code,Intermediary ISIC code,Intermediary WZ code,Intermediary SSIC code
31,610,610,6100,9001
50,610,610,6100,9001
98,1712,1701,17120,17010
99,1712,1701,17120,17010
100,1920,1920,20130,20119


In [15]:
# # Loop through all company types
# for i in range(len(new_roles)):
#     # Role: either 'Donor', 'Intermediary' or 'Receiver'
#     role = new_roles[i]
    
#     # NACE column, example: 'Provider NACE code'
#     nace_col = new_col(STANDARDS[0], role)
    
#     # Iterate through all standards except NACE as it is to be compared with
#     for std in NON_NACE_STDS:
#         df = maestri_dfs[i]
        
#         # Standard column, example for ISIC: 'Donor ISIC code'
#         std_col = new_col(std, role)
        
#         # Zip the NACE and standard columns to iterate through
#         tuples = zip(df[nace_col], df[std_col])
        
#         ### -> INTRODUCE SIMILAR CODES HERE
        
        
    
#     # List containing new order of columns for readability
#     cols = [nace_col] + [f(std, role) for std in NON_NACE_STDS for f in (new_col,)]
    
#     # Reorder columns for readability
#     maestri_dfs[i] = maestri_dfs[i][cols]