

1. 讀取/預處理資料
  *   讀取你的語料庫(corpus)並斷詞(tokenize)處理
  *   切分訓練/測試集
  *   對出現頻率低的詞統一標記為<unk>

2. 開發你的N-gram語言模型
  *   計算給定資料集的n-gram
  *   使用k-smoothing計算下一個字的條件機率
3. 評估你的N-gram模型
4. 實際測試



In [9]:
import math
import random
import numpy as np
import pandas as pd
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [6]:
# Sentence splitting
def split_sentences(data):
    """Split raw text into a list of non-empty, whitespace-trimmed lines.

    Args:
        data: A string in which sentences are separated by newline characters.

    Returns:
        List of the lines of ``data`` with surrounding whitespace stripped,
        in their original order. Lines that are empty after stripping are
        dropped.
    """
    stripped_lines = (line.strip() for line in data.split('\n'))
    return [line for line in stripped_lines if line]

In [5]:
# Quick check of split_sentences: the text starts and ends with a newline,
# so the resulting empty lines are expected to be dropped.
x = """
I have a pen.
Ihave an apple.
"""

split_sentences(x)

['I have a pen.', 'Ihave an apple.']

In [7]:
# Word tokenization
def tokenize_sentences(sentences):
    """Lower-case every sentence and split it into tokens with NLTK.

    Args:
        sentences: Iterable of sentence strings.

    Returns:
        List of token lists, one per input sentence, in input order. Each
        token list is the result of ``nltk.word_tokenize`` applied to the
        lower-cased sentence.
    """
    return [nltk.word_tokenize(sentence.lower()) for sentence in sentences]

In [10]:
# Quick check of tokenize_sentences on three short sentences.
sentences = ["Sky is blue.", "Leaves are green.", "Roses are red."]
tokenize_sentences(sentences)

[['sky', 'is', 'blue', '.'],
 ['leaves', 'are', 'green', '.'],
 ['roses', 'are', 'red', '.']]

In [12]:
# Data preprocessing, step 1
def get_tokenized_data(data):
    """Turn raw corpus text into tokenized sentences.

    Args:
        data: Corpus text with sentences separated by newlines.

    Returns:
        The result of running ``split_sentences`` on ``data`` and then
        ``tokenize_sentences`` on the resulting sentences.
    """
    return tokenize_sentences(split_sentences(data))

In [15]:
# Load the corpus.
# The encoding is given explicitly so the read does not depend on the
# platform's default locale.
# NOTE(review): the corpus file is assumed to be UTF-8 encoded -- confirm.
with open("en_US.twitter.txt", "r", encoding="utf-8") as f:
    data = f.read()
tokenized_data = get_tokenized_data(data)
# Fixed seed so the shuffle, and therefore the train/test split, is reproducible.
random.seed(87)
random.shuffle(tokenized_data)
# The first 80% of the shuffled sentences form the training set,
# the remainder forms the test set.
t = int(len(tokenized_data) * 0.8)
train_data = tokenized_data[0:t]
test_data = tokenized_data[t:]

In [16]:
# Report the size of the split and show the first sample of each set.
print("{} data are split into {} train and {} test set".format(
    len(tokenized_data), len(train_data), len(test_data)))

print("First training sample:")
print(train_data[0])
      
print("First test sample")
print(test_data[0])

47961 data are split into 38368 train and 9593 test set
First training sample:
['i', 'personally', 'would', 'like', 'as', 'our', 'official', 'glove', 'of', 'the', 'team', 'local', 'company', 'and', 'quality', 'production']
First test sample
['that', 'picture', 'i', 'just', 'seen', 'whoa', 'dere', '!', '!', '>', '>', '>', '>', '>', '>', '>']


In [17]:
# Word-frequency counting
def count_words(tokenized_sentences):
    """Count how often each token occurs across all sentences.

    Args:
        tokenized_sentences: Iterable of token lists.

    Returns:
        Dictionary mapping each token to its number of occurrences. Keys
        appear in the order in which the tokens are first encountered.
    """
    frequencies = {}
    for tokens in tokenized_sentences:
        for word in tokens:
            frequencies[word] = frequencies.get(word, 0) + 1
    return frequencies

