In [35]:
start_token = "<|startoftext|>"
end_token = "<|endoftext|>"
line_break_token = "<|line_break|>"
verse_break_token = "<|verse_break|>"

def strip_tokens(input_text, clean_text):
  with open(input_text, "r") as input_f, open(clean_text, "w") as clean_f:

    for line in input_f.read().splitlines():
      clean_line = line

      #Check each line for tokens
      if line_break_token in line:
        clean_line = clean_line.replace(line_break_token, "")
      if start_token in line:
        clean_line = clean_line.replace(start_token, "")
      if end_token in line:
        clean_line = clean_line.replace(end_token, "")
      if verse_break_token in line:
        clean_line = clean_line.replace(verse_break_token, "")

      clean_f.write(clean_line + "\n")

In [8]:
from google.colab import drive 
drive.mount('/content/drive', force_remount=True) 
  
def get_data_by_song(train_file, test_file):
  delimiter = "<|endoftext|>"

  #Split training data into separate songs
  with open('/content/drive/My Drive/'+train_file, 'r') as f:
      full_songs = [song + delimiter for song in f.read().strip().split(delimiter)]

  train_data = list(map(lambda x : x.split(), full_songs))

  #Split testing data into separate songs
  with open('/content/drive/My Drive/'+test_file, 'r') as f:
      full_songs_test = [song + delimiter for song in f.read().strip().split(delimiter)]

  test_data = list(map(lambda x : x.split(), full_songs_test))

  #Build vocabulary dictionary
  vocab = dict()
  counter = 0
  train_ind = []
  for song in train_data:
      song_ind = []
      for word in song:
          if word not in vocab:
              vocab[word] = counter
              counter += 1
          song_ind.append(vocab[word])
      train_ind.append(song_ind)
  
  test_ind = []
  for song in test_data:
      song_ind = []
      for word in song:
        if word not in vocab:
          vocab[word] = counter
          counter += 1
        song_ind.append(vocab[word])
      test_ind.append(song_ind)
  
  return train_ind, test_ind, vocab

Mounted at /content/drive


In [52]:
import tensorflow as tf
from tensorflow.keras import Model
import numpy as np
# from preprocess import get_data
# from preprocess import get_data_by_song
import copy


class Model(tf.keras.Model):
    def __init__(self, vocab_size):
        """
        The Model class predicts the next words in a sequence.

        :param vocab_size: The number of unique words in the data
        """
        super(Model, self).__init__()

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

        # initialize vocab_size, embedding_size
        self.vocab_size = vocab_size  
        self.embedding_size = 64
        self.batch_size = 64

        # initialize embeddings and forward pass weights (weights, biases)
        self.E = tf.Variable(tf.random.normal(
            [self.vocab_size, self.embedding_size], stddev=0.1))
        self.lstm = tf.keras.layers.LSTM(
            100, return_sequences=True, return_state=True)
        self.dense1 = tf.keras.layers.Dense(512, activation='relu')
        self.dense2 = tf.keras.layers.Dense(
            self.vocab_size, activation='softmax')

    def call(self, inputs, initial_state):
        """
        :param inputs: word ids of shape (batch_size, window_size)
        :param initial_state: 2-d array of shape (batch_size, rnn_size) as a tensor
        :return: the batch element probabilities as a tensor, a final_state 
        using LSTM and only the probabilites as a tensor and a final_state as a tensor when using GRU
        """
        embeddings = tf.nn.embedding_lookup(self.E, inputs)

        output, mem_output, carry_output = self.lstm(
            embeddings, initial_state=initial_state)
        layer1out = self.dense1(output)
        layer2out = self.dense2(layer1out)

        return layer2out, (mem_output, carry_output)

    def loss(self, probs, labels):
        """
        Calculates average cross entropy sequence to sequence loss of the prediction

        :param logits: a matrix of shape (batch_size, window_size, vocab_size) as a tensor
        :param labels: matrix of shape (batch_size, window_size) containing the labels
        :return: the loss of the model as a tensor of size 1
        """

        return tf.reduce_mean(tf.keras.losses.sparse_categorical_crossentropy(labels, probs))


