# Importing packages

In [2]:
''' GENERAL '''
import pandas as pd
import numpy as np
import re
from collections.abc import Iterable
from itertools import chain
from collections import Counter

from tqdm.notebook import tqdm # Notebook friendly progression bar
tqdm.pandas()

''' EMBEDDINGS '''
from sentence_transformers import SentenceTransformer, util
from tensorflow.keras.preprocessing.text import Tokenizer #GloVe
from sklearn.feature_extraction.text import TfidfVectorizer #GloVe

from nltk.stem import WordNetLemmatizer, SnowballStemmer #GloVe
# import nltk
# nltk.download('wordnet')

2024-07-04 09:03:52.288665: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/niekprivate/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

# Preparation

## ISCO

In [4]:
# Read file (ISCO codes are read as strings so they can be padded/compared as text)
isco = pd.read_excel('isco.xlsx', dtype={'ISCO 08 Code': 'str'})
# Rename columns
isco = isco.rename(columns={'Level': 'isco_level', 'ISCO 08 Code': 'isco_code', 'Title EN': 'isco_title', 'Tasks include': 'isco_tasks', 'Definition': 'isco_definition'})
# Drop unnecessary columns (keeps the first five columns by position)
isco = isco.iloc[:, :5]
# Keep only level 3 ISCO codes.
# .copy() makes the result an independent frame, so the column assignments
# below do not write into a slice of the original (SettingWithCopyWarning).
isco = isco.loc[isco['isco_level'] == 3].copy()

# Add '0'/'00' to level 1/2 isco codes ('21' --> '210', '8' --> '800')
# NOTE: after the level-3 filter above no level 1/2 rows remain, so this only
# has an effect if that filter is relaxed.
isco['isco_code'] = np.select(
    condlist=[
        isco['isco_level'] == 1,
        isco['isco_level'] == 2
    ],
    choicelist=[
        # Vectorised concatenation; unlike apply(lambda x: x + '00') it does
        # not raise on a missing (NaN) code.
        isco['isco_code'] + '00',
        isco['isco_code'] + '0',
    ],
    default=isco['isco_code']
)

# Print amount of missing values for tasks and definitions
tasks_missing = isco['isco_tasks'].isna()
amount_na = int(tasks_missing.sum())
len_isco = len(isco)
# Guard the percentage against an empty table
pct_na = round((amount_na/len_isco)*100, 2) if len_isco else 0.0
print('Contains missing tasks:', tasks_missing.any(), f'(# missing values: {amount_na}/{len_isco}, {pct_na}%)')
print('Contains missing definition:', isco['isco_definition'].isna().any())

# Fill missing tasks with empty string
isco['isco_tasks'] = isco['isco_tasks'].fillna('')
# Combine definition and task strings
isco['combined_text'] = isco['isco_definition'] + ' ' + isco['isco_tasks']

# Renumber rows 0..n-1; later cells look occupations up by position
isco = isco.reset_index(drop=True)

