In [4]:
import json
import pickle

import networkx as nx
from tqdm.auto import tqdm

In [13]:

# Special tokens of the whitespace-separated tree-text serialization that
# get_model_from_tree_text parses.
ROOT_TAG = '<MODEL>'
CLS_TAG = '<CLS>'
CLS_NAME_TAG = '<NAME>'
ATTRS_TAG = '<ATTRS>'
ASSOCS_TAG = '<ASSOCS>'
OPEN_CHAR = '('
CLOSE_CHAR = ')'
# Every token in this list is treated by the parser as structure; any other
# token is read as the first half of a two-token pair.
special_tags = [ROOT_TAG, CLS_TAG, CLS_NAME_TAG, ATTRS_TAG, ASSOCS_TAG, OPEN_CHAR, CLOSE_CHAR]

# NOTE(review): ecore_keys and ecore_mapping are not referenced by any code
# visible in this notebook; presumably left over from an Ecore/XMI loader.
ecore_keys = ['ecore:EPackage', 'ecore:EClass', 'ecore:EEnum', 'EPackage']

ecore_mapping = {
    'ecore:EClass': 'eStructuralFeatures',
    'ecore:EEnum': 'eLiterals',
}

# NOTE(review): the OTHER_* labels are likewise unused in the visible cells.
OTHER_WITHIN_CLASSES = "other classes within package"
OTHER_ACROSS_CLASSES = "other classes across packages"
OTHER_PACKAGES = "other packages"


# Input files, one serialized model (or test sample) per line; paths are
# relative to the notebook's working directory.
TRAIN_MODELS = 'WeyssowPMC/train/repo_ecore_all.txt'
TEST_MODELS_ITERATIVE = 'WeyssowPMC/test/test_iterative_construction.txt'
TEST_MODELS_FULL_CONTEXT = 'WeyssowPMC/test/test_probing_full_context.txt'
TEST_MODELS_LOCAL = 'WeyssowPMC/test/test_probing_local_context.txt'



In [24]:

"""
 s = "( <MODEL> ( <CLS> ( <NAME> GeneralizableElement ) ) ( <CLS> ( <NAME> <mask> ) ) )"
"""

def get_model_from_tree_text(text):
    """Parse a tree-text model serialization into a per-class dictionary.

    The text is split on whitespace. The token following ``CLS_NAME_TAG``
    starts a new class; ``ATTRS_TAG`` / ``ASSOCS_TAG`` select which list the
    following pairs go to. Any token that is not in ``special_tags`` is read
    together with its successor as one ``(first, second)`` pair. Pairs seen
    before an ``ATTRS_TAG``/``ASSOCS_TAG`` of the current class are stored
    under ``'attrs'``.

    Malformed input is tolerated instead of crashing: a pair that appears
    before any class name is dropped, and a dangling last token (truncated
    line) ends the parse.

    Args:
        text: One serialized model, e.g.
            ``"( <MODEL> ( <CLS> ( <NAME> A ) ( <ATTRS> ( EString x ) ) ) )"``.

    Returns:
        ``{class_name: {'assocs': [(first, second), ...],
        'attrs': [(first, second), ...]}}`` in order of appearance. A class
        name that occurs twice keeps only the members of its last occurrence.
    """
    tokens = text.split()
    classes = dict()
    current_class = None
    current_special = None
    i = 0
    while i < len(tokens):
        token = tokens[i]
        has_next = i + 1 < len(tokens)

        if token == CLS_NAME_TAG:
            if not has_next:
                # Truncated input: name tag without a class name.
                break
            current_special = CLS_TAG
            current_class = tokens[i + 1]
            classes[current_class] = {'assocs': list(), 'attrs': list()}
            i += 1  # the class name has been consumed
        elif token == ASSOCS_TAG:
            current_special = ASSOCS_TAG
        elif token == ATTRS_TAG:
            current_special = ATTRS_TAG
        elif token not in special_tags:
            if not has_next:
                # Truncated input: first half of a pair without its partner.
                break
            if current_class is not None:
                section = 'assocs' if current_special == ASSOCS_TAG else 'attrs'
                classes[current_class][section].append((token, tokens[i + 1]))
            i += 1  # the second half of the pair has been consumed
        i += 1
    return classes


