# Showing Word2Vec encoding limitations

In [None]:
from collections import defaultdict

# RCVDB: Dictionary only needs to keep track of the unique values of a single attribute
class AttributeDictionary():
    """Two-way mapping between the labels of one attribute and integer codes.

    Labels receive consecutive integer codes in first-seen order, beginning at
    ``start_index``. Codes are handed out and accepted as strings. Every code
    in ``[start_index, start_index + max_size)`` that has not been assigned
    yet is part of the "buffer".
    """

    def __init__(self, start_index=0, max_size=100) -> None:
        """Create an empty dictionary.

        Args:
            start_index: Integer code given to the first encoded label.
            max_size: Width of the code range used to compute the buffer.
                It is not enforced by ``encode``.
        """
        self.max_size = max_size
        self.start_index = start_index
        # Plain dicts on purpose: a defaultdict would silently insert a bogus
        # entry on a stray lookup, which encode() would later hand out as a code.
        self.encodings = {}      # label -> integer code
        self.encodings_inv = {}  # integer code -> label

    def encode(self, label) -> str:
        """Return the code of ``label`` as a string, assigning a new one if unseen."""
        if label in self.encodings:
            return str(self.encodings[label])

        # Next free code: number of codes handed out so far, shifted by the offset
        new_key = len(self.encodings_inv) + self.start_index
        self.encodings[label] = new_key
        self.encodings_inv[new_key] = label
        return str(new_key)

    def decode(self, value):
        """Return the label stored for code ``value``, or ``None`` if unassigned.

        ``value`` is converted with ``int``, so both ``"3"`` and ``3`` work;
        a value that ``int`` cannot convert raises ``ValueError``.
        """
        return self.encodings_inv.get(int(value))

    def encoded_attributes(self) -> list:
        """Return all assigned codes as strings, in assignment order."""
        return [str(code) for code in self.encodings_inv]

    # Buffer attributes are all encoded labels that are not part of a label_value mapping
    def buffer_attributes(self) -> list:
        """Return the still-unassigned codes of the configured range as strings."""
        first_free = len(self.encodings_inv) + self.start_index
        return [str(code) for code in range(first_free, self.max_size + self.start_index)]

In [None]:
# Labels that are registered in the dictionary up front
all_attributes = [
    "red", "blue", "green", "yellow", "purple",
    "orange", "pink", "brown", "gray", "black",
]
# Labels that are deliberately held back to play the role of unseen values
unknown_attributes = [
    "cyan", "magenta", "teal", "lavender", "beige",
]

In [None]:
# Register every known attribute so each label gets its own integer code
attrDict = AttributeDictionary(max_size=100)
for att in all_attributes:
    attrDict.encode(att)

print(all_attributes)

encoded_attributes = attrDict.encoded_attributes()
buffer_attributes = attrDict.buffer_attributes()

print(encoded_attributes)

# Decode with the complete code string. Indexing with val[0] would pass only
# the first character and therefore mis-decode every code >= 10.
decoded_attributes = [attrDict.decode(val) for val in encoded_attributes]

print(decoded_attributes)

# Assigned codes followed by the still-free buffer codes
print(encoded_attributes + buffer_attributes)

In [None]:
from gensim.models import Word2Vec

def convert_to_sentences(input):
    """Turn each element of ``input`` into a one-token sentence.

    Every element is converted with ``str`` and wrapped in its own list,
    giving a list of single-word sentences.
    """
    sentences = []
    for token in input:
        sentences.append([str(token)])
    return sentences

# One model is trained on the assigned codes plus the buffer codes,
# the other on the assigned codes only.
buffered_sentences = convert_to_sentences(encoded_attributes + buffer_attributes)
buffered_w2v_model = Word2Vec(
    sentences=buffered_sentences,
    vector_size=15,
    window=5,
    min_count=1,
)
unbuffered_sentences = convert_to_sentences(encoded_attributes)
unbuffered_w2v_model = Word2Vec(
    sentences=unbuffered_sentences,
    vector_size=15,
    window=5,
    min_count=1,
)

In [None]:
# Look up the vector of every known (already encoded) attribute in both models
buff_w2v_encodings = [buffered_w2v_model.wv[code] for code in encoded_attributes]
unbuff_w2v_encodings = [unbuffered_w2v_model.wv[code] for code in encoded_attributes]