In [18]:
# Quick check of count_words on a three-sentence toy corpus.
tokenized_sentences = [['sky', 'is', 'blue', '.'],
                       ['leaves', 'are', 'green', '.'],
                       ['roses', 'are', 'red', '.']]
count_words(tokenized_sentences)

{'.': 3,
 'are': 2,
 'blue': 1,
 'green': 1,
 'is': 1,
 'leaves': 1,
 'red': 1,
 'roses': 1,
 'sky': 1}

In [19]:
# Build the closed vocabulary: keep only words that occur often enough
def get_words_with_nplus_frequency(tokenized_sentences, frequency):
    """Return the words that occur at least ``frequency`` times.

    Args:
        tokenized_sentences: Iterable of token lists.
        frequency: Minimum number of occurrences (inclusive) a word needs
            in order to be kept.

    Returns:
        List of the qualifying words, in the order in which ``count_words``
        yields them.
    """
    occurrences = count_words(tokenized_sentences)
    return [word for word, seen in occurrences.items() if seen >= frequency]

In [21]:
# Quick check of get_words_with_nplus_frequency: with frequency=2 only
# tokens that occur at least twice in the toy corpus are kept.
tokenized_sentences = [['sky', 'is', 'blue', '.'],
                       ['leaves', 'are', 'green', '.'],
                       ['roses', 'are', 'red', '.']]
tmp_closed_vocab = get_words_with_nplus_frequency(tokenized_sentences, frequency=2)
print(f"Closed vocabulary:")
print(tmp_closed_vocab)

Closed vocabulary:
['.', 'are']


In [24]:
# Handle out-of-vocabulary (OOV) words: every token that is not in the
# vocabulary is replaced by the unknown token
def replace_oov_words_by_unk(tokenized_sentences, vocabulary, unknown_token="<unk>"):
    """Replace every out-of-vocabulary token with ``unknown_token``.

    Args:
        tokenized_sentences: Iterable of token lists.
        vocabulary: Iterable of the words to keep unchanged.
        unknown_token: Replacement for any token not in ``vocabulary``.

    Returns:
        New list of token lists with the same shape as the input; the input
        is not modified.
    """
    # A set gives constant-time membership tests inside the nested loop.
    known_words = set(vocabulary)
    return [
        [word if word in known_words else unknown_token for word in tokens]
        for tokens in tokenized_sentences
    ]

In [25]:
# Quick check of replace_oov_words_by_unk: "run" and "cats" are not in the
# vocabulary, so they are expected to be replaced.
tokenized_sentences = [["dogs", "run"], ["cats", "sleep"]]
vocabulary = ["dogs", "sleep"]
tmp_replaced_tokenized_sentences = replace_oov_words_by_unk(tokenized_sentences, vocabulary)
print(f"Original sentence:")
print(tokenized_sentences)
print(f"tokenized_sentences with less frequent words converted to '<unk>':")
print(tmp_replaced_tokenized_sentences)

Original sentence:
[['dogs', 'run'], ['cats', 'sleep']]
tokenized_sentences with less frequent words converted to '<unk>':
[['dogs', '<unk>'], ['<unk>', 'sleep']]


In [26]:
# Data preprocessing, step 2
def preprocess_data(train_data,test_data,frequency):
    """Build the vocabulary from the training set and mask rare words.

    Args:
        train_data: Tokenized training sentences.
        test_data: Tokenized test sentences.
        frequency: Minimum number of occurrences in ``train_data`` for a
            word to enter the vocabulary.

    Returns:
        Tuple ``(train, test, vocabulary)``: both data sets after
        ``replace_oov_words_by_unk`` has been applied with the vocabulary,
        followed by the vocabulary itself. The vocabulary is derived from
        the training data only.
    """
    vocabulary = get_words_with_nplus_frequency(train_data, frequency)
    train_replaced, test_replaced = (
        replace_oov_words_by_unk(split, vocabulary)
        for split in (train_data, test_data)
    )
    return train_replaced, test_replaced, vocabulary

In [27]:
# Preprocess both splits; the threshold below is passed to preprocess_data
# as the minimum word frequency for the vocabulary.
frequency = 2
train_data_processed, test_data_processed, vocabulary = preprocess_data(train_data,test_data,frequency)