def train(model, train_inputs, train_labels):
    """
    Runs through one epoch - all training examples.

    :param model: the initilized model to use for forward and backward pass
    :param train_inputs: train inputs (all inputs for training) of shape (num_inputs,)
    :param train_labels: train labels (all labels for training) of shape (num_labels,)
    :return: None
    """

    prev_state = None

    #Batching by song
    for song_input, song_label in zip(train_inputs, train_labels):
        batch_inputs = song_input[:-1]
        batch_labels = song_label[1:]
        batch_inputs = tf.reshape(batch_inputs, (-1, len(song_input)-1))
        batch_inputs = tf.dtypes.cast(batch_inputs, tf.int32)
        batch_labels = tf.reshape(batch_labels, (-1, len(song_label)-1))
        batch_labels = tf.dtypes.cast(batch_labels, tf.int32)
        if batch_inputs.shape[1] < 1:
            continue

        with tf.GradientTape() as tape:
            probs, prev_state = model.call(batch_inputs, prev_state)
            loss = model.loss(probs, batch_labels)

        gradients = tape.gradient(loss, model.trainable_variables)
        model.optimizer.apply_gradients(
            zip(gradients, model.trainable_variables))


def test(model, test_inputs, test_labels):
    """
    Runs through one epoch - all testing examples

    :param model: the trained model to use for prediction
    :param test_inputs: train inputs (all inputs for testing) of shape (num_inputs,)
    :param test_labels: train labels (all labels for testing) of shape (num_labels,)
    :returns: perplexity of the test set
    """

    prev_state = None

    total_loss = 0
    num_batches = 0
    for song_input, song_label in zip(test_inputs, test_labels):
        batch_inputs = song_input[:-1]
        batch_labels = song_label[1:]
        batch_inputs = tf.reshape(batch_inputs, (-1, len(song_input)-1))
        batch_inputs = tf.dtypes.cast(batch_inputs, tf.int32)
        batch_labels = tf.reshape(batch_labels, (-1, len(song_label)-1))
        batch_labels = tf.dtypes.cast(batch_labels, tf.int32)
        if batch_inputs.shape[1] < 1:
            continue

        probs, prev_state = model.call(batch_inputs, prev_state)
        loss = model.loss(probs, batch_labels)

        total_loss += loss
        num_batches += 1

    return tf.math.exp(total_loss / num_batches)


def generate_sentence(word1, length, vocab, model, sample_n=10):
    """
    Takes a model, vocab, selects from the most likely next word from the model's distribution

    :param model: trained RNN model
    :param vocab: dictionary, word to id mapping
    :return: None
    """

    reverse_vocab = {idx: word for word, idx in vocab.items()}
    previous_state = None

    first_string = word1
    first_word_index = vocab[word1]
    next_input = [[first_word_index]]
    text = [first_string]

    for i in range(length):
        logits, previous_state = model.call(next_input, previous_state)
        logits = np.array(logits[0, -1, :])
        top_n = np.argsort(logits)[-sample_n:]
        n_logits = np.exp(logits[top_n])/np.exp(logits[top_n]).sum()
        out_index = np.random.choice(top_n, p=n_logits)

        text.append(reverse_vocab[out_index])
        next_input[0].append(out_index)

    final = " ".join(text)

    #Add newlines at different tokens
    verses_with_breaks = list(map(lambda x: x.replace("<|line_break|>", "<|line_break|>\n"),
                                  final.split("<|verse_break|>")))
    final = "\n<|verse_break|>\n".join(verses_with_breaks)
    final = final.replace("<|startoftext|>", "<|startoftext|>\n")
    final = final.replace("<|endoftext|>", "\n<|endoftext|>\n")

    print("sentence generated")
    return final
  