def model_to_graph(graph, model_dict):
    """Merge one parsed model into ``graph`` in place.

    Args:
        graph: Graph object that is mutated (the notebook passes an
            ``nx.DiGraph``).
        model_dict: Mapping ``{class_name: {'attrs': [...], 'assocs': [...]}}``
            as produced by ``get_model_from_tree_text``.

    Every class becomes a node (created with ``type='class'`` if absent) whose
    ``'attributes'`` entry is replaced by a copy of its attribute pairs. Each
    association pair ``(target_class, name)`` becomes a directed edge
    ``class -> target_class`` unless such an edge already exists, so only the
    first association between two classes is kept.
    """
    # Pass 1: nodes. This must finish before any edge is added, because
    # adding an edge may implicitly create its target node.
    for class_name, members in model_dict.items():
        attribute_pairs = members['attrs']
        if class_name not in graph:
            graph.add_node(class_name, type='class')
        graph.nodes[class_name]['attributes'] = list(attribute_pairs)

    # Pass 2: edges.
    for source, members in model_dict.items():
        for target, assoc_name in members['assocs']:
            if graph.has_edge(source, target):
                continue
            graph.add_edge(source, target, name=assoc_name, type='association')

# Smoke test: parse a two-class example (the second class name is the literal
# "<mask>" token) and load it into a fresh directed graph.
s = "( <MODEL> ( <CLS> ( <NAME> GeneralizableElement ) ) ( <CLS> ( <NAME> <mask> ) ) )"
graph = nx.DiGraph()
model_to_graph(graph, get_model_from_tree_text(s))

In [16]:
def generate_consolidated_graph(texts):
    """Merge every serialized model in ``texts`` into one directed graph.

    Args:
        texts: Iterable of tree-text model strings.

    Returns:
        A new ``nx.DiGraph`` into which each parsed model has been merged with
        ``model_to_graph``; classes that share a name across models end up as
        the same node.
    """
    consolidated = nx.DiGraph()
    for model_text in tqdm(texts):
        model_to_graph(consolidated, get_model_from_tree_text(model_text))
    return consolidated
        

In [5]:
# Read the training models (one per line) and close the file deterministically.
with open(TRAIN_MODELS) as f:
    train_texts = f.read().split('\n')
train_graph = generate_consolidated_graph(train_texts)
print(train_graph.number_of_nodes(), train_graph.number_of_edges())
# nx.write_gpickle was removed in NetworkX 3.0; it was a thin wrapper around
# pickle.dump, so plain pickle produces a compatible file on every version.
with open('pmc_data/pmc_train_graph.gpickle', 'wb') as f:
    pickle.dump(train_graph, f, pickle.HIGHEST_PROTOCOL)

  0%|          | 0/11113 [00:00<?, ?it/s]

23327 49879


In [6]:
# Display (node count, edge count) of the consolidated training graph.
train_graph.number_of_nodes(), train_graph.number_of_edges()

(23327, 49879)

In [7]:
# Full-context test models, one serialized model per line.
with open(TEST_MODELS_FULL_CONTEXT) as f:
    test_text_full_context = f.read().split('\n')

# Local-context samples: each line is split at its LAST ';' into
# (everything before, everything after). A line without ';' yields
# ("", line), exactly as the previous split/join implementation did.
with open(TEST_MODELS_LOCAL) as f:
    test_context_local = [
        (prefix, last_field)
        for prefix, _, last_field in (line.rpartition(';') for line in f.read().split('\n'))
    ]

