## Recommendation System

This model uses a recommendation system to predict the 'terms' which can be used for a given proof based on the list of 'types' used in its statement.

In [2]:
# importing required libraries

import json
import yaml
import random
import pandas as pd
from statistics import mean
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer

In [4]:
# Load one split for inspection. The context manager closes the handle,
# which the bare open() call never did.
with open(r"shallow_deps/split15.yaml", "r", encoding='utf-8') as yaml_file:
    yaml_content = yaml.safe_load(yaml_file)

In [2]:
# Read the label vocabulary (keys 'defn' and 'types') from its JSON file
with open(r"shallow_deps/def_type.json", "r", encoding='utf-8') as file:
    js = file.read()
def_type = json.loads(js)

In [3]:
no_test = 50    # number of records held out from each split file
tests = []      # collects the held-out records across all splits
# Shuffle both label lists in place; the count table built from them
# inherits this row/column order.
random.shuffle(def_type['defn'])
random.shuffle(def_type['types'])

In [7]:
# Zero-filled count table: one row per definition label, one column per type label
df = pd.DataFrame(0, index=def_type['defn'], columns=def_type['types'])

In [None]:
# Accumulate definition/type co-occurrence counts from the first 16 splits,
# holding out `no_test` records of every split for later evaluation.
for i in range(16):
    # The context manager closes each split file; previously one handle
    # per iteration was left open.
    with open(f"shallow_deps/split{i}.yaml", "r", encoding='utf-8') as yaml_file:
        yaml_content = yaml.safe_load(yaml_file)

    # An integer test_size is an absolute sample count, which avoids the
    # float round-trip of no_test / len(...) that can round up by one.
    train, test = train_test_split(yaml_content, test_size=no_test)
    tests.extend(test)

    for val in train:
        # Labels are coerced to str so they match the table's index/columns
        defn1 = [str(label) for label in val["defn"]]
        types1 = [str(label) for label in val["type"]]
        df.loc[defn1, types1] += 1

    print("Done :", i)

In [18]:
# Re-serialise each of the 31 YAML splits as JSON, with every label coerced to str
for i in range(31):
    with open(f"shallow_deps/split{i}.yaml", "r", encoding='utf-8') as yaml_file:
        yaml_content = yaml.safe_load(yaml_file)

    for val in yaml_content:
        val["defn"] = list(map(str, val["defn"]))
        val["type"] = list(map(str, val["type"]))

    with open(f"shallow_deps/split{i}.json", "w", encoding='utf-8') as json_file:
        json.dump(yaml_content, json_file, indent=2)

    print("Done :", i)

Done : 0
Done : 1
Done : 2
Done : 3
Done : 4
Done : 5
Done : 6
Done : 7
Done : 8
Done : 9
Done : 10
Done : 11
Done : 12
Done : 13
Done : 14
Done : 15
Done : 16
Done : 17
Done : 18
Done : 19
Done : 20
Done : 21
Done : 22
Done : 23
Done : 24
Done : 25
Done : 26
Done : 27
Done : 28
Done : 29
Done : 30


In [10]:
# Scratch check: the loop variable refers to the same dict objects the list
# holds, so updating it changes the list's contents.
b = [{'name': 5}, {'name': 6}]
for a in b:
    a.update(name=3)

b

[{'name': 3}, {'name': 3}]

In [None]:
# Pairwise Spearman rank correlation between the columns of the count table
df_corr = df.corr(method='spearman')

In [None]:
# Collect every distinct definition and type label from the first 16 splits.
# Both sets start empty here so the cell runs on a fresh kernel; previously
# `defn` and `types` had to exist already, or the first union raised NameError.
defn = set()
types = set()

for i in range(16):
    # The context manager closes each split file after parsing
    with open(f"shallow_deps/split{i}.yaml", "r", encoding='utf-8') as yaml_file:
        yaml_content = yaml.safe_load(yaml_file)

    for record in yaml_content:
        defn.update(record['defn'])
        types.update(record['type'])

    print('Done :' + str(i))

In [None]:
# Bundle the two label lists under the keys used by def_type.json.
# NOTE(review): `defn1` / `types1` must already hold the full string label
# lists when this runs — confirm the intended cell execution order.
data = {"defn": defn1,
        "types": types1}

In [None]:
# Persist the label vocabulary; ensure_ascii=False writes non-ASCII
# characters verbatim rather than as \u escapes.
with open("shallow_deps/def_type.json", 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=4)

In [None]:
# String forms of the collected definition and type labels
defn1 = list(map(str, defn))
types1 = list(map(str, types))

In [None]:
# Display the first definition label
defn1[0]

In [13]:
# Read the whole of write.yaml into one string
with open(r"shallow_deps/write.yaml", "r", encoding='utf-8') as file:
    vals = file.read()

In [14]:
import re
# Character offset of every '- name:' occurrence in the text, in order;
# each offset is treated as the start of one record by the splitting cell.
index = [m.start() for m in re.finditer('- name:', vals)]

In [15]:
# Split the YAML text into files of at most `chunk_size` records each,
# named split0.yaml, split1.yaml, ...
chunk_size = 5000
name = 0    # sequence number of the next split file

for last in range(0, len(index), chunk_size):
    stop = last + chunk_size
    # A full chunk ends where the next chunk's first record begins. The final
    # chunk runs to the end of the text; it used to stop at the *start* of
    # the last record, so that record was never written.
    end = index[stop] if stop < len(index) else len(vals)

    with open(f"shallow_deps/split{name}.yaml", "w", encoding='utf-8') as file:
        file.write(vals[index[last]:end])
    name += 1

In [1]:
# NOTE(review): scratch arithmetic — presumably the record count divided by a
# candidate number of split files; confirm or remove.
150423/25

6016.92

In [None]:
# Get all the proofs: one space-separated string of definition labels per record
proof_types = [
    ' '.join(str(term) for term in proof['defn'])
    for proof in yaml_content
]

In [None]:
# Fit a word index over the label strings. filters='' and lower=False are
# passed so that labels are neither stripped of characters nor lower-cased.
tokenizer = Tokenizer(filters='', lower=False)
tokenizer.fit_on_texts(proof_types)
word_index = tokenizer.word_index
# NOTE(review): the +1 presumably accounts for index 0 being reserved by
# Keras (word ids start at 1) — confirm.
total_unique_words = len(tokenizer.word_index) + 1 
print(total_unique_words)
print(word_index)

In [None]:
# Replace every record's definition labels with the ids the fitted tokenizer
# assigns to them (the label list is passed as a single pre-split "text").
for proof in yaml_content:
    proof['defn'] = tokenizer.texts_to_sequences([proof['defn']])[0]

In [None]:
# Display the first record
yaml_content[0]

In [None]:
import re

# Remove every line that starts with 'PANIC' (together with its newline) and
# save the remaining text as write.yaml.
# NOTE(review): re.sub needs `yaml_content` to be raw text here, not a parsed
# structure — confirm which cell populated it.
updated = re.sub(r'^PANIC.*\n?', '', yaml_content, flags=re.MULTILINE)

with open(r"shallow_deps/write.yaml", "w", encoding='utf-8') as file:
    file.write(updated)

In [None]:
# Show the first 100 items (characters if `yaml_content` is raw text,
# records if it is a parsed list)
print(yaml_content[0:100])

In [None]:
# Open and read the dependency data.
# NOTE(review): the file carries a .yaml extension but is parsed as JSON —
# confirm the file really is JSON-formatted.

with open(r"shallow_deps/shallow_deps.yaml", "r", encoding='utf-8') as file:
    js = file.read()
data = json.loads(js)

In [None]:
# Get all the proofs
proofs = data["triples"]

# Divide proofs into train & test data
no_test = 250    # number of elements in the test data
# An integer test_size is an absolute sample count, so the float round-trip
# of no_test / len(proofs) (which can round up by one) is avoided.
train, test = train_test_split(proofs, test_size=no_test)

In [None]:
# Shuffle the lists of all types and terms to avoid any ordering bias present in the data

statements = [row["name"] for row in data["types"]]
random.shuffle(statements)
# NOTE: this rebinds `proofs` to the list of term names
proofs = [row["name"] for row in data["terms"]]
random.shuffle(proofs)

In [None]:
# Create a zero-filled data frame whose index is the list of terms and whose
# columns are the list of types.
#
# The frame is built in a single call: inserting the columns one at a time
# fragments the frame and is slow for thousands of types. dict.fromkeys drops
# repeated type names while keeping first-seen order, which is what repeated
# assignment to the same column name produced.

df = pd.DataFrame(0, index=proofs, columns=list(dict.fromkeys(statements)))

In [None]:
# Training on the 'train' data: for every training triple, add 1 to each
# cell whose row is one of its terms and whose column is one of its types
for val in train:
    df.loc[val["terms"], val["types"]] += 1
    
# Get the correlation dataframe (to be used while getting recommendations):
# pairwise Spearman correlation between the type columns
df_corr = df.corr(method='spearman')

In [None]:
# Render the count table's values as an image (rows = terms, columns = types)
plt.matshow(df.values)

In [None]:
# Function to return list of all 'terms' based on their decreasing order of priorities for a given list of 'types'

def get_recom(statement):
    """Rank every term in `df` for the given list of types.

    Args:
        statement: list of type names; each must be a column of the
            module-level `df_corr`.

    Returns:
        All index labels of `df`, ordered from highest to lowest score.
    """
    # Weight of each type = its mean correlation with the requested types
    type_weights = df_corr[statement].mean(axis=1)

    # Scale each type column by its weight, then average along every term's row
    term_scores = (df * type_weights).mean(axis=1)

    ranked = term_scores.sort_values(ascending=False)
    return ranked.index.tolist()

In [None]:
# function to get score (average rank of terms in the recommended list)

def get_score(triple, single = True):
    """Score one triple by where its terms land in the recommended list.

    Args:
        triple: mapping with a "terms" list and a "types" list.
        single: when true return only the recommended-list average;
            otherwise return both averages as a tuple.

    Returns:
        The string "terms_or_types_is_empty" if either list is empty.
        Otherwise the mean 0-based position of the triple's terms in
        `get_recom(triple["types"])`, optionally paired with their mean
        position in the unranked `df.index` order.

    Raises:
        ValueError: if a term is missing from either list.
    """
    terms = triple["terms"]
    if terms == [] or triple["types"] == []:
        return "terms_or_types_is_empty"

    recom = get_recom(triple["types"])
    baseline = df.index.to_list()    # unranked order, materialised once

    avg_recom = mean(recom.index(term) for term in terms)
    avg_norm = mean(baseline.index(term) for term in terms)

    return avg_recom if single else (avg_recom, avg_norm)

In [None]:
# function to calculate the average score for more than one test data

def avg_score(triples, graph = False):
    """Print and return the mean recommended-list rank over many triples.

    Triples for which `get_score` reports empty terms or types are skipped.
    `graph` is accepted but not used.

    Raises:
        statistics.StatisticsError: if no triple yields a score.
    """
    print(f"Calculating average score for {len(triples)} elements.")
    print("element\t(avg_recom, avg_norm)")

    all_scores = (get_score(triple, False) for triple in triples)
    scored = [pair for pair in all_scores if pair != "terms_or_types_is_empty"]

    # Only the recommended-list average (first element) enters the mean
    avg_per = mean([pair[0] for pair in scored])
    print(f" Average rank of terms present in the proof of {len(scored)} elements is {round(avg_per)} out of {len(data['terms'])} total terms.")

    return avg_per

In [None]:
# Evaluate the recommender on the held-out test triples
avg_score(test)

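Here the average score (x) means that, when the recommended (re-ordered) list is searched, only x% of the effort needed with the plain 'terms' list is required to find the terms used in the proof.
i.e. a score of 100% corresponds to finding a proof's 'terms' by scanning the normal, un-ranked 'terms' list.
Note that `avg_score` as written reports the mean rank itself; dividing it by the mean rank in the normal list (the second value returned by `get_score(..., single=False)`) gives this percentage.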

In [None]:
# function to calculate the average score for more than one test data (used for plotting graph)

def avg_score_graph(triples):
    """Collect per-triple scores as two parallel lists for plotting.

    Returns:
        (list_recom, list_norm): the first and second elements of every
        `get_score(triple, False)` result, skipping triples reported as
        having empty terms or types.
    """
    scores = [get_score(triple, False) for triple in triples]
    usable = [pair for pair in scores if pair != "terms_or_types_is_empty"]

    list_recom = [pair[0] for pair in usable]
    list_norm = [pair[1] for pair in usable]
    return list_recom, list_norm

In [None]:
# Per-proof scores on the test set: position in the recommended list vs.
# position in the unranked term list
scores_recom, scores_norm = avg_score_graph(test)
test_no = list(range(1,len(scores_norm)+1))    # x axis: 1-based proof number

plt.figure(figsize=[15,15])
# Green stars: recommended-list index per proof; dotted line marks their mean
plt.plot(test_no, scores_recom, 'g*', label="Recommended_index")
plt.axhline(y = mean(scores_recom), color = 'g', ls = 'dotted', label="Avg_recommended_index")

# Red plus signs: unranked-list index per proof; dotted line marks their mean
plt.plot(test_no, scores_norm, 'r+', label="Normal_index")
plt.axhline(y = mean(scores_norm), color = 'r', ls = 'dotted', label="Avg_normal_index")

# NOTE(review): 12 is a hard-coded target index — confirm where it comes from
plt.axhline(y = 12, color = 'b', ls = 'dotted', label="Target_index")

plt.legend(loc="upper left")

plt.xlabel('Proofs')
plt.ylabel('Index of Terms')

plt.title('Index Plot')

In [None]:
def table_gen(triple):
    """Build one report row for a triple.

    Returns:
        The string "terms_or_types_is_empty" if the triple's terms or types
        list is empty. Otherwise a 7-tuple: name, types, actual terms, the
        recommended list cut off after the last actual term, the 0-based
        positions of the actual terms, their mean (rounded to 2 places),
        and the largest position.
    """
    if triple["terms"] == [] or triple["types"] == []:
        return "terms_or_types_is_empty"

    recom = get_recom(triple["types"])
    pos_recom = [recom.index(term) for term in triple["terms"]]

    max_index = max(pos_recom)
    mean_index = round(mean(pos_recom), 2)
    # Shortest prefix of the recommendations that contains every actual term
    top_pred = recom[:max_index + 1]

    return (triple["name"], triple["types"], triple["terms"], top_pred, pos_recom, mean_index, max_index)

In [None]:
def table_gen_top(triples, top):
    list_rows = []
    
    for triple in triples:
        values = table_gen(triple)
        if values != "terms_or_types_is_empty":
            list_rows.append(values)
    
    table_df = pd.DataFrame(list_rows, columns =['name', 'Types', 'Actual_terms', 'Top_pred_terms', 'Pos_of_actual_term_in_pred (out_of 3337 terms)', 'Avg_pred_index', 'Max_pred_index'])
    table_df = table_df.set_index('name')
    table_df = table_df.sort_values('Avg_pred_index')
    
    table_df = table_df.head(top)
    
    # Generate csv and json file for pretty view
    table_df.to_csv('top_pred.csv')
    table_df.to_json('top_pred.json', orient='index', indent=3)
    
    return table_df

In [None]:
# Generate the table of the 20 best predictions on the test set
# (change `top` to show a different number of rows)

display(table_gen_top(test, top = 20))