In [None]:
# Take the first held-back label and run it through the dictionary
unknown_attribute = unknown_attributes[0]
enc_unkown_attribute = attrDict.encode(unknown_attribute)

print(f"{unknown_attribute} {enc_unkown_attribute}")

In [None]:
# Can't deal with unknown values
try:
    unbuffered_w2v_model.wv[enc_unkown_attribute]
except KeyError as e:
    print(e, 'in the unbuffered w2v')
try:
    buffered_w2v_model.wv[unknown_attribute]
except KeyError as e:
    print(e, 'in the buffered (and unbufferd) w2v')

# But can deal with the encoded unknown value if buffered
try:
    w2v_unknown_attribute = buffered_w2v_model.wv[enc_unkown_attribute]
except KeyError as e:
    print(e)
else:
    # Everything below needs the vector, so it only runs when the lookup
    # succeeded (otherwise the name would be undefined -> NameError).
    print(enc_unkown_attribute, 'is present in the buffered w2v')
    print(w2v_unknown_attribute)

    # Find the most similar words to the unknown vector
    similar_words = buffered_w2v_model.wv.similar_by_vector(w2v_unknown_attribute)

    # Note: the code assigned to the unknown label is expected to be the top
    # match (it is the query vector itself), although the label was not in the
    # original dataset
    print(similar_words)

    best_match = similar_words[0][0]
    print(attrDict.decode(best_match))

# Label Encoder has the same problem
Thus, a dictionary that keeps track of the attributes is needed here as well.

In [None]:
from sklearn.preprocessing import LabelEncoder

# Fit a LabelEncoder on the known attributes and encode them in the same call
encoder = LabelEncoder()
all_attributes_encoded = encoder.fit_transform(all_attributes)

# Show the integer labels produced for all_attributes
print(all_attributes_encoded)

In [None]:
# Can't deal with unknown values: the labels in unknown_attributes were not
# part of the data the encoder was fitted on. A ValueError raised by
# transform is caught and printed instead of stopping the cell.
try:
    encoder.transform(unknown_attributes)
except ValueError as e:
    print(e)

# Word2Vec Wrapper

In [None]:
import numpy as np
import copy

class ProcessWord2Vec():
    """Word2Vec wrapper that turns events and traces into vectors.

    An event is a sequence of attribute values and a trace is a sequence of
    events. When ``attr_dicts`` is given, the value at position ``j`` of an
    event is first mapped to a code by ``attr_dicts[j].encode`` and the model
    is trained on those codes plus every dictionary's buffer codes.
    """

    def __init__(self, training_sentances, vector_size=50, window=5, min_count=1, workers=4, attr_dicts=None, debug=False) -> None:
        """Train the underlying Word2Vec model.

        Args:
            training_sentances: Sentences (lists of tokens) used for training.
                With ``attr_dicts`` each sentence is treated as one event.
            vector_size: Forwarded to ``Word2Vec``.
            window: Forwarded to ``Word2Vec``.
            min_count: Forwarded to ``Word2Vec``.
            workers: Forwarded to ``Word2Vec``.
            attr_dicts: Optional list of attribute dictionaries, one per
                attribute position of an event.
            debug: When true, progress information is printed.
        """
        self.debug = debug
        self.attr_dicts = attr_dicts

        if self.attr_dicts is not None:
            # Encode a deep copy so the caller's sentences are left untouched
            training_sentances = self._encode_training_sentences(copy.deepcopy(training_sentances))

        self.w2v_model = Word2Vec(sentences=training_sentances, vector_size=vector_size, window=window, min_count=min_count, workers=workers)

    def _encode_training_sentences(self, training_sentances):
        """Encode the sentences in place and append the buffer codes.

        The word at position ``j`` of every sentence is replaced by
        ``attr_dicts[j].encode(word)``. Afterwards one single-token sentence
        per buffer code of every dictionary is appended. The (mutated) input
        list is returned.
        """
        if self.debug: print(f"Training Sentances: {training_sentances}")

        for sentance in training_sentances:
            for j, word in enumerate(sentance):
                sentance[j] = self.attr_dicts[j].encode(word)

        # Each attr_dict is expected to behave like an AttributeDictionary
        for attr_dict in self.attr_dicts:
            training_sentances += ProcessWord2Vec.convert_to_sentences(attr_dict.buffer_attributes())

        if self.debug: print(f"Encoded Training Sentances: {training_sentances}")
        return training_sentances

    # Function to get the Word2Vec vector for an attribute
    def _get_attr_vector(self, attr_index, attr):
        """Return the model vector for ``attr`` found at position ``attr_index``.

        With attribute dictionaries the value is encoded first (which assigns
        a new code to a value the dictionary has not seen before); without
        them the value itself is used as the token.
        """
        if self.attr_dicts is not None:
            mapped_attr = self.attr_dicts[attr_index].encode(attr)

            if self.debug: print(f"\t\tMapped {attr} to {mapped_attr}")
        else:
            mapped_attr = attr
        return self.w2v_model.wv[mapped_attr]

    # Function to encode an event by averaging its attribute vectors
    def _encode_event(self, event):
        """Return the element-wise mean of the vectors of all attributes in ``event``."""
        if self.debug: print(f"\tEncoding Event: {event}")
        attribute_vectors = np.array([self._get_attr_vector(i, attr) for i, attr in enumerate(event)])
        return np.mean(attribute_vectors, axis=0)

    # Function to encode a trace by concatenating its event vectors
    def encode_trace(self, trace):
        """Return the concatenation of the event vectors of ``trace``."""
        if self.debug: print(f"Encoding Trace: {trace}")
        event_vectors = [self._encode_event(event) for event in trace]
        return np.concatenate(event_vectors, axis=0)

    @staticmethod
    def convert_to_sentences(input):
        """Wrap every element of ``input`` (converted with ``str``) in its own list."""
        return [[str(i)] for i in input]

