In [1]:
# Required packages
import torch
import re
import csv

In [2]:
# Check if GPU is configured
print(torch.rand(5, 3))

tensor([[0.5268, 0.7979, 0.9768],
        [0.2683, 0.2382, 0.5579],
        [0.0706, 0.3281, 0.6826],
        [0.8678, 0.5696, 0.0609],
        [0.2655, 0.9106, 0.5374]])


In [3]:
model_path = "models/ner_rasa_vers_v3"
test_path = "/DEVEL/code/data/inventory.csv"


#### Load Inventory data set

In [4]:
raw_text = ""
with open(test_path) as csvfile:
    raw_csv = csv.reader(csvfile, delimiter=',')
    for row in raw_csv:
        raw_text = raw_text + "\n" + row[1]

# remove last empty line
input_text = raw_text.split("\n")[1:]

In [5]:

def get_tokens_with_entities(raw_text: str):
    # split the text by spaces only if the space does not occur between square brackets
    # we do not want to split "multi-word" entity value yet
    raw_tokens = re.split(r"\s(?![^\[]*\])", raw_text)

    # a regex for matching the annotation according to our notation [entity_value](entity_name)
    entity_value_pattern = r"\[(?P<value>.+?)\]\((?P<entity>.+?)\)"
    entity_value_pattern_compiled = re.compile(entity_value_pattern, flags=re.I|re.M)

    tokens_with_entities = []

    for raw_token in raw_tokens:
        match = entity_value_pattern_compiled.match(raw_token)
        if match:
            raw_entity_name, raw_entity_value = match.group("entity"), match.group("value")

            # we prefix the name of entity differently
            # B- indicates beginning of an entity
            # I- indicates the token is not a new entity itself but rather a part of existing one
            for i, raw_entity_token in enumerate(re.split("\s", raw_entity_value)):
                entity_prefix = "B" if i == 0 else "I"
                entity_name = f"{entity_prefix}-{raw_entity_name}"
                tokens_with_entities.append((raw_entity_token, entity_name))
        else:
            tokens_with_entities.append((raw_token, "O"))

    return tokens_with_entities

In [6]:
print(get_tokens_with_entities("adobe [acrobat](cpe_product) x (10.1)"))
print(get_tokens_with_entities("red hat [wildfly core](cpe_product) 2.0.0 alpha 2"))
print(get_tokens_with_entities(input_text[0]))
print(get_tokens_with_entities(input_text[1]))


[('adobe', 'O'), ('acrobat', 'B-cpe_product'), ('x', 'O'), ('(10.1)', 'O')]
[('red', 'O'), ('hat', 'O'), ('wildfly', 'B-cpe_product'), ('core', 'I-cpe_product'), ('2.0.0', 'O'), ('alpha', 'O'), ('2', 'O')]
[('microsoft', 'O'), ('corporation', 'O'), ('visualÂ', 'O'), ('studio', 'O'), ('community', 'O'), ('2022', 'O')]
[('finalwire', 'O'), ('ltd.', 'O'), ('aida64', 'O'), ('extreme', 'O'), ('v6.60', 'O')]