In [8]:
# Parse the iterative-construction test file. Each non-empty line holds
# "model_str;iteration;element_type;element_name". Blank lines contribute an
# empty list (filtered out by the consumer of this variable).
test_iterative_construction = list()
with open(TEST_MODELS_ITERATIVE) as f:
    iterative_lines = f.read().split('\n')
for line in iterative_lines:
    iterative_model = list()
    if len(line) > 0:
        # Split from the right so a ';' inside the model text cannot break the
        # four-way unpacking; the last three fields are taken as metadata.
        model_str, iteration, element_type, element_name = line.rsplit(';', 3)
        iterative_model.append((model_str, iteration, element_type, element_name))
    test_iterative_construction.append(iterative_model)

In [15]:
def model_text_to_graph(model_text):
    """Build a standalone directed graph for a single serialized model.

    Args:
        model_text: One tree-text model string.

    Returns:
        A new ``nx.DiGraph`` holding only the classes and associations parsed
        from ``model_text``.
    """
    single_model_graph = nx.DiGraph()
    model_to_graph(single_model_graph, get_model_from_tree_text(model_text))
    return single_model_graph

In [1]:
import random

MASK = "<mask>"

def create_model_text_to_string(model_text, mask=-1):
    """Render a serialized model as the textual "Nodes/Relations" description.

    Convenience wrapper: converts ``model_text`` with ``model_text_to_graph``
    and forwards the graph and ``mask`` to ``create_graph_to_string``.

    Args:
        model_text: One tree-text model string.
        mask: Passed through unchanged; see ``create_graph_to_string``.

    Returns:
        Whatever ``create_graph_to_string`` returns for the given ``mask``
        (a string when ``mask <= 0``, otherwise a ``(string, masked_nodes)``
        tuple).
    """
    return create_graph_to_string(model_text_to_graph(model_text), mask)


def create_graph_to_string(graph, mask=-1, mask_type='class'):
    """Render ``graph`` as text, optionally hiding randomly chosen nodes.

    Output layout::

        Nodes: 
        Class <name>(attributes=<attr>: <type>, ...)
        ...
        Relations: 
        Class <src> association <assoc name> association Class <dst>
        ...

    Args:
        graph: Graph whose nodes may carry an ``'attributes'`` list of pairs
            and whose edges carry a ``'name'``.
        mask: ``int > 0`` masks exactly that many nodes; ``float > 0`` masks
            that fraction of the nodes (at least one); any value ``<= 0``
            disables masking. Nodes are drawn with ``random.sample``.
        mask_type: Currently unused; kept for interface compatibility.

    Returns:
        The rendered string when ``mask <= 0``; otherwise the tuple
        ``(rendered string, masked_nodes)``. A masked node is rendered as
        ``MASK``, and the name of every edge touching it is replaced by
        ``MASK`` as well.

    Raises:
        TypeError: if ``mask`` is not an int or float.
        ValueError: if more nodes are requested than the graph contains.
    """
    if not isinstance(mask, (int, float)):
        # Previously this surfaced only at the final `mask <= 0` comparison,
        # after all the work had been done and with an obscure message.
        raise TypeError(
            f"mask must be an int (node count) or float (node fraction), got {type(mask).__name__}"
        )

    nodes = list(graph.nodes)

    node2str = dict()
    for node in nodes:
        node_data = graph.nodes[node]
        attributes = node_data['attributes'] if 'attributes' in node_data else []
        if len(attributes):
            attrs_str = "attributes=" + ", ".join(f"{attr[1]}: {attr[0]}" for attr in attributes)
        else:
            attrs_str = ""
        node2str[node] = f"{node}" + '(' + attrs_str + ')'

    masking = mask > 0
    masked_nodes = []
    if masking:
        # A float is a fraction of the node count, an int is an absolute count.
        count = max(int(mask * len(nodes)), 1) if isinstance(mask, float) else mask
        try:
            masked_nodes = random.sample(nodes, count)
        except ValueError:
            raise ValueError(f"Mask value {count} is greater than number of nodes {len(nodes)}") from None
        for node in masked_nodes:
            node2str[node] = MASK
    masked = set(masked_nodes)

    def node_label(node):
        # Short form used inside relations: class name only, or the mask token.
        return f"Class {MASK if node in masked else node}"

    def edge_label(edge):
        # The association name is hidden whenever either endpoint is hidden.
        if edge[0] in masked or edge[1] in masked:
            return MASK
        return graph.edges[edge]['name']

    node_strs = "\n".join(f"Class {node2str[node]}" for node in nodes)
    edge_strs = "\n".join(
        f"{node_label(edge[0])} association {edge_label(edge)} association {node_label(edge[1])}"
        for edge in graph.edges
    )
    rendered = "Nodes: \n" + node_strs + '\nRelations: \n' + edge_strs
    if masking:
        return rendered, masked_nodes
    return rendered