In [None]:
import random

all_attributes = [
    "red", "blue", "green", "yellow", "purple",
    "orange", "pink", "brown", "gray", "black",
]

# Build `size` traces, each a random sample of `length` distinct attributes
length = 10
size = 100
attribute_traces = [random.sample(all_attributes, length) for _ in range(size)]
unique_attributes = list({attribute for trace in attribute_traces for attribute in trace})
print(attribute_traces)

# Replace every label in the traces, in place, by its dictionary code
attrDict = AttributeDictionary(max_size=100)
for trace in attribute_traces:
    trace[:] = [attrDict.encode(attribute) for attribute in trace]
print(attribute_traces)

# Append the buffer to the traces
# If some pre-existing data is available, it can be used to improve the w2v training; in theory, however, no pre-training is necessary.
# In the latter case, only the buffer is used.
def convert_to_sentences(input):
    """Return one single-token sentence (``[str(element)]``) per element of ``input``."""
    return list(map(lambda token: [str(token)], input))
# Encoded traces followed by one single-token sentence per free buffer code
buffer_sentences = convert_to_sentences(attrDict.buffer_attributes())
sentances_pre_training = [*attribute_traces, *buffer_sentences]
print(sentances_pre_training)

# A fresh dictionary has nothing assigned yet, so its buffer spans the whole code range
no_pretrain_attrDict = AttributeDictionary(max_size=100)
scentences_only_buffer = convert_to_sentences(no_pretrain_attrDict.buffer_attributes())
print(scentences_only_buffer)

# Train one wrapper on traces + buffer and one on the buffer alone
process_w2v_model = ProcessWord2Vec(training_sentances=sentances_pre_training)
process_w2v_model_only_buffer = ProcessWord2Vec(training_sentances=scentences_only_buffer)

## Visualizing the encoding

In [None]:
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

def reduce_embedding_space(w2v_model):
    """Project all vectors of a Word2Vec model onto two PCA components.

    Args:
        w2v_model: Model whose ``wv`` attribute provides ``index_to_key`` and
            vector lookup by token.

    Returns:
        A tuple ``(words, word_vectors_2d)``: the tokens from
        ``wv.index_to_key`` and the output of ``PCA(n_components=2)`` fitted
        on their vectors, with rows in the same order as the tokens.
    """
    keyed_vectors = w2v_model.wv
    vocabulary = keyed_vectors.index_to_key
    embeddings = np.array([keyed_vectors[token] for token in vocabulary])

    # t-SNE alternative, kept for reference:
    # tsne = TSNE(n_components=2, random_state=2024)
    # word_vectors_2d = tsne.fit_transform(word_vectors)
    projected = PCA(n_components=2).fit_transform(embeddings)

    return vocabulary, projected

