In [14]:
import csv
import re
from collections import Counter

path_to_kaggle_dataset = r'C:\Users\21022289\Desktop\C290\unigram_freq.csv'
kaggle_words = {}
with open(path_to_kaggle_dataset, 'r') as file:
    csv_reader = csv.reader(file)
    next(csv_reader) 
    for row in csv_reader:
        word, frequency = row
        kaggle_words[word] = int(frequency)

class SpellCorrector:

    def __init__(self):
        super().__init__()
        self.WORDS = kaggle_words
        self.N = sum(self.WORDS.values())
        self.REGEX_TOKEN = re.compile(r'\b\w+\b')

    def tokens(self, text):
        return self.REGEX_TOKEN.findall(text.lower())

    def P(self, word):
        return self.WORDS[word] / self.N

    def most_probable(self, words):
        _known = self.known(words)
        if _known:
            return max(_known, key=self.P)
        else:
            return []

    @staticmethod
    def edit_step(word):
        letters = 'abcdefghijklmnopqrstuvwxyz'
        splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
        deletes = [L + R[1:] for L, R in splits if R]
        transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
        replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
        inserts = [L + c + R for L, R in splits for c in letters]
        return set(deletes + transposes + replaces + inserts)

    def edits2(self, word):
        return (e2 for e1 in self.edit_step(word)
                for e2 in self.edit_step(e1))

    def known(self, words):
        return set(w for w in words if w in self.WORDS)

    def edit_candidates(self, word, assume_wrong=False, fast=True):

        if fast:
            ttt = self.known(self.edit_step(word)) or {word}
        else:
            ttt = self.known(self.edit_step(word)) or self.known(self.edits2(word)) or {word}

        ttt = self.known([word]) | ttt
        return list(ttt)


In [30]:
import tensorflow as tf
import numpy as np
from transformers import RobertaTokenizer, TFRobertaModel, RobertaForMaskedLM
import pyttsx3
from fastpunct import FastPunct
import torch
from math import log

words = open("word_freq copy.txt").read().split()
wordcost = dict((k, log((i+1)*log(len(words)))) for i,k in enumerate(words))
maxword = max(len(x) for x in words)

word_list=["which","our","the","of","to","and","a","in","is","it","was","for","will","be","am","on","with","an","have","its",".","?",",","!"]

config = 'roberta-large'
tokenizer = RobertaTokenizer.from_pretrained('roberta-large')
model = RobertaForMaskedLM.from_pretrained('roberta-large')
fastpunct = FastPunct() 
engine = pyttsx3.init()
corrector = SpellCorrector()

class Model(tf.keras.Model):
    def __init__(self):
        super(Model, self).__init__()
        self.roberta = TFRobertaModel.from_pretrained(config, from_pt=True)
        self.roberta_config = self.roberta.config

    def call(self, input_ids):
        output_layer = self.roberta(input_ids)[0]
        embedding = self.roberta.get_input_embeddings()

        with tf.name_scope('cls/predictions'):
            with tf.name_scope('transform'):
                input_tensor = tf.keras.layers.Dense(
                    units=self.roberta_config.hidden_size, 
                    activation=self.roberta_config.hidden_act,
                    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=self.roberta_config.initializer_range),
                )(output_layer)
                input_tensor = tf.keras.layers.LayerNormalization()(input_tensor)

            output_bias = tf.Variable(tf.zeros([self.roberta_config.vocab_size], dtype=tf.float32), name='output_bias')
            logits = tf.matmul(input_tensor, embedding.weights[0], transpose_b=True)
            logits = tf.nn.bias_add(logits, output_bias)
            return logits

model1 = Model()

def convert_to_speech(sentence):
    engine.say(sentence)
    engine.runAndWait()

def tokens_to_masked_ids(tokens, mask_ind):
    masked_tokens = tokens[:]
    masked_tokens[mask_ind] = "[MASK]"
    masked_tokens = ["[CLS]"] + masked_tokens + ["[SEP]"]
    masked_ids = tokenizer.convert_tokens_to_ids(masked_tokens)
    return masked_ids

def get_score(mask):
    tokens = tokenizer.tokenize(mask)
    input_ids = [tokens_to_masked_ids(tokens, i) for i in range(len(tokens))]
    input_ids = tf.keras.preprocessing.sequence.pad_sequences(input_ids, padding='post')
    preds = tf.nn.softmax(model1(input_ids))
    tokens_ids = tokenizer.convert_tokens_to_ids(tokens)
    return np.prod([preds[i, i + 1, x] for i, x in enumerate(tokens_ids)])