In [None]:
# Demo: mask 60% of the nodes of one training model. Because mask > 0 the call
# returns a (text, masked_nodes) tuple, so the tuple itself is printed.
print(create_model_text_to_string(train_texts[89], mask=0.6))

In [None]:
# Render every training model as text and store the list as JSON.
# NOTE(review): num_samples is not used in this cell; kept in case a later
# cell reads it.
num_samples = 20
texts = list()
for train_text in tqdm(train_texts):
    # Blank entries (e.g. the trailing one produced by split('\n')) would be
    # serialized as an empty model; skip them like the other cells do.
    if train_text.strip() == "":
        continue
    texts.append(create_model_text_to_string(train_text))

with open('pmc_data/pmc_train_text.json', 'w') as f:
    json.dump(texts, f, indent=4)

In [None]:
# Build one masked test set per mask value: the int 1 masks exactly one node,
# the floats mask that fraction of each model's nodes.
mask_values = [1, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
# Fixed seed so the randomly masked test sets are identical across re-runs.
MASK_SEED = 42
for mask_value in mask_values:
    # Re-seed per mask value so each output file is reproducible on its own,
    # independent of which other mask values were generated before it.
    random.seed(MASK_SEED)
    texts = list()
    for test_text in tqdm(test_text_full_context):
        if test_text.strip() == "":
            continue
        model_str, masked_nodes = create_model_text_to_string(test_text, mask=mask_value)
        texts.append((model_str, masked_nodes))
    with open(f"pmc_data/pmc_test_full_context_mask_{mask_value}.json", 'w') as f:
        json.dump(texts, f, indent=4)

In [None]:
# Render every iterative-construction sample as text and store it as JSON,
# keeping iteration, element type and element name next to the rendered model.
iterative_construction_models = list()
for iterative_model in tqdm(test_iterative_construction):

    if len(iterative_model) == 0:
        continue
    iterative_texts = list()
    for model, iteration, element_type, element_name in iterative_model:
        # The previous call passed mask=element_name. `mask` must be numeric
        # (node count or fraction), so a string always raised TypeError.
        # NOTE(review): rendering without extra masking assumes the model text
        # already marks the element to predict - confirm against the data.
        model_str = create_model_text_to_string(model)
        iterative_texts.append((model_str, iteration, element_type, element_name))
    iterative_construction_models.append(iterative_texts)

with open("pmc_data/pmc_test_iterative_construction.json", 'w') as f:
    json.dump(iterative_construction_models, f, indent=4)

In [51]:
s = "( <MODEL> ( <CLS> ( <NAME> MEDiagram ) ( <ATTRS> ( EString diagramLayout ) ) ( <ASSOCS> ( Annotation annotations ) ( Attachment attachments ) ( LeafSection incomingDocumentReferences ) ( LeafSection leafSection ) ( StereotypeInstance appliedStereotypeInstances ) ( Comment comments ) ( UnicaseModelElement referringModelElements ) ( UnicaseModelElement elements ) ( UnicaseModelElement newElements ) ) ) ( <CLS> ( <NAME> ClassDiagram ) ( <ASSOCS> ( Annotation annotations ) ( Attachment attachments ) ( LeafSection incomingDocumentReferences ) ( LeafSection leafSection ) ( StereotypeInstance appliedStereotypeInstances ) ( Comment comments ) ( UnicaseModelElement referringModelElements ) ( UnicaseModelElement elements ) ( UnicaseModelElement newElements ) ) ) ( <CLS> ( <NAME> UseCaseDiagram ) ( <ASSOCS> ( Annotation annotations ) ( Attachment attachments ) ( LeafSection incomingDocumentReferences ) ( LeafSection leafSection ) ( StereotypeInstance appliedStereotypeInstances ) ( Comment comments ) ( UnicaseModelElement referringModelElements ) ( UnicaseModelElement elements ) ( UnicaseModelElement newElements ) ) ) ( <CLS> ( <NAME> ComponentDiagram ) ( <ASSOCS> ( Annotation annotations ) ( Attachment attachments ) ( LeafSection incomingDocumentReferences ) ( LeafSection leafSection ) ( StereotypeInstance appliedStereotypeInstances ) ( Comment comments ) ( UnicaseModelElement referringModelElements ) ( UnicaseModelElement elements ) ( UnicaseModelElement newElements ) ) ) ( <CLS> ( <NAME> StateDiagram ) ( <ASSOCS> ( Annotation annotations ) ( Attachment attachments ) ( LeafSection incomingDocumentReferences ) ( LeafSection leafSection ) ( StereotypeInstance appliedStereotypeInstances ) ( Comment comments ) ( UnicaseModelElement referringModelElements ) ( UnicaseModelElement elements ) ( UnicaseModelElement newElements ) ) ) ( <CLS> ( <NAME> ActivityDiagram ) ( <ASSOCS> ( Annotation annotations ) ( Attachment attachments ) ( LeafSection incomingDocumentReferences ) ( 
LeafSection leafSection ) ( StereotypeInstance appliedStereotypeInstances ) ( Comment comments ) ( UnicaseModelElement referringModelElements ) ( UnicaseModelElement elements ) ( UnicaseModelElement newElements ) ) ) ( <CLS> ( <NAME> WorkItemDiagram ) ( <ASSOCS> ( Annotation annotations ) ( Attachment attachments ) ( LeafSection incomingDocumentReferences ) ( LeafSection leafSection ) ( StereotypeInstance appliedStereotypeInstances ) ( Comment comments ) ( UnicaseModelElement referringModelElements ) ( UnicaseModelElement elements ) ( UnicaseModelElement newElements ) ) ) )"