def get_results(result_file, vocab, model):
    results = open(result_file, mode='w')

    #Write generated lyrics to results file
    for i in range(40):
      results.write(generate_sentence("<|startoftext|>", 1024, vocab, model) + "\n\n\n")


def main():
    # Pre-process and vectorize the data
    print("Loading data...")

    train_data, test_data, vocab = get_data_by_song("data_repeat_tokens.txt", "test.txt")

    train_inputs = copy.copy(train_data)
    train_inputs = np.array(train_inputs)
    train_labels = copy.copy(train_data)
    train_labels = np.array(train_labels)

    test_inputs = copy.copy(test_data)
    test_inputs = np.array(test_inputs)
    test_labels = copy.copy(test_data)
    test_labels = np.array(test_labels)

    # Initialize model
    model = Model(len(vocab))

    # Training
    print("Training...")
    for i in range(5):
        print("Epoch: ", i)
        train(model, train_inputs, train_labels)

    # Testing
    print("Testing...")
    perp = test(model, test_inputs, test_labels)

    get_results("results.txt", vocab, model)


if __name__ == '__main__':
    main()


Loading data...
Training...
Epoch:  0
Epoch:  1
Epoch:  2
Epoch:  3
Epoch:  4
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated
sentence generated


In [53]:
strip_tokens("/content/drive/My Drive/data_repeat_tokens.txt", "clean_data.txt")
strip_tokens("results.txt", "clean_results.txt")

In [54]:
!pip install text-matcher
!pip install nltk
!python -m nltk.downloader stopwords

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [55]:
!rm -rf log.csv
!text-matcher clean_data.txt clean_results.txt -l log.csv

3 total matches found.


In [56]:
import csv
import re

def get_location_tuple(location_str):
  location_lst = location_str.split("] [")

  replace_markers = location_lst[0].replace("[", "").replace("]", "").replace("(", "").replace("), ", "|").replace(")", "")
  convert_int = [(int(s.split(", ")[0]), int(s.split(", ")[1])) for s in replace_markers.split("|")]

  return convert_int

def calc_char_diff(location_tuple):
  return location_tuple[1] - location_tuple[0]

with open('log.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      match_lst_A = get_location_tuple(row['Locations in A'])
      match_lst_B = get_location_tuple(row['Locations in B'])

      num_matches = int(row['Num Matches'])
      generated_text_len = float(row['Text B Length'])

    total_plagiarized = 0
    for match_idx in range(num_matches):
      print("\nMatch ", match_idx + 1)

      total_plagiarized += calc_char_diff(match_lst_B[match_idx])
      print("Number of characters plagiarized: ", total_plagiarized)

    percent_plagiarized = total_plagiarized / float(generated_text_len)
    print("\nPercentage plagiarized from the corpus: ", percent_plagiarized)


Match  1
Number of characters plagiarized:  175

Percentage plagiarized from the corpus:  0.017438963627304436


In [57]:
import math
import re
from collections import Counter

WORD = re.compile(r"\w+")


def get_cosine(vec1, vec2):
    intersection = set(vec1.keys()) & set(vec2.keys())
    numerator = sum([vec1[x] * vec2[x] for x in intersection])

    sum1 = sum([vec1[x] ** 2 for x in list(vec1.keys())])
    sum2 = sum([vec2[x] ** 2 for x in list(vec2.keys())])
    denominator = math.sqrt(sum1) * math.sqrt(sum2)

    if not denominator:
        return 0.0
    else:
        return float(numerator) / denominator


def text_to_vector(text):
    words = WORD.findall(text)
    return Counter(words)

with open("/content/drive/My Drive/data_repeat_tokens.txt", "r") as big_corpus, open("results.txt", "r") as results:
  vector1 = text_to_vector(big_corpus.read())
  vector2 = text_to_vector(results.read())

  cosine = get_cosine(vector1, vector2)

print("Cosine similarity:", cosine)

Cosine similarity: 0.8644577560518606