def infer_spaces(s):
    def best_match(i):
        candidates = enumerate(reversed(cost[max(0, i-maxword):i]))
        return min((c + wordcost.get(s[i-k-1:i], 9e999), k+1) for k,c in candidates)

    cost = [0]
    for i in range(1,len(s)+1):
        c,k = best_match(i)
        cost.append(c)

    out = []
    i = len(s)
    while i>0:
        c,k = best_match(i)
        assert c == cost[i]
        out.append(s[i-k:i])
        i -= k

    return " ".join(reversed(out))

def get_prediction(sent):
    token_ids = tokenizer.encode(sent, return_tensors='pt')
    masked_position = (token_ids.squeeze() == tokenizer.mask_token_id).nonzero()
    masked_pos = [mask.item() for mask in masked_position ]

    with torch.no_grad():
        output = model(token_ids)

    last_hidden_state = output[0].squeeze()

    list_of_list =[]
    for index,mask_index in enumerate(masked_pos):
        mask_hidden_state = last_hidden_state[mask_index]
        idx = torch.topk(mask_hidden_state, k=5, dim=0)[1]
        words = [tokenizer.decode(i.item()).strip() for i in idx]
        list_of_list.append(words)
    
    best_guess = []
    for j in list_of_list:
        best_guess.append(j[0])
        
    return best_guess

def update_sentence(text):
    words = text.split()
    replaced_words = []
    updated_sentence = text
    for word in words:
        text_mask = text.replace(word, '[MASK]')
        possible_states = corrector.edit_candidates(word)
        scores = []
        prob_scores = 0.0
        if word not in possible_states:
            replaced_masks = [text_mask.replace('[MASK]', state) for state in possible_states]

            scores = [get_score(mask) for mask in replaced_masks]
            print(scores)

            prob_scores = np.array(scores)/np.sum(scores)

            probs = list(zip(possible_states, prob_scores))
            probs.sort(key=lambda x: x[1])
            print(probs)

            best_word, _ = probs[-1]
            replaced_words.append(best_word)
            updated_sentence = updated_sentence.replace(word, best_word)
    
    return updated_sentence

def insert_words(text):
    words = text.split()
    predictions = []

    for i in range(len(words)):
        output = " ".join(words[:i+1] + ["_"] + words[i+1:])
        masked_output = output.replace("_","<mask>")
        predicted_blanks = get_prediction(masked_output)
        for word in predicted_blanks:
            if word in word_list:
                predictions.append([word, i])

    first_iteration=False
    for i, l in enumerate(predictions,1):
        word, index = l
        words.insert(index+i, word)
    correct_text=(' '.join(words))
    return correct_text

def correct_sentence(text):
    spaced_text = infer_spaces(text)
    updated_text = update_sentence(spaced_text)
    correct_text = insert_words(updated_text)
    correct_text_with_punct = fastpunct.punct(correct_text)
    print(correct_text_with_punct)
    convert_to_speech(correct_text_with_punct)

class Model(tf.keras.Model):
    def __init__(self):
        super(Model, self).__init__()
        self.roberta = TFRobertaModel.from_pretrained(config, from_pt=True)
        self.roberta_config = self.roberta.config

    def call(self, input_ids):
        output_layer = self.roberta(input_ids)[0]
        embedding = self.roberta.get_input_embeddings()

        with tf.name_scope('cls/predictions'):
            with tf.name_scope('transform'):
                input_tensor = tf.keras.layers.Dense(
                    units=self.roberta_config.hidden_size, 
                    activation=self.roberta_config.hidden_act,
                    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=self.roberta_config.initializer_range),
                )(output_layer)
                input_tensor = tf.keras.layers.LayerNormalization()(input_tensor)

            output_bias = tf.Variable(tf.zeros([self.roberta_config.vocab_size], dtype=tf.float32), name='output_bias')
            logits = tf.matmul(input_tensor, embedding.weights[0], transpose_b=True)
            logits = tf.nn.bias_add(logits, output_bias)
            return logits

model1 = Model()

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFRobertaModel: ['lm_head.bias', 'lm_head.layer_norm.bias', 'lm_head.decoder.weight', 'lm_head.dense.weight', 'lm_head.dense.bias', 'lm_head.layer_norm.weight']
- This IS expected if you are initializing TFRobertaModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFRobertaModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFRobertaModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaModel for predictions without further training.


In [32]:
text = "goodmorninghowareyouallyesterdayiablecreateapithisapplicationithinkveryusefulforthebusinesspooposal"
correct_sentence(text)

[0.0]
[('proposal', nan)]


  prob_scores = np.array(scores)/np.sum(scores)


Good morning, how are you all? yesterday was I was able to create api for this application which I think is very useful for the business proposal.