In [5]:
# nx.read_gpickle was removed in NetworkX 3.0; a .gpickle file is a plain
# pickle, so load it directly. Only unpickle files you created yourself:
# unpickling untrusted data can execute arbitrary code.
with open('pmc_data/pmc_train_graph.gpickle', 'rb') as f:
    train_graph = pickle.load(f)

In [54]:
def get_node_map(graph):
    """Map every node of ``graph`` to its ``"<node>(<attribute summary>)"`` label.

    The summary is ``attributes=<second>: <first>, ...`` built from the pairs
    stored under the node's ``'attributes'`` key; it is empty when the key is
    missing or the list is empty, which yields ``"<node>()"``.

    Args:
        graph: Object exposing ``nodes`` (iterable, indexable by node).

    Returns:
        ``dict`` from node to label string, in node iteration order.
    """
    labels = {}
    for node in graph.nodes:
        node_data = graph.nodes[node]
        summary = ""
        if 'attributes' in node_data:
            pairs = node_data['attributes']
            if len(pairs):
                rendered_pairs = [f"{pair[1]}: {pair[0]}" for pair in pairs]
                summary = "attributes=" + ", ".join(rendered_pairs)
        labels[node] = f"{node}" + '(' + summary + ')'
    return labels


def get_attribute_docs(graph, node2str, with_classes_context=False, with_attributes_context=False):
    """Create ``(document, label)`` samples for attribute prediction.

    One sample is emitted per attribute pair of every node that has a
    non-empty ``'attributes'`` list. The label is the pair's second element.
    The document is the node itself, followed (when ``with_classes_context``
    is truthy) by a space and the comma-separated descriptions of its
    neighbours.

    Args:
        graph: Object exposing ``nodes`` and ``neighbors(node)``.
        node2str: Mapping from node to a detailed label; used for neighbours
            only when ``with_attributes_context`` is truthy and the neighbour
            is present in the mapping.
        with_classes_context: Append the neighbour descriptions.
        with_attributes_context: Describe neighbours via ``node2str``.

    Returns:
        List of ``(document, label)`` tuples.
    """
    def describe(neighbor):
        if with_attributes_context and neighbor in node2str:
            return f"Class {node2str[neighbor]}"
        return f"Class {neighbor}"

    samples = []
    for node in graph.nodes:
        node_data = graph.nodes[node]
        if 'attributes' not in node_data or len(node_data['attributes']) == 0:
            continue
        if with_classes_context:
            suffix = ' ' + ", ".join(describe(neighbor) for neighbor in graph.neighbors(node))
        else:
            suffix = ''
        document = node + suffix
        samples.extend((document, pair[1]) for pair in node_data['attributes'])
    return samples


