<a href="https://colab.research.google.com/github/valeriya-khan/image-captioning/blob/main/similarity_word2vec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install gensim and spaCy, then download the large English spaCy model that is
# loaded later in this notebook with spacy.load("en_core_web_lg").
!pip install gensim
!pip install spacy
!python -m spacy download en_core_web_lg

In [None]:
# Mount Google Drive under /content/drive; the model, caption and test-case
# paths used in the cells below all point into this mount.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
import os
import glob

from gensim.parsing.preprocessing import strip_punctuation
import gensim.downloader as api
import gensim
import spacy
import nltk
nltk.download("punkt")
nltk.download('stopwords')
from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize 

# English stop words as a set, used for membership tests when filtering
# query and caption words.
stop_words = set(stopwords.words('english')) 
# One list per processed query: the query words that were dropped because
# they are not in the word2vec vocabulary.
skipped_input = []
# One list per processed caption: the caption words dropped for the same reason.
skipped_caption=[]

In [None]:
# Load the GoogleNews word2vec vectors (binary word2vec format) from Drive.
# `model` is used below for vocabulary checks and for n_similarity scoring.
path = "/content/drive/Team Drives/ROCS (HACKNU)/word2vec/GoogleNews-vectors-negative300.bin"
model = gensim.models.KeyedVectors.load_word2vec_format(path, binary=True)

In [None]:
# preprocessing
import re
import unicodedata
import inflect

# must-have
import scipy
import numpy as np

# image classification model
from keras.applications.inception_resnet_v2 import InceptionResNetV2
from keras.preprocessing import image
from keras.applications.inception_resnet_v2 import preprocess_input, decode_predictions

In [None]:
# Load the large English spaCy pipeline installed in the first cell.
# get_clf_similarities calls nlp(word).similarity(...) to compare query words
# with predicted class-name words.
nlp = spacy.load("en_core_web_lg")

In [None]:
def remove_non_ascii(words):
  """Return the tokens with every non-ASCII character stripped.

  Each token is NFKD-normalized before the ASCII encode, so an accented
  letter is split into a base letter plus combining marks and only the
  marks are dropped. Tokens are never removed: a token made only of
  non-ASCII characters becomes the empty string.
  """
  def _ascii_only(token):
    decomposed = unicodedata.normalize('NFKD', token)
    return decomposed.encode('ascii', 'ignore').decode('utf-8', 'ignore')

  return [_ascii_only(token) for token in words]


def to_lowercase(words):
  """Return a new list holding the lower-cased form of every token."""
  return [token.lower() for token in words]


def remove_punctuation(words):
  """Strip punctuation from every token and drop tokens that end up empty.

  A character counts as punctuation when it is neither a word character
  (``\\w``) nor whitespace (``\\s``).
  """
  stripped = (re.sub(r'[^\w\s]', '', token) for token in words)
  return [token for token in stripped if token != '']


def replace_numbers(words):
  """Replace every all-digit token with its textual representation.

  Tokens for which ``str.isdigit()`` is false are passed through unchanged;
  the conversion itself is delegated to ``inflect``'s ``number_to_words``.
  """
  engine = inflect.engine()
  return [
    engine.number_to_words(token) if token.isdigit() else token
    for token in words
  ]


def remove_stopwords(words):
  """Remove English stop words from a list of tokenized words.

  Args:
    words: Iterable of string tokens.

  Returns:
    A new list with the tokens that are not NLTK English stop words, in
    their original order.
  """
  # Fetch the stop-word list once and keep it as a set. The previous version
  # re-read the whole list from the NLTK corpus and scanned it linearly for
  # every single token.
  english_stopwords = set(nltk.corpus.stopwords.words('english'))
  return [word for word in words if word not in english_stopwords]


def stem_words(words):
  """Stem every token with NLTK's Lancaster stemmer.

  Args:
    words: Iterable of string tokens.

  Returns:
    A new list with the stem of each token, in the original order.
  """
  # Imported here because the notebook never imports LancasterStemmer at
  # file level; without this the function raised NameError on first use.
  from nltk.stem import LancasterStemmer

  stemmer = LancasterStemmer()
  return [stemmer.stem(word) for word in words]


def lemmatize_verbs(words):
  """Lemmatize every token as a verb with NLTK's WordNet lemmatizer.

  Args:
    words: Iterable of string tokens.

  Returns:
    A new list with the verb lemma of each token, in the original order.

  NOTE(review): WordNetLemmatizer reads the NLTK 'wordnet' corpus, and this
  notebook only downloads 'punkt' and 'stopwords'. Confirm the corpus is
  available before calling this function.
  """
  # Imported here because the notebook never imports WordNetLemmatizer at
  # file level; without this the function raised NameError on first use.
  from nltk.stem import WordNetLemmatizer

  lemmatizer = WordNetLemmatizer()
  return [lemmatizer.lemmatize(word, pos='v') for word in words]