In [7]:
class NERDataMaker:
    def __init__(self, texts):
        self.unique_entities = []
        self.processed_texts = []

        temp_processed_texts = []
        for text in texts:
            tokens_with_entities = get_tokens_with_entities(text)
            for _, ent in tokens_with_entities:
                if ent not in self.unique_entities:
                    self.unique_entities.append(ent)
            temp_processed_texts.append(tokens_with_entities)

        self.unique_entities.sort(key=lambda ent: ent if ent != "O" else "")

        for tokens_with_entities in temp_processed_texts:
            self.processed_texts.append([(t, self.unique_entities.index(ent)) for t, ent in tokens_with_entities])

    @property
    def id2label(self):
        return dict(enumerate(self.unique_entities))

    @property
    def label2id(self):
        return {v:k for k, v in self.id2label.items()}

    def __len__(self):
        return len(self.processed_texts)

    def __getitem__(self, idx):
        def _process_tokens_for_one_text(id, tokens_with_encoded_entities):
            ner_tags = []
            tokens = []
            for t, ent in tokens_with_encoded_entities:
                ner_tags.append(ent)
                tokens.append(t)

            return {
                "id": id,
                "ner_tags": ner_tags,
                "tokens": tokens
            }

        tokens_with_encoded_entities = self.processed_texts[idx]
        if isinstance(idx, int):
            return _process_tokens_for_one_text(idx, tokens_with_encoded_entities)
        else:
            return [_process_tokens_for_one_text(i+idx.start, tee) for i, tee in enumerate(tokens_with_encoded_entities)]

    def as_hf_dataset(self, tokenizer):
        from datasets import Dataset, Features, Value, ClassLabel, Sequence
        def tokenize_and_align_labels(examples):
            tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

            labels = []
            for i, label in enumerate(examples[f"ner_tags"]):
                word_ids = tokenized_inputs.word_ids(batch_index=i)  # Map tokens to their respective word.
                previous_word_idx = None
                label_ids = []
                for word_idx in word_ids:  # Set the special tokens to -100.
                    if word_idx is None:
                        label_ids.append(-100)
                    elif word_idx != previous_word_idx:  # Only label the first token of a given word.
                        label_ids.append(label[word_idx])
                    else:
                        label_ids.append(-100)
                    previous_word_idx = word_idx
                labels.append(label_ids)

            tokenized_inputs["labels"] = labels
            return tokenized_inputs

        ids, ner_tags, tokens = [], [], []
        for i, pt in enumerate(self.processed_texts):
            ids.append(i)
            pt_tokens,pt_tags = list(zip(*pt))
            ner_tags.append(pt_tags)
            tokens.append(pt_tokens)
        data = {
            "id": ids,
            "ner_tags": ner_tags,
            "tokens": tokens
        }
        features = Features({
            "tokens": Sequence(Value("string")),
            "ner_tags": Sequence(ClassLabel(names=dm.unique_entities)),
            "id": Value("int32")
        })
        ds = Dataset.from_dict(data, features)
        tokenized_ds = ds.map(tokenize_and_align_labels, batched=True)
        return tokenized_ds

In [8]:
# Create Training NER Data Object
dm = NERDataMaker(input_text)
print(f"total examples = {len(dm)}")
print(dm[0:3])

total examples = 295
[{'id': 0, 'ner_tags': [0, 0, 0, 0, 0, 0], 'tokens': ['microsoft', 'corporation', 'visualÂ', 'studio', 'community', '2022']}, {'id': 1, 'ner_tags': [0, 0, 0, 0, 0], 'tokens': ['finalwire', 'ltd.', 'aida64', 'extreme', 'v6.60']}, {'id': 2, 'ner_tags': [0, 0, 0, 0, 0], 'tokens': ['glarysoft', 'ltd', 'glary', 'utilities', '5.187']}]


In [9]:
from transformers import AutoConfig, AutoTokenizer, AutoModelForTokenClassification
# model_config = AutoConfig.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path + "/tokenizer")
# model = AutoModelForTokenClassification.from_pretrained("models/ner_rasa_vpv_v2/tokenizer", num_labels=len(dm.unique_entities), id2label=dm.id2label, label2id=dm.label2id)
model = AutoModelForTokenClassification.from_pretrained(model_path)
# id_to_label = {v: str(k) for k, v in model.config.label2id.items()}

In [10]:
input_ds = dm.as_hf_dataset(tokenizer=tokenizer)


  0%|          | 0/1 [00:00<?, ?ba/s]

# INFERENCE

In [11]:
from transformers import pipeline
pipe = pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple", device=0) # pass device=0 if using gpu
pipe("""softing uagates 1.73 for wordpress""")


[{'entity_group': 'cpe_version',
  'score': 0.99995387,
  'word': '1',
  'start': 16,
  'end': 17}]

In [12]:

# pipe("""Microsoft Visual C++ 2013 Redistributable (x64) - 12.0.30501""")
# pipe("""microsoft visual c++ 2013 redistributable (x64) - 12.0.30501""")

# pipe("""google chrome 32.0.1670.5""")
# pipe("""draw.io 2.6.3 for confluence""")


# pipe("""progress sitefinity 9.2""")
# pipe("bitnami containers 7.30.1-debian-10-r40 for laravel")

# pipe("cool house technology ewelink 4.3.0 for android")
# pipe("fastball productions fastball 2.5.3 for joomla")