In [28]:
# Show the first preprocessed sample of each set, the first ten vocabulary
# entries, and the vocabulary size.
print("First preprocessed training sample:")
print(train_data_processed[0])
print()
print("First preprocessed test sample:")
print(test_data_processed[0])
print()
print("First 10 vocabulary:")
print(vocabulary[0:10])
print()
print("Size of vocabulary:", len(vocabulary))

First preprocessed training sample:
['i', 'personally', 'would', 'like', 'as', 'our', 'official', 'glove', 'of', 'the', 'team', 'local', 'company', 'and', 'quality', 'production']

First preprocessed test sample:
['that', 'picture', 'i', 'just', 'seen', 'whoa', 'dere', '!', '!', '>', '>', '>', '>', '>', '>', '>']

First 10 vocabulary:
['i', 'personally', 'would', 'like', 'as', 'our', 'official', 'glove', 'of', 'the']

Size of vocabulary: 14944


In [29]:
def count_n_grams(data, n, start_token='<s>', end_token='<e>'):
    """Count every n-gram in a tokenized corpus.

    Each sentence is padded with ``n`` start tokens in front and a single
    end token behind before its n-grams are counted.

    Args:
        data: Iterable of tokenized sentences (sequences of tokens).
        n: Size of the n-grams (1 for unigrams, 2 for bigrams, ...).
            Expected to be at least 1.
        start_token: Token used for the leading padding.
        end_token: Token appended to the end of every sentence.

    Returns:
        Dictionary mapping each n-gram (a tuple of exactly ``n`` tokens)
        to its number of occurrences.
    """
    n_grams = {}
    for sentence in data:
        # Tuples are hashable, so slices of the padded sentence can serve
        # as dictionary keys.
        padded = tuple([start_token] * n + list(sentence) + [end_token])
        # A sequence of length L holds exactly L - n + 1 windows of size n.
        # Stopping there keeps truncated, shorter-than-n tuples out of the
        # table for n >= 3.
        for i in range(len(padded) - n + 1):
            n_gram = padded[i:i + n]
            n_grams[n_gram] = n_grams.get(n_gram, 0) + 1
    return n_grams

In [30]:
# Quick check of count_n_grams on a two-sentence toy corpus, for n=1 and n=2.
sentences = [['i', 'like', 'a', 'cat'],
             ['this', 'dog', 'is', 'like', 'a', 'cat']]
print("Uni-gram:")
print(count_n_grams(sentences, 1))
print("Bi-gram:")
print(count_n_grams(sentences, 2))

Uni-gram:
{('<s>',): 2, ('i',): 1, ('like',): 2, ('a',): 2, ('cat',): 2, ('<e>',): 2, ('this',): 1, ('dog',): 1, ('is',): 1}
Bi-gram:
{('<s>', '<s>'): 2, ('<s>', 'i'): 1, ('i', 'like'): 1, ('like', 'a'): 2, ('a', 'cat'): 2, ('cat', '<e>'): 2, ('<s>', 'this'): 1, ('this', 'dog'): 1, ('dog', 'is'): 1, ('is', 'like'): 1}


In [32]:
# Probability that `word` is the next word, given the previous n words
def estimate_probability(word,previous_n_gram,n_gram_counts,n_plus1_gram_counts,vocabulary_size,k=1.0):
    """Estimate P(word | previous_n_gram) with add-k smoothing.

    Args:
        word: Candidate next word.
        previous_n_gram: Sequence of the preceding words.
        n_gram_counts: Dictionary of n-gram counts, keyed by tuple.
        n_plus1_gram_counts: Dictionary of (n+1)-gram counts, keyed by tuple.
        vocabulary_size: Number of words in the vocabulary.
        k: Smoothing constant added to every count.

    Returns:
        ``(count(previous_n_gram + word) + k) /
        (count(previous_n_gram) + k * vocabulary_size)``, where an n-gram
        missing from its dictionary counts as 0.
    """
    context = tuple(previous_n_gram)
    context_count = n_gram_counts.get(context, 0)
    joint_count = n_plus1_gram_counts.get(context + (word,), 0)
    return (joint_count + k) / (context_count + k * vocabulary_size)