def normalize(words):
  """Run the token-normalization pipeline over a list of tokens.

  The steps are applied in this order: strip non-ASCII characters,
  lower-case, strip punctuation, spell out numbers, drop stop words.
  Stemming and lemmatization are not part of the pipeline.
  """
  pipeline = (
    remove_non_ascii,
    to_lowercase,
    remove_punctuation,
    replace_numbers,
    remove_stopwords,
  )
  tokens = words
  for step in pipeline:
    tokens = step(tokens)
  return tokens

In [None]:
# ImageNet-weighted InceptionResNetV2 including its classification head;
# get_clf_similarities uses it to label candidate images.
clf_model = InceptionResNetV2(
  include_top=True,
  weights='imagenet',
  input_tensor=None,
  input_shape=None,
  pooling=None,
  classes=1000,
)

In [None]:
def get_clf_similarities(test_num, test_path, top=1):
  """Pick the candidate image whose predicted class name best matches the query.

  The query text is read from ``<test_path>/<test_num>/input``, tokenized and
  normalized. Every ``*.jpg`` in the same directory is classified with
  ``clf_model``; the class name of the first decoded prediction is lower-cased
  and split on ``_``. An image's score is the spaCy similarity summed over
  every (query word, class-name word) pair, divided by the number of
  class-name words and then by the number of query words.

  Args:
    test_num: Test-case identifier; ``str(test_num)`` is the sub-directory name.
    test_path: Directory that contains the test-case sub-directories.
    top: Number of predictions requested from ``decode_predictions``. Only the
      first decoded prediction is used for scoring.

  Returns:
    The 1-based index of the best-scoring image. Image files must be named
    with their number (``1.jpg`` .. ``6.jpg``); a slot with no image keeps a
    score of 0.0. If the normalized query is empty, every image scores 0.0
    and 1 is returned (previously this case crashed).
  """
  # read, tokenize and normalize the query
  with open(os.path.join(test_path, str(test_num), "input"), "r") as file:
    raw_query = file.read()
  query = normalize(nltk.word_tokenize(raw_query))

  # Parse each query word once instead of once per (image, class word) pair.
  query_docs = [nlp(qword) for qword in query]

  # one score slot per candidate image (candidates are numbered 1..6)
  similarities = [0.0]*6

  # iterate over all images in the test case
  for img_path in glob.glob(os.path.join(test_path, str(test_num), "*.jpg")):
    # file name without directory and extension, e.g. ".../3.jpg" -> "3"
    candidate_img = os.path.splitext(os.path.basename(img_path))[0]

    # load the image, convert it to a batch of one and preprocess it
    img = image.load_img(img_path, target_size=(299, 299))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)

    # predict and decode into a list of (class, description, probability)
    preds = clf_model.predict(x)
    decoded = decode_predictions(preds, top=top)[0]

    # class name of the first prediction, lower-cased and split on "_"
    pred = decoded[0][1].lower().split("_")
    pred_docs = [nlp(pword) for pword in pred]

    if query_docs:
      img_sim = 0
      for qdoc in query_docs:
        for pdoc in pred_docs:
          img_sim += qdoc.similarity(pdoc)
      # average over all (query word, class word) pairs
      img_sim = img_sim/len(pred)
      img_sim = img_sim/len(query)
    else:
      # An empty query has nothing to compare; avoid dividing by zero.
      img_sim = 0.0

    similarities[int(candidate_img)-1] = img_sim

  best = np.argmax(similarities)+1
  return best

## Edit this: paths to the caption files, the inputs file, the test directory and the output file

In [None]:
def _read_nonblank_lines(path):
  """Return the non-blank lines of the file at `path` without trailing newlines."""
  # The context manager closes the file; the previous list comprehensions
  # opened three files and never closed them.
  with open(path) as handle:
    return [line.rstrip('\n') for line in handle if line.strip()]


# Predicted captions (first version); prob_calc reads six consecutive lines per test case.
captions = _read_nonblank_lines('/content/drive/Team Drives/ROCS (HACKNU)/predicted_captions.txt')
# Query sentences, one per test case.
inputs = _read_nonblank_lines('/content/drive/Team Drives/ROCS (HACKNU)/inputs.txt')
# Predicted captions (second version), tried when the first set is not decisive.
captions1 = _read_nonblank_lines('/content/drive/Team Drives/ROCS (HACKNU)/predicted_captions_2nd-ver.txt')
# Directory with one numbered sub-directory per test case. This path is
# relative, so it resolves against the current working directory.
testdir='drive/Team Drives/ROCS (HACKNU)/Archive2/'
# NOTE: this handle is kept only so the (currently commented-out) incremental
# outfile.write(...) calls keep working. It is not closed here; the last cell
# re-opens the same path inside a `with` block to write the final results.
outfile = open('drive/Team Drives/ROCS (HACKNU)/rocs_outfile.txt', "w")