Contains missing tasks: True (# missing values: 3/130, 2.31%)
Contains missing definition: False


In [3]:
# Check whether any ISCO text is longer than the SBERT model's max sequence
# length; lengths are whitespace-separated word counts
definition_length = isco['isco_definition'].map(lambda text: len(text.split()))
tasks_length = isco['isco_tasks'].map(lambda text: len(text.split()))

print('Definitions with length > seq length (384):', definition_length.gt(384).any())
print('Tasks with length > seq length (384):', tasks_length.gt(384).any())


NameError: name 'isco' is not defined

## Patents

In [None]:
# Load the sustainability patent texts (tab-separated file)
patents = pd.read_csv('data/patents/sustainability_patents.tsv', delimiter='\t')
# Load the patent technology table (tab-separated file)
patent_tech = pd.read_csv('data/patents/patent_tech.tsv', delimiter='\t')

In [None]:
def clean_claim(x):
    """Strip markup and noise from a single claim string.

    Removes, in order: HTML-style tags, dashes/arrows, parenthesised text and
    a leading enumeration label (a word followed by a period at the very start
    of the string). Runs of two or more whitespace characters are then
    collapsed into a single space and the result is stripped.

    Args:
        x (str): Raw claim text.

    Returns:
        str: The cleaned claim text (may be empty).
    """
    # html tags
    x = re.sub(r'<.*?>', '', x)
    # arrows and dashes
    x = re.sub(r'-+>?', '', x)
    # everything between '( )'
    x = re.sub(r'\(.*?\)', '', x)
    # a. at start
    x = re.sub(r'^\w+\.', '', x)
    # replace multiple whitespaces with one; substituting '' here would glue
    # the neighbouring words together ('a  b' -> 'ab')
    x = re.sub(r'\s{2,}', ' ', x)
    # strip whitespaces at start and end
    return x.strip()

In [None]:
def split_claim(x, max_seq_length):
    """Split a raw claim text into parts of at most ``max_seq_length`` words.

    The text is first split on ``<claim-text>``. A part whose raw
    whitespace-separated word count exceeds ``max_seq_length`` is split
    further, trying ``<br>`` tags, then periods (not between digits), then
    semicolons preceded by a letter. For each separator the number of splits
    is raised from 1 up to 6 until every cleaned piece (see ``clean_claim``)
    fits. If no separator succeeds, the part is split on every occurrence of
    the separator whose attempt produced the strictly shortest longest piece;
    on a tie the last semicolon attempt is kept.

    Args:
        x (str): Raw claim text, possibly containing HTML tags.
        max_seq_length (int): Maximum number of words allowed per part.

    Returns:
        list[str]: Flat list of uncleaned parts in document order. Parts may
        be empty and, when no separator helps, may still exceed the limit.
    """
    max_tries = 6

    # Separators, in order of preference: '<br>', period, semicolon
    br_re = re.compile(r'<br\s?\/?>')
    per_re = re.compile(r'(?<!\d)\.(?!\d)')
    # [A-Za-z] rather than [A-z]: the latter also matches '[', '\', ']', '^', '_' and '`'
    col_re = re.compile(r'(?<=[A-Za-z]);')

    def longest_piece(pieces):
        # Word count of the longest piece after cleaning
        return max(len(clean_claim(piece).split()) for piece in pieces)

    def split_until_fits(pattern, claim):
        # Split with maxsplit = 1..max_tries, stopping once every piece fits
        pieces, longest = [claim], None
        for maxsplit in range(1, max_tries + 1):
            pieces = re.split(pattern, claim, maxsplit=maxsplit)
            longest = longest_piece(pieces)
            if longest <= max_seq_length:
                break
        return pieces, longest

    parts = []
    for claim in re.split('<claim-text>', x):
        # Short enough already (raw word count): keep as is
        if len(claim.split()) <= max_seq_length:
            parts.append(claim)
            continue

        attempts = []  # (longest piece, separator) for every separator tried
        for pattern in (br_re, per_re, col_re):
            pieces, longest = split_until_fits(pattern, claim)
            attempts.append((longest, pattern))
            if longest <= max_seq_length:
                break
        else:
            # No separator made the claim fit: split on every occurrence of
            # the separator that came closest, each with its own pattern
            ranked = sorted(attempts, key=lambda attempt: attempt[0])
            if ranked[0][0] < ranked[1][0]:
                pieces = re.split(ranked[0][1], claim)

        parts.extend(pieces)

    return parts  # Return list of split claims

In [None]:
''' Filtering on patent claims '''

# Keep only the rows whose text type is CLAIM, as an independent frame
# renumbered from 0
patent_claims = patents.loc[patents['text_type'] == 'CLAIM'].copy().reset_index(drop=True)

In [None]:
''' Cleaning and splitting claims '''

# Clean claim texts
print('Cleaning claims...')
patent_claims['clean_text'] = patent_claims['text'].progress_apply(clean_claim)

# Splitting claims
print('\nSplitting claims...')
patent_claims['split_text_mpnet'] = patent_claims['text'].progress_apply(lambda x: split_claim(x, 384))

# Quick function for cleaning separate claims in list
def clean_split_claim(x):
    """Clean every part of a split claim and drop parts that end up blank.

    Args:
        x (list[str]): Parts of one claim, as returned by ``split_claim``.

    Returns:
        list[str]: The cleaned, non-blank parts in their original order.
    """
    # Clean each part once, then filter
    cleaned_parts = (clean_claim(c) for c in x)
    return [c for c in cleaned_parts if not re.search(r'^\s*$', c)]

# Cleaning split claims
print('\nCleaning split claims...')
patent_claims['split_text_mpnet'] = patent_claims['split_text_mpnet'].progress_apply(clean_split_claim)

# subset columns
patent_claims = patent_claims[['pub_nbr', 'app_year', 'text', 'clean_text', 'split_text_mpnet']]
# Remove empty claims; renumber the rows so the index has no gaps, otherwise
# positionally built Series assigned to this frame later would be misaligned
patent_claims = patent_claims[patent_claims['split_text_mpnet'].str.len() > 0].reset_index(drop=True)

In [None]:
''' Printing patent info '''

nr_patents = len(patents['pub_nbr'].unique())
print('N =', nr_patents)

grouped = patents.groupby('pub_nbr')['text_type']

# Per patent: is any text_type value missing?
has_missing = grouped.apply(lambda x: x.isna().any())
nr_na_vc = has_missing.value_counts()
# Count the True flags directly; value_counts().iloc[1] only works when both
# True and False occur and True happens to be the rarer value
nr_missing = int(has_missing.sum())
print(f'\nAmount of patents with missing data: {nr_missing}/{nr_patents} ({round(nr_missing/nr_patents*100, 2)}%)')

# Per patent: does it have at least one CLAIM row?
has_claim = grouped.apply(lambda x: 'CLAIM' in x.values)
text_type_vc = has_claim.value_counts()
nr_with_claim = int(has_claim.sum())
print(f'\nPatents containing a claim section: {nr_with_claim}/{nr_patents} ({round(nr_with_claim/nr_patents*100, 2)}%)')

# Embeddings

## mpnet

In [None]:
# Load model
model_mpnet = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
print('Max seq length mpnet:', model_mpnet.max_seq_length) #384

# ISCO embeddings: encode all occupation texts in one batched call, as the
# PaECTER cell does, instead of building a pandas Series of separate arrays
isco_embeddings_mpnet = model_mpnet.encode(isco['combined_text'].tolist(), show_progress_bar=True)

# Sustainability patents: one array of part embeddings per claim
patent_claims['embeddings_mpnet'] = patent_claims['split_text_mpnet'].progress_apply(model_mpnet.encode)
# Averaging claim embeddings over the parts of each claim
patent_claims['embeddings_mpnet_mean'] = patent_claims['embeddings_mpnet'].apply(lambda x: x.mean(axis=0))

## PaECTER

In [None]:
# Load model
model_paecter = SentenceTransformer('mpi-inno-comp/paecter')
print('Max seq length PaECTER:', model_paecter.max_seq_length) #512

# ISCO embeddings
isco_embeddings_paecter = model_paecter.encode(isco['combined_text'].tolist(), show_progress_bar=True)

# Split the claims for this model's sequence length; the earlier cells only
# create 'split_text_mpnet', so the column is built here if it is missing
if 'split_text_paecter' not in patent_claims.columns:
    patent_claims['split_text_paecter'] = (
        patent_claims['text']
        .progress_apply(lambda x: split_claim(x, model_paecter.max_seq_length))
        .progress_apply(clean_split_claim)
    )

# Sustainability patents (encoded with the PaECTER model, not mpnet)
patent_claims['embeddings_paecter'] = patent_claims['split_text_paecter'].progress_apply(model_paecter.encode)
# Averaging claim embeddings
patent_claims['embeddings_paecter_mean'] = patent_claims['embeddings_paecter'].apply(lambda x: x.mean(axis=0))

## GloVe

In [None]:
# Shared text normalisers used by tokenize_doc
stemmer = SnowballStemmer('english')
lemmatizer = WordNetLemmatizer()

In [None]:
def tokenize_doc(document):
    """Fit a ``Tokenizer`` on the normalised words of one document.

    The document is lowercased and split on whitespace; every word is
    lemmatized and then stemmed before the tokenizer is fitted on the
    resulting word list.

    Args:
        document (str): Text of a single document.

    Returns:
        Tokenizer: A tokenizer fitted on the document's normalised words.
    """
    words = document.lower().split()

    # Lemmatize first, then stem, word by word
    normalised = [stemmer.stem(lemmatizer.lemmatize(word)) for word in words]

    fitted = Tokenizer()
    fitted.fit_on_texts(normalised)
    return fitted

In [None]:
''' Create GloVe embeddings, with optional TF-IDF weighting '''
def create_glove_embeddings(document, dim, glove_embeddings, tfidf_df=None, feature_names= None, i=None):
    """Average the GloVe vectors of a document's words into one embedding.

    The document is tokenized with ``tokenize_doc``; each distinct word that
    has a GloVe vector contributes once. With ``tfidf_df`` given, each vector
    is multiplied by the word's TF-IDF value for document ``i`` and words
    without a TF-IDF feature are skipped.

    Args:
        document (str): Document text.
        dim (int): Embedding dimensionality; sets the length of the NaN
            vector returned when no word can be embedded.
        glove_embeddings (dict): Maps a word to its vector components.
        tfidf_df (pandas.DataFrame, optional): TF-IDF values, rows labelled by
            document index and columns by feature name.
        feature_names (iterable of str, optional): TF-IDF feature names;
            defaults to the columns of ``tfidf_df``.
        i (optional): Row label of this document in ``tfidf_df``.

    Returns:
        numpy.ndarray: The mean embedding, or a NaN vector of length ``dim``
        when no word of the document could be embedded.

    Raises:
        Exception: If a word of the document has a TF-IDF value of 0.
    """
    word_index = tokenize_doc(document).word_index

    use_tfidf = tfidf_df is not None
    if use_tfidf:
        # A set gives O(1) membership tests; `in` on an array scans every element
        known_features = set(tfidf_df.columns if feature_names is None else feature_names)

    embedding_list = []

    for word in word_index:
        if word not in glove_embeddings:
            continue

        # Look up this word's own vector; it used to be read from a leaked
        # global holding the last line of the GloVe file
        vector = np.array(glove_embeddings[word], dtype=np.float32)

        if use_tfidf:
            if word not in known_features:
                continue

            tfidf_value = tfidf_df.loc[i, word]
            if tfidf_value == 0:
                raise Exception('tfidf value cannot be 0')

            vector = vector * tfidf_value

        embedding_list.append(vector)

    if not embedding_list:
        # No embeddable word: return an explicit NaN vector instead of the
        # scalar NaN (plus RuntimeWarning) that np.mean gives for an empty array
        return np.full(dim, np.nan, dtype=np.float32)

    embedding_matrix = np.array(embedding_list)
    document_embedding = np.mean(embedding_matrix, axis=0)

    return document_embedding

In [None]:
''' Creating patent embeddings '''

dim = 300

# Create corpus
corpus = patent_claims['clean_text']
# Tokenize: each document becomes its distinct normalised words joined by spaces
corpus_clean = [' '.join(tokenize_doc(doc).index_word.values()) for doc in corpus]
corpus_size = len(corpus_clean)

corpus_tokens = set(chain.from_iterable(doc.split() for doc in corpus_clean))

# Initiate TF-IDF instance
tfidf = TfidfVectorizer()
tfidf_fit = tfidf.fit_transform(corpus_clean)
feature_names = tfidf.get_feature_names_out()
# Create df with feature names and corresponding tf-idf values
tfidf_df = pd.DataFrame(tfidf_fit.todense(), index=range(corpus_size), columns=feature_names)

# Tokens in corpus that are not in tfidf
token_diff = corpus_tokens - set(feature_names)

glove_embeddings = {}

# Explicit UTF-8: the platform's default encoding may fail on non-ASCII tokens
with open('glove/glove.6B.300d.txt', 'r', encoding='utf-8') as glove_file:
    # Read word embeddings from file, keeping only words that occur in the corpus
    for line in glove_file:
        fields = line.split()
        if not fields:
            # Skip blank lines instead of failing on the unpacking below
            continue

        word, *vector = fields

        if word in corpus_tokens:
            glove_embeddings[word] = vector

# NOT TFIDF WEIGHTED
embeddings_glove = np.zeros((corpus_size, dim))

for i, doc in enumerate(tqdm(corpus_clean)):
    doc_embedding = create_glove_embeddings(doc, dim, glove_embeddings)
    embeddings_glove[i] = doc_embedding
patent_claims['embeddings_glove'] = list(embeddings_glove)

# TFIDF WEIGHTED
embeddings_glove_tfidf = np.zeros((corpus_size, dim))

for i, doc in enumerate(tqdm(corpus_clean)):
    doc_embedding_tfidf = create_glove_embeddings(doc, dim, glove_embeddings, tfidf_df, feature_names, i)
    embeddings_glove_tfidf[i] = doc_embedding_tfidf
patent_claims['embeddings_glove_tfidf'] = list(embeddings_glove_tfidf)

In [None]:
''' Creating ISCO embeddings '''

dim = 300

corpus_isco = isco['combined_text']
# Each document becomes its distinct normalised words joined by spaces
corpus_isco_clean = [' '.join(tokenize_doc(doc).index_word.values()) for doc in corpus_isco]
corpus_isco_size = len(corpus_isco_clean)

corpus_isco_tokens = set(chain.from_iterable(doc.split() for doc in corpus_isco_clean))

tfidf_isco = TfidfVectorizer()
tfidf_isco_fit = tfidf_isco.fit_transform(corpus_isco_clean)
feature_names_isco = tfidf_isco.get_feature_names_out()
# Df with feature names as columns and one row of tf-idf values per document
tfidf_df_isco = pd.DataFrame(tfidf_isco_fit.todense(), index=range(corpus_isco_size), columns=feature_names_isco)

# Tokens in the ISCO corpus that are not in tfidf
token_diff_isco = corpus_isco_tokens - set(feature_names_isco)

glove_embeddings_isco = {}

# Explicit UTF-8: the platform's default encoding may fail on non-ASCII tokens
with open('glove/glove.6B.300d.txt', 'r', encoding='utf-8') as glove_file:
    # Keep only the vectors of words that occur in the ISCO corpus
    for line in glove_file:
        fields = line.split()
        if not fields:
            # Skip blank lines instead of failing on the unpacking below
            continue

        word, *vector = fields

        if word in corpus_isco_tokens:
            glove_embeddings_isco[word] = vector

# NOT TFIDF WEIGHTED
isco_embeddings_glove = np.zeros((corpus_isco_size, dim))

for i, doc in enumerate(tqdm(corpus_isco_clean)):
    doc_embedding_isco = create_glove_embeddings(doc, dim, glove_embeddings_isco)
    isco_embeddings_glove[i] = doc_embedding_isco

# TFIDF WEIGHTED
isco_embeddings_glove_tfidf = np.zeros((corpus_isco_size, dim))

for i, doc in enumerate(tqdm(corpus_isco_clean)):
    doc_embedding_isco_tfidf = create_glove_embeddings(doc, dim, glove_embeddings_isco, tfidf_df_isco, feature_names_isco, i)
    isco_embeddings_glove_tfidf[i] = doc_embedding_isco_tfidf

# Most similar occupations

In [None]:
def most_sim_occupations_split(claim_embeddings, occupation_embeddings):
    """Return the ISCO code that most parts of one split claim are closest to.

    Each claim-part embedding is matched to the occupation embedding with the
    highest cosine similarity; the code chosen most often is returned.

    Args:
        claim_embeddings: Embeddings of the parts of a single claim.
        occupation_embeddings: Embeddings of the occupations, in the row order
            of the global ``isco`` frame.

    Returns:
        The most frequent best-matching value of ``isco['isco_code']``.
    """
    # Rows: claim parts, columns: occupations
    similarities = pd.DataFrame(util.cos_sim(claim_embeddings, occupation_embeddings))

    # Best-matching occupation code for every claim part
    best_codes = [
        isco.iloc[similarities.iloc[row].idxmax()]['isco_code']
        for row in range(len(similarities))
    ]

    # Majority vote over the parts
    (top_code, _count), = Counter(best_codes).most_common(1)
    return top_code

def most_sim_occupations(claim_embeddings, occupancy_embeddings):
    """Return, per claim embedding, the ISCO code of the most similar occupation.

    Same as ``most_sim_occupations_split`` but for claims not in a list: every
    row of ``claim_embeddings`` is one claim.

    Args:
        claim_embeddings: One embedding per claim, as a 2-D array or a pandas
            Series holding one 1-D array per row.
        occupancy_embeddings: Embeddings of the occupations, in the row order
            of the global ``isco`` frame.

    Returns:
        pandas.Series: The best-matching ``isco['isco_code']`` per claim. For
        a Series input the result carries that Series' index, so assigning it
        back to the source frame lines up even if the index has gaps;
        otherwise it has a default RangeIndex.
    """
    result_index = None
    if isinstance(claim_embeddings, pd.Series):
        result_index = claim_embeddings.index
        if claim_embeddings.empty:
            return pd.Series([], index=result_index, dtype=object)
        # Stack the per-row arrays into one 2-D array so the similarity
        # computation does not depend on the Series' index labels
        claim_embeddings = np.stack(claim_embeddings.tolist())

    # Rows: claims, columns: occupations
    sim_matrix = util.cos_sim(claim_embeddings, occupancy_embeddings)
    sim_matrix = pd.DataFrame(sim_matrix)

    # Column position of the highest similarity in every row
    best_positions = sim_matrix.idxmax(axis=1).to_numpy()
    sim_occupancies = isco['isco_code'].iloc[best_positions].to_numpy()

    return pd.Series(sim_occupancies, index=result_index)

In [None]:
# mpnet
# Stack the embeddings into plain 2-D arrays so the similarity computation
# does not depend on pandas index labels
isco_matrix_mpnet = np.stack(list(isco_embeddings_mpnet))
claim_means_mpnet = np.stack(patent_claims['embeddings_mpnet_mean'].tolist())

patent_claims['sim_occ_mpnet'] = patent_claims['embeddings_mpnet'].progress_apply(lambda x: most_sim_occupations_split(x, isco_matrix_mpnet))
# .to_numpy(): assign by position, so rows still line up if patent_claims has gaps in its index
patent_claims['sim_occ_mpnet_mean'] = most_sim_occupations(claim_means_mpnet, isco_matrix_mpnet).to_numpy()

In [None]:
# PaECTER
patent_claims['sim_occ_paecter'] = patent_claims['embeddings_paecter'].progress_apply(lambda x: most_sim_occupations_split(x, isco_embeddings_paecter))
# Stack the mean embeddings into a 2-D array and assign the result by
# position, so rows still line up if patent_claims has gaps in its index
claim_means_paecter = np.stack(patent_claims['embeddings_paecter_mean'].tolist())
patent_claims['sim_occ_paecter_mean'] = most_sim_occupations(claim_means_paecter, isco_embeddings_paecter).to_numpy()

In [None]:
# GloVe
# Stack the per-row embeddings into 2-D arrays and assign the results by
# position, so rows still line up if patent_claims has gaps in its index
claim_glove = np.stack(patent_claims['embeddings_glove'].tolist())
claim_glove_tfidf = np.stack(patent_claims['embeddings_glove_tfidf'].tolist())
patent_claims['sim_occ_glove'] = most_sim_occupations(claim_glove, isco_embeddings_glove).to_numpy()
patent_claims['sim_occ_glove_tfidf'] = most_sim_occupations(claim_glove_tfidf, isco_embeddings_glove_tfidf).to_numpy()

In [None]:
# Writing file for manual validation
# First TITLE text per patent, built once instead of filtering `patents` for every patent
pub_titles = (
    patents.loc[patents['text_type'] == 'TITLE']
    .drop_duplicates(subset='pub_nbr')
    .set_index('pub_nbr')['text']
)

# Explicit UTF-8 so non-ASCII characters in the patent text cannot break the write
with open('similar_occupancies_check.txt', 'w', encoding='utf-8') as f:
    # One entry per claim row, using that row's own values
    for _, claim_row in patent_claims.iterrows():
        pub_nbr = claim_row['pub_nbr']

        # Patents without a TITLE row get a placeholder instead of an IndexError
        raw_title = pub_titles.get(pub_nbr)
        pub_title = raw_title.lower().capitalize() if isinstance(raw_title, str) else '(title not found)'

        # The split claims are stored in 'split_text_mpnet'; there is no 'split_text' column
        pub_claim = claim_row['split_text_mpnet']

        pub_claims = '.\n\n'.join(pub_claim)

        # The sim_occ_* columns hold one ISCO code each; indexing them with
        # [0] would keep only the first character of the code
        sim_occupations_mpnet = claim_row['sim_occ_mpnet']
        sim_occupations_mpnet_mean = claim_row['sim_occ_mpnet_mean']

        sim_occupations_paecter = claim_row['sim_occ_paecter']
        sim_occupations_paecter_mean = claim_row['sim_occ_paecter_mean']

        sim_occupations_glove = claim_row['sim_occ_glove']
        sim_occupations_glove_tfidf = claim_row['sim_occ_glove_tfidf']

        f.write(
            f'''TITLE:
{pub_title} (patent number: {pub_nbr})

CLAIMS:
{pub_claims}

SIMILAR OCCUPANCIES:

mpnet:
{sim_occupations_mpnet}
mpnet (mean):
{sim_occupations_mpnet_mean}

PaECTER:
{sim_occupations_paecter}
PaECTER (mean):
{sim_occupations_paecter_mean}

GLOVE:
{sim_occupations_glove}
GLOVE (tf-idf):
{sim_occupations_glove_tfidf}

-----------------------------------------------------------------------------------------------------------------------

''')