def get_association_docs(graph, node2str, with_classes_context=False, with_attributes_context=False):
    """Create ``(document, label)`` samples for association-name prediction.

    One sample is emitted per edge. The label is the edge's ``'name'``. The
    document is the source description and the target description separated
    by a space, or (when ``with_classes_context`` is truthy) separated by the
    comma-joined descriptions of the source's neighbours.

    Args:
        graph: Object exposing ``edges`` (iterable of ``(source, target)``,
            indexable by edge) and ``neighbors(node)``. Every edge must carry
            both ``'name'`` and ``'type'``.
        node2str: Mapping from node to a detailed label; used when
            ``with_attributes_context`` is truthy and the node is present.
        with_classes_context: Insert the source's neighbour descriptions.
        with_attributes_context: Describe nodes via ``node2str``.

    Returns:
        List of ``(document, label)`` tuples in edge iteration order.
    """
    def describe(node):
        if with_attributes_context and node in node2str:
            return f"Class {node2str[node]}"
        return f"Class {node}"

    samples = []
    for edge in graph.edges:
        source, target = edge
        edge_data = graph.edges[edge]
        # 'type' is looked up as well (and discarded), as before.
        association_name, _edge_type = edge_data['name'], edge_data['type']

        separator = ' '
        if with_classes_context:
            neighbor_names = ", ".join(describe(neighbor) for neighbor in graph.neighbors(source))
            separator = ' ' + neighbor_names + ' '

        samples.append((describe(source) + separator + describe(target), association_name))
    return samples


def get_class_docs(graph, node2str, with_attributes_context=False):
    """Create ``(document, label)`` samples for class-name prediction.

    For every node the label is the node itself and the document is the
    comma-joined description of its neighbours. If that is empty (no
    neighbours), the descriptions of all other nodes of the graph are used
    instead.

    Args:
        graph: Object exposing ``nodes`` and ``neighbors(node)``.
        node2str: Mapping from node to a detailed label; used when
            ``with_attributes_context`` is truthy and the node is present.
        with_attributes_context: Describe nodes via ``node2str``.

    Returns:
        List of ``(document, node)`` tuples in node iteration order.
    """
    def describe(node):
        if with_attributes_context and node in node2str:
            return f"Class {node2str[node]}"
        return f"Class {node}"

    samples = []
    for node in graph.nodes:
        document = ", ".join(describe(neighbor) for neighbor in graph.neighbors(node))
        if not document:
            # Isolated node: fall back to every other class of the model.
            others = [describe(other) for other in graph.nodes if other != node]
            document = ", ".join(others)
        samples.append((document, node))
    return samples