In [None]:
def writeOutput(test_dir,i,numb):
    """Write the chosen image number for test case `i` to its `output` file.

    Args:
        test_dir: Directory that contains the numbered test-case directories.
            A trailing slash is optional.
        i: Test-case number; ``str(i)`` is the sub-directory name.
        numb: Integer answer, written with the ``%i`` format.
    """
    # os.path.join works with or without a trailing slash on test_dir. Plain
    # string concatenation produced "<dir>1/output" when the slash was missing.
    with open(os.path.join(test_dir, str(i), 'output'), 'w') as f:
        f.write("%i" % numb)

In [None]:
def prob_calc(i,captions,query_words=None,n_candidates=6):
  """Score the candidate captions of test case `i` against the query words.

  Each caption is stripped of punctuation, lower-cased, split on whitespace
  and filtered of stop words. Words that are not in the word2vec model are
  recorded in the module-level ``skipped_caption`` list and left out of the
  comparison.

  Args:
    i: Zero-based test-case index; the captions used are
      ``captions[n_candidates*i : n_candidates*(i+1)]``.
    captions: Flat list of caption strings, ``n_candidates`` per test case.
    query_words: In-vocabulary query words. When omitted, the module-level
      ``input_words`` set by the main loop is used.
    n_candidates: Number of captions per test case (at least 2).

  Returns:
    ``(prob, similarity)``: ``similarity`` holds one score per caption in
    caption order, and ``prob`` is the gap between the best and second-best
    score. A caption scores 0.0 when it, or the query, has no in-vocabulary
    words.
  """
  if query_words is None:
    # Backward-compatible default: callers that pass only (i, captions)
    # rely on the global published by the main loop.
    query_words = input_words
  similarity=[]
  for j in range(n_candidates):
    captions_sent = strip_punctuation(captions[n_candidates*i+j]).lower().split()
    filtered_captions = [w for w in captions_sent if w not in stop_words]
    # `word in model` works on gensim 3.x and 4.x KeyedVectors alike; the
    # `.vocab` attribute used before was removed in gensim 4.0.
    skipped_caption.append([x for x in filtered_captions if x not in model])
    cap_words = [x for x in filtered_captions if x in model]
    if cap_words and query_words:
      sim = model.n_similarity(cap_words, query_words)
    else:
      # n_similarity cannot compare an empty word list; score it as no match.
      sim = 0.0
    similarity.append(sim)
  ranked = sorted(similarity, reverse=True)
  prob = ranked[0]-ranked[1]
  return prob, similarity

In [None]:
# Number of test cases to process (test-case directories are numbered 1..MAX_DIR).
MAX_DIR=70
# Minimum gap between the best and second-best caption similarity for a
# caption-based answer to be accepted.
MIN_MARGIN=0.06
# Chosen image number for every test case, in test-case order.
listik= []
for i in range(0,MAX_DIR):
  input_sent = strip_punctuation(inputs[i]).lower().split()
  filtered_inputs = [w for w in input_sent if w not in stop_words]
  # `input_words` must stay a module-level name: prob_calc reads it as a global.
  # `word in model` works on gensim 3.x and 4.x; `.vocab` was removed in 4.0.
  input_words = [x for x in filtered_inputs if x in model]
  skipped_input.append([x for x in filtered_inputs if x not in model])

  # Try the first caption set, then the second; the first decisive one wins.
  for caption_set in (captions, captions1):
    prob,similarity = prob_calc(i,caption_set)
    if prob>MIN_MARGIN:
      k = similarity.index(max(similarity))+1
      break
  else:
    # Neither caption set was decisive: fall back to image classification.
    k = get_clf_similarities(i+1,testdir)

  listik.append(k)
  print(k,i+1)
  writeOutput(testdir,i+1,k)

In [None]:
# Report how many answers were collected and what they are.
lenik = len(listik)

print(f"len: {lenik}")
print(f"list: {listik}")

In [None]:
# Write the chosen image numbers to the results file, one per line.
with open("drive/Team Drives/ROCS (HACKNU)/rocs_outfile.txt", "w") as outfile:
  outfile.writelines(str(l)+"\n" for l in listik)