In [33]:
# Plug every vocabulary word in turn into estimate_probability to get the
# probability of each word following the previous n words
def estimate_probabilities(previous_n_gram, n_gram_counts, n_plus1_gram_counts, vocabulary, k=1.0):
    """Estimate the next-word probability of every candidate word.

    The candidates are the words of ``vocabulary`` followed by the special
    tokens ``"<e>"`` and ``"<unk>"``; the vocabulary size passed on to
    ``estimate_probability`` includes those two tokens.

    Args:
        previous_n_gram: Sequence of the preceding words.
        n_gram_counts: Dictionary of n-gram counts.
        n_plus1_gram_counts: Dictionary of (n+1)-gram counts.
        vocabulary: List of words.
        k: Smoothing constant.

    Returns:
        Dictionary mapping each candidate word to its estimated probability.
    """
    context = tuple(previous_n_gram)
    candidates = vocabulary + ["<e>", "<unk>"]
    candidate_count = len(candidates)
    return {
        candidate: estimate_probability(candidate, context, n_gram_counts,
                                        n_plus1_gram_counts, candidate_count, k=k)
        for candidate in candidates
    }

In [34]:
def make_count_matrix(n_plus1_gram_counts, vocabulary):
    """Arrange (n+1)-gram counts as a table of context rows by next-word columns.

    Args:
        n_plus1_gram_counts: Dictionary mapping (n+1)-gram tuples to counts.
        vocabulary: List of words; ``"<e>"`` and ``"<unk>"`` are appended
            to form the columns.

    Returns:
        pandas DataFrame with one row per distinct n-gram prefix (the
        (n+1)-gram without its last word), in first-seen order, and one
        column per vocabulary word. Each cell holds the count of the
        corresponding (n+1)-gram, or 0 if it was never seen. (n+1)-grams
        whose last word is not among the columns are ignored.
    """
    vocabulary = vocabulary + ["<e>", "<unk>"]

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # row order is the same on every run (a set would not guarantee that).
    n_grams = list(dict.fromkeys(
        n_plus1_gram[:-1] for n_plus1_gram in n_plus1_gram_counts
    ))

    # mapping from n-gram to row
    row_index = {n_gram: i for i, n_gram in enumerate(n_grams)}
    # mapping from next word to column
    col_index = {word: j for j, word in enumerate(vocabulary)}

    count_matrix = np.zeros((len(n_grams), len(vocabulary)))
    for n_plus1_gram, count in n_plus1_gram_counts.items():
        # Look the column up in the dict rather than scanning the
        # vocabulary list once per (n+1)-gram.
        j = col_index.get(n_plus1_gram[-1])
        if j is None:
            continue
        count_matrix[row_index[n_plus1_gram[:-1]], j] = count

    return pd.DataFrame(count_matrix, index=n_grams, columns=vocabulary)

In [31]:
def make_probability_matrix(n_plus1_gram_counts, vocabulary, k):
    """Build the add-k smoothed next-word probability table.

    Args:
        n_plus1_gram_counts: Dictionary mapping (n+1)-gram tuples to counts.
        vocabulary: List of words, passed on to ``make_count_matrix``.
        k: Smoothing constant added to every cell of the count table.

    Returns:
        The table returned by ``make_count_matrix`` with ``k`` added to
        every cell and each row divided by its row sum.
    """
    # Use the `vocabulary` argument; the previous code read an unrelated
    # global name (`unique_words`) and ignored this parameter.
    count_matrix = make_count_matrix(n_plus1_gram_counts, vocabulary)
    count_matrix += k
    return count_matrix.div(count_matrix.sum(axis=1), axis=0)