In [55]:
import itertools

def create_dataset(texts, split='train'):
    """Build and save the class / association / attribute samples for ``texts``.

    For every combination ``(attribute_context, class_context)`` in
    ``{0, 1} x {0, 1}`` the samples of all non-blank model texts are collected
    with ``get_class_docs``, ``get_association_docs`` and
    ``get_attribute_docs`` and written to
    ``pmc_data/pmc_<split>_attribute_context=<a>_class_context=<c>.json``.

    Args:
        texts: Iterable of tree-text model strings; blank entries are skipped.
        split: Name used in the output file names.

    Returns:
        ``{(attribute_context, class_context): {'class': [...],
        'associations': [...], 'attributes': [...]}}``.
    """
    configs = list(itertools.product([0, 1], [0, 1]))
    configs_dataset = {c: {'class': list(), 'associations': list(), 'attributes': list()} for c in configs}

    # The label map is built from the global consolidated `train_graph` and
    # does not depend on the current model, so compute it once here instead of
    # once per text as the loop did before.
    # NOTE(review): the map comes from `train_graph`, not from each model's own
    # graph; classes absent from it fall back to their bare name - confirm
    # this is intended, in particular for the test split.
    node_str = get_node_map(train_graph)

    for model_text in tqdm(texts):
        if model_text.strip() == "":
            continue
        graph = model_text_to_graph(model_text)

        for config in configs:
            with_attributes, with_classes = config
            bucket = configs_dataset[config]
            attribute_docs = get_attribute_docs(graph, node_str, with_classes_context=with_classes, with_attributes_context=with_attributes)
            association_docs = get_association_docs(graph, node_str, with_classes_context=with_classes, with_attributes_context=with_attributes)
            class_docs = get_class_docs(graph, node_str, with_attributes_context=with_attributes)
            bucket['class'].extend(class_docs)
            bucket['associations'].extend(association_docs)
            bucket['attributes'].extend(attribute_docs)

    for config in configs_dataset:
        config_str = f"attribute_context={config[0]}_class_context={config[1]}"
        with open(f"pmc_data/pmc_{split}_{config_str}.json", 'w') as f:
            json.dump(configs_dataset[config], f, indent=4)
    return configs_dataset

In [56]:
# Reload the training models without the trailing blank line and build the
# per-config training datasets; the file is closed as soon as it is read.
with open(TRAIN_MODELS) as f:
    train_texts = f.read().strip().split('\n')
train_configs_dataset = create_dataset(train_texts)

  0%|          | 0/11112 [00:00<?, ?it/s]

In [57]:
# train_texts = open(TRAIN_MODELS).read().split('\n')
# Build the per-config datasets for the full-context test models; the file is
# closed as soon as it is read.
with open(TEST_MODELS_FULL_CONTEXT) as f:
    test_text_full_context = f.read().strip().split('\n')
test_configs_dataset = create_dataset(test_text_full_context, split='test_full')

  0%|          | 0/166 [00:00<?, ?it/s]

In [47]:
# Reload the per-config training datasets written by create_dataset, keyed by
# the (attribute_context, class_context) tuple.
attribute_context = [0, 1]
class_context = [0, 1]

configs = [c for c in itertools.product(attribute_context, class_context)]
train_configs_dataset = dict()
for _cfg in configs:
    # Open each file in a with-block so the handle is closed right after loading.
    with open(f"pmc_data/pmc_train_attribute_context={_cfg[0]}_class_context={_cfg[1]}.json") as f:
        train_configs_dataset[_cfg] = json.load(f)

