In [None]:
# Util libraries
import os
import pickle
from typing import List, Set, Dict, Tuple

# ML libraries
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset

# Project modules
from graph import Graph
from part import Part

In [None]:
# Load the pickled graphs. pickle.load can execute arbitrary code, so only use trusted files here.
with open('data/graphs.dat', 'rb') as file:
    all_graphs: List[Graph] = pickle.load(file)

# Features are the parts of each graph; targets are the graphs themselves.
parts_per_graph = [graph.get_parts() for graph in all_graphs]

# 70 % train, then the remaining 30 % is halved into validation and test.
X_train, X_temp, y_train, y_temp = train_test_split(
    parts_per_graph, all_graphs, test_size=0.3, random_state=0
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=0
)

# Quick sanity check of the first training sample
print(X_train[0])
print(y_train[0])
print(len(y_train))
print(y_train[0].get_edges())

In [None]:
# Prepare the data for embedding:
# 1. Map parts to indices.
# 2. Create a training set for the embedding. Each training sample is a pair (part idx, neighbor idx) taken from one edge of a graph.

def map_parts_to_index(X_train: List[Set["Part"]]) -> Tuple[Dict[object, int], int]:
    """Assign a dense index to every distinct part id found in the training set.

    Indices are handed out in first-seen order while iterating over ``X_train`` and
    always form the contiguous range ``0 .. size - 1``, so every index is a valid row
    of a table that has ``size`` rows.

    Args:
        X_train: Collections of parts, one collection per training sample. Only
            ``get_part_id()`` is called on each part.

    Returns:
        A tuple ``(parts_dict, size)`` where ``parts_dict`` maps each part id to its
        index and ``size`` is the number of distinct part ids.
    """
    parts_dict = {}
    for parts in X_train:
        for part in parts:
            part_id = part.get_part_id()
            if part_id not in parts_dict:
                # Number by how many distinct ids were seen so far, not by the position
                # in the flattened list: repeated ids would otherwise leave gaps and
                # push indices past the returned size.
                parts_dict[part_id] = len(parts_dict)
    return parts_dict, len(parts_dict)

def create_embedding_training_set(X_train: List[Set[Part]], graphs: List[Graph]) -> List[Tuple[int, int]]:
    """Build (part index, neighbor index) pairs from the edges of the given graphs.

    The index mapping comes from ``map_parts_to_index(X_train)``. For every node in a
    graph's edge mapping, one pair is emitted per neighbor of that node.

    Args:
        X_train: Parts used to build the part-id -> index mapping.
        graphs: Graphs whose ``get_edges()`` result is indexed by node and yields the
            neighbor nodes of that node.

    Returns:
        A list of ``(part_index, neighbor_part_index)`` tuples.

    Raises:
        KeyError: If a graph contains a part id that is not present in the mapping.
    """
    index_of, _ = map_parts_to_index(X_train)
    pairs = []
    for graph in graphs:
        # One part is the key, its neighbors are the values.
        edges = graph.get_edges()
        for node in edges:
            source_index = index_of[node.get_part().get_part_id()]
            # Emit one sample for each neighbor of this part.
            pairs.extend(
                (source_index, index_of[neighbor.get_part().get_part_id()])
                for neighbor in edges[node]
            )
    return pairs




In [None]:
# Create neural network to learn the embeddings

# Train on an efficient device if possible
# Preference order: CUDA GPU, then Apple MPS, then plain CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")


# Define the network architecture
class EmbeddingNetwork(nn.Module):
    """Embedding lookup followed by a linear layer that scores every vocabulary entry.

    Args:
        vocab_size: Number of rows in the embedding table and number of output scores.
        embedding_dim: Length of each embedding vector.
    """

    def __init__(self, vocab_size: int, embedding_dim: int):
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.output_layer = nn.Linear(in_features=embedding_dim, out_features=vocab_size)

    def forward(self, x):
        """Return one raw score per vocabulary entry for each index in ``x``."""
        # No softmax here: nn.CrossEntropyLoss expects unnormalized scores.
        return self.output_layer(self.embeddings(x))
    

# Define the model
# nn.Embedding only accepts indices strictly below num_embeddings. The table is therefore sized
# from the largest index that map_parts_to_index actually assigned, which guarantees that every
# index used during training is a valid row.
part_index_by_id, _ = map_parts_to_index(X_train)
vocab_size = max(part_index_by_id.values(), default=-1) + 1 # Number of rows in the embedding table
embedding_dim = 10 # Number of embedding dimension
model = EmbeddingNetwork(vocab_size, embedding_dim)
model.to(device)

print("Vocab size: ", vocab_size)
print("Embedding Dimensions: ", embedding_dim)

print("Model: ", model)

# Loss and Optimizer
criterion = nn.CrossEntropyLoss() # Cross-entropy loss over the raw scores returned by the model
optimizer = torch.optim.Adam(model.parameters()) # Adam with its default hyperparameters

# Train the model
train_dataset = create_embedding_training_set(X_train, y_train)
print("Size of training set: ", len(train_dataset))
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

num_epochs = 300
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}")
    # Each batch holds the part indices and the matching neighbor indices.
    for part_batch, neighbor_batch in train_loader:
        part_batch = part_batch.to(device)
        neighbor_batch = neighbor_batch.to(device)

        # Clear gradients from the previous step, then predict the neighbor from the part.
        optimizer.zero_grad()
        logits = model(part_batch)
        loss = criterion(logits, neighbor_batch)
        loss.backward()
        optimizer.step()

# Extract the embeddings from the model and save them in pytorch format
embeddings = model.embeddings.weight.detach()  # learned table, cut off from autograd
detached_embeddings = embeddings.cpu()  # stored on CPU so the file loads on any machine
print(len(embeddings))
print(embeddings)
# torch.save does not create missing parent directories; make sure the target folder exists
# so a finished training run is not lost at the very last step.
os.makedirs('./models', exist_ok=True)
torch.save(detached_embeddings, './models/embeddings.pt')



In [None]:
# Example code to load the embeddings in a new environment
loaded_weights = torch.load('./models/embeddings.pt')

# Move the weights to the device used by the new model/environment.
# This example picks MPS when it is available and falls back to the CPU otherwise.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
loaded_weights = loaded_weights.to(device)

# The table has one row per part and one column per embedding dimension.
print("Example embedding for one part: ", loaded_weights[-1])
print("Embedding dimensions: ", loaded_weights.shape[1])
print("Vocabulary size :", loaded_weights.shape[0])

In [None]:
# check equivalence implementation
# part1 and part2 are built from the same arguments; part3 from different ones.
part1, part2, part3 = Part(202, 2), Part(202, 2), Part(203, 3)
print(f"Part1: id={id(part1)}, hash={hash(part1)}")
print(f"Part2: id={id(part2)}, hash={hash(part2)}")

print(part1.equivalent(part2))

# Compare the individual fields of the two parts
same_part_id = part1.get_part_id() == part2.get_part_id()
print(same_part_id)
same_family_id = part1.get_family_id() == part2.get_family_id()
print(same_family_id)

# Compare freshly constructed instances, via the operator and via the explicit method call
print(Part(202, 2) == Part(202, 2))
print(Part(202, 2).__eq__(Part(202, 2)))

# Compare the named instances
print(part1 == part2)
print(part1 == part3)