# 2-D projection of the model trained on traces + buffer
pretrained_w2v = process_w2v_model.w2v_model
words, word_vectors_2d = reduce_embedding_space(pretrained_w2v)
# 2-D projection of the model trained on the buffer only
only_buffer_w2v = process_w2v_model_only_buffer.w2v_model
words_only_buffer, word_vectors_2d_only_buffer = reduce_embedding_space(only_buffer_w2v)

In [None]:
# Scatter both projections, one colour per model
for points, point_colour, legend_label in (
    (word_vectors_2d, 'red', 'With Pre-Train Traces'),
    (word_vectors_2d_only_buffer, 'blue', 'Only Buffer'),
):
    plt.scatter(points[:, 0], points[:, 1], color=point_colour, label=legend_label)

# Label only the points whose token is a code below 10
for tokens, points, text_colour in (
    (words, word_vectors_2d, 'green'),
    (words_only_buffer, word_vectors_2d_only_buffer, 'black'),
):
    for token, (x, y) in zip(tokens, points):
        if int(token) < 10:
            plt.annotate(token, xy=(x, y), fontsize=12, color=text_colour)

# Title, axis labels, grid and legend
plt.title('Word2Vec Word Embeddings Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.grid()
plt.legend()
plt.show()

# Testing Wrapper Solution

In [None]:
# Known values of the two event attributes
all_color_attributes = [
    "red", "blue", "green", "yellow", "purple",
    "orange", "pink", "brown", "gray", "black",
]
all_shape_attributes = [
    "circle", "square", "triangle", "rectangle", "oval",
    "hexagon", "star", "pentagon", "rhombus", "octagon",
]
# Values that are held back and never used for training
unknown_color_attributes = ["cyan", "magenta", "teal", "lavender", "beige"]
unknown_shape_attributes = ["trapezoid", "parallelogram", "crescent", "heart", "kite"]

length = 3
size = 5

# Known traces: every event is a [shape, color] pair
color_traces = [random.sample(all_color_attributes, length) for _ in range(size)]
shape_traces = [random.sample(all_shape_attributes, length) for _ in range(size)]
combined_traces = [
    [[shape, color] for shape, color in zip(shapes, colors)]
    for shapes, colors in zip(shape_traces, color_traces)
]
# All events of all traces in one flat list
flattened_combined_traces = [event for trace in combined_traces for event in trace]

# Traces built the same way from the held-back values
unknown_color_traces = [random.sample(unknown_color_attributes, length) for _ in range(size)]
unknown_shape_traces = [random.sample(unknown_shape_attributes, length) for _ in range(size)]
unknown_combined_traces = [
    [[shape, color] for shape, color in zip(shapes, colors)]
    for shapes, colors in zip(unknown_shape_traces, unknown_color_traces)
]

print(combined_traces)
print(flattened_combined_traces)
print(unknown_combined_traces)

In [None]:
# Events are [shape, color] pairs, so the dictionary for position 0 must be
# the shape dictionary and the one for position 1 the color dictionary.
# The code ranges stay as before: position 0 -> 0..19, position 1 -> 20..39.
shape_attr_dict = AttributeDictionary(max_size=20)
color_attr_dict = AttributeDictionary(start_index=shape_attr_dict.max_size, max_size=20)
wrapped_process_w2v_model = ProcessWord2Vec(
    training_sentances=flattened_combined_traces,
    attr_dicts=[shape_attr_dict, color_attr_dict],
    debug=True,
)

# The wrapper encodes a deep copy, so these print the original labels
print(combined_traces)
print(flattened_combined_traces)

# Encode traces made only of values that were not in the training sentences
encoded_traces = [wrapped_process_w2v_model.encode_trace(trace) for trace in unknown_combined_traces]


In [None]:
projection = reduce_embedding_space(wrapped_process_w2v_model.w2v_model)
words, word_vectors_2d = projection

# Scatter every token of the wrapped model
x_coords, y_coords = word_vectors_2d[:, 0], word_vectors_2d[:, 1]
plt.scatter(x_coords, y_coords, color='red', label='With Pre-Train Traces')

# Label only the points whose token is a code below 8
for token, x, y in zip(words, x_coords, y_coords):
    if int(token) >= 8:
        continue
    plt.annotate(token, xy=(x, y), fontsize=12, color='black')

# Title, axis labels, grid and legend
plt.title('Word2Vec Word Embeddings Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.grid()
plt.legend()
plt.show()