In [60]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
import numpy as np


def tf_idf_recommender(train_data, test_data, k=10):
    """Train a TF-IDF + linear SVM recommender and report recall@k.

    Args:
        train_data: Sequence of ``(document, label)`` pairs used for fitting.
        test_data: Sequence of ``(document, label)`` pairs used for evaluation.
        k: Number of highest-probability labels counted as recommendations.

    Returns:
        Recall@k as a float: the fraction of test samples whose true label is
        among the ``k`` most probable labels. ``nan`` when ``test_data`` is
        empty. The value is also printed.
    """
    train_x, train_y = [x[0] for x in train_data], [x[1] for x in train_data]
    test_x, test_y = [x[0] for x in test_data], [x[1] for x in test_data]

    classifier = Pipeline([
        ('tfidf', TfidfVectorizer()),
        # probability=True is required for predict_proba to be available.
        # It adds an internal cross-validated calibration, so fitting is
        # noticeably slower than a plain SVC.
        ('svm', SVC(kernel='linear', probability=True))
    ])

    classifier.fit(train_x, train_y)

    if len(test_x) == 0:
        # Nothing to evaluate; avoid predicting on an empty batch and 0/0.
        recall = float('nan')
        print(f"Recall: {recall}")
        return recall

    probabilities = classifier.predict_proba(test_x)

    # Columns of `probabilities` follow classifier.classes_. Translate the
    # top-k column indices back to labels before comparing with the true
    # labels: comparing raw indices with labels could never match.
    labels = np.asarray(classifier.classes_)
    topk_indices = np.argsort(-probabilities, axis=1)[:, :k]
    topk_predictions = [set(labels[row].tolist()) for row in topk_indices]

    recall_k = [1 if y_act in y_pred else 0 for y_act, y_pred in zip(test_y, topk_predictions)]
    recall = np.sum(recall_k) / len(recall_k)

    print(f"Recall: {recall}")
    return recall
    

In [62]:
def remove_duplicates(list_of_lists):
    """Drop items whose ``str()`` representation has already been seen.

    Items are keyed by ``str(item)``. The result keeps the position of the
    first occurrence of each key but holds the object of its last occurrence.

    Args:
        list_of_lists: Iterable of items (they need not be hashable).

    Returns:
        List with one item per distinct string representation.
    """
    unique_by_repr = {}
    for item in list_of_lists:
        unique_by_repr[str(item)] = item
    return [*unique_by_repr.values()]

def get_dataset(configs_dataset, config):
    """Return the de-duplicated samples of one context configuration.

    Args:
        configs_dataset: Mapping from config key to a dict with the lists
            ``'class'``, ``'associations'`` and ``'attributes'``.
        config: Key of the configuration to extract.

    Returns:
        Dict with the same three keys, each list passed through
        ``remove_duplicates``. The three resulting lengths are printed.
    """
    per_config = configs_dataset[config]
    dataset = {
        task: remove_duplicates(per_config[task])
        for task in ('class', 'associations', 'attributes')
    }
    print(*(len(dataset[task]) for task in ('class', 'associations', 'attributes')))
    return dataset

# Evaluate the TF-IDF recommender on each task (class / associations /
# attributes) using de-duplicated train and test samples.
for config in configs:
    train_data = get_dataset(train_configs_dataset, config)
    test_data = get_dataset(test_configs_dataset, config)
    config_str = f"attribute_context={config[0]}_class_context={config[1]}"
    print("Config: ", config_str)
    for key in train_data:
        print("Classification: ", key)
        tf_idf_recommender(train_data[key], test_data[key])
    # NOTE(review): this break stops after the first configuration, so the
    # remaining configs are never evaluated - presumably a debugging shortcut;
    # confirm before relying on the results.
    break

52488 54158 44261
1582 2669 1150
Config:  attribute_context=0_class_context=0
Classification:  class