In [35]:
def calculate_perplexity(sentence, n_gram_counts, n_plus1_gram_counts, vocabulary_size, k=1.0):
    """Calculate the perplexity of a single tokenized sentence.

    Args:
        sentence: List of tokens.
        n_gram_counts: Dictionary of n-gram counts. The length of its keys
            determines ``n``, the number of preceding words used as context.
        n_plus1_gram_counts: Dictionary of (n+1)-gram counts.
        vocabulary_size: Number of unique words in the vocabulary.
        k: Positive smoothing constant.

    Returns:
        Perplexity score: the N-th root of the product of ``1 / P(word |
        context)`` over every word of the sentence and the end token, where
        N is the length of the sentence after padding.

    Raises:
        ValueError: If ``n_gram_counts`` is empty, because ``n`` cannot be
            determined.
    """
    if not n_gram_counts:
        raise ValueError("n_gram_counts must contain at least one n-gram")

    # Length of the context; read from one key without copying all keys.
    n = len(next(iter(n_gram_counts)))

    # Prepend n start tokens and append the end token.
    padded = tuple(["<s>"] * n + list(sentence) + ["<e>"])

    # N counts the padded sentence, i.e. it includes the start tokens;
    # this matches the original definition used in this notebook.
    N = len(padded)

    # Accumulate sum(log(1 / p)) instead of the raw product of 1 / p:
    # the product overflows to infinity on long sentences, the sum does not.
    log_inverse_sum = 0.0
    for t in range(n, N):
        probability = estimate_probability(
            padded[t],           # word at position t
            padded[t - n:t],     # the n words preceding it
            n_gram_counts, n_plus1_gram_counts,
            vocabulary_size, k=k)
        log_inverse_sum -= math.log(probability)

    # exp(sum / N) is the N-th root of the product.
    return math.exp(log_inverse_sum / N)

In [36]:
def suggest_a_word(previous_tokens, n_gram_counts, n_plus1_gram_counts, vocabulary, k=1.0, start_with=None):
    """Suggest the most probable next word for a sequence of tokens.

    Args:
        previous_tokens: List of the tokens typed so far; only the last
            ``n`` are used as context, where ``n`` is the key length of
            ``n_gram_counts``.
        n_gram_counts: Dictionary of n-gram counts.
        n_plus1_gram_counts: Dictionary of (n+1)-gram counts.
        vocabulary: List of words.
        k: Smoothing constant.
        start_with: Optional prefix; when given, only candidate words that
            start with it are considered.

    Returns:
        Tuple ``(suggestion, probability)`` for the candidate with the
        highest probability (the first one on ties), or ``(None, 0)`` when
        no candidate qualifies.

    Raises:
        ValueError: If ``n_gram_counts`` is empty, because ``n`` cannot be
            determined.
    """
    if not n_gram_counts:
        raise ValueError("n_gram_counts must contain at least one n-gram")

    # Length of the context; read from one key without copying all keys.
    n = len(next(iter(n_gram_counts)))
    previous_n_gram = previous_tokens[-n:]
    probabilities = estimate_probabilities(previous_n_gram, n_gram_counts,
                                           n_plus1_gram_counts, vocabulary, k=k)

    suggestion = None
    max_prob = 0
    for word, prob in probabilities.items():
        # Skip words that do not match the requested prefix.
        if start_with is not None and not word.startswith(start_with):
            continue
        # Strict comparison: the first word to reach the maximum wins.
        if prob > max_prob:
            suggestion, max_prob = word, prob

    return suggestion, max_prob

In [None]:
def get_suggestions(previous_tokens, n_gram_counts_list, vocabulary, k=1.0, start_with=None):
    """Collect one suggestion from every pair of adjacent n-gram tables.

    Args:
        previous_tokens: List of the tokens typed so far.
        n_gram_counts_list: List of n-gram count dictionaries. Each
            consecutive pair is passed to ``suggest_a_word`` as its
            ``n_gram_counts`` and ``n_plus1_gram_counts`` arguments.
        vocabulary: List of words.
        k: Smoothing constant.
        start_with: Optional prefix the suggested words must start with.

    Returns:
        List with the result of ``suggest_a_word`` for each consecutive
        pair, in list order; empty when fewer than two tables are given.
    """
    adjacent_models = zip(n_gram_counts_list, n_gram_counts_list[1:])
    return [
        suggest_a_word(previous_tokens, n_gram_counts,
                       n_plus1_gram_counts, vocabulary,
                       k=k, start_with=start_with)
        for n_gram_counts, n_plus1_gram_counts in adjacent_models
    ]