<a href="https://colab.research.google.com/github/superbunny38/MachineLearning/blob/main/CS229MachineLearning/2022AndrewLectureNotes/Probset2/Prob6/Set2Problem6(a)(b)(c)(d).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import math
import pandas as pd
import random

**Rights**<br>
*Assignment made by Prof. Andrew Ng*<br>
*Assignment solved by Chaeeun Ryu*

### Utils

In [2]:
import csv

import matplotlib.pyplot as plt
import numpy as np
import json


def add_intercept_fn(x):
    """Add intercept to matrix x.

    Args:
        x: 2D NumPy array.

    Returns:
        New matrix same as x with 1's in the 0th column.
    """
    new_x = np.zeros((x.shape[0], x.shape[1] + 1), dtype=x.dtype)
    new_x[:, 0] = 1
    new_x[:, 1:] = x

    return new_x

def load_csv(csv_path, label_col='y', add_intercept=False):
    """Load dataset from a CSV file.

    Args:
         csv_path: Path to CSV file containing dataset.
         label_col: Name of column to use as labels (should be 'y' or 'l').
         add_intercept: Add an intercept entry to x-values.

    Returns:
        xs: Numpy array of x-values (inputs).
        ys: Numpy array of y-values (labels).
    """

    # Load headers
    with open(csv_path, 'r', newline='') as csv_fh:
        headers = csv_fh.readline().strip().split(',')

    # Load features and labels
    x_cols = [i for i in range(len(headers)) if headers[i].startswith('x')]
    l_cols = [i for i in range(len(headers)) if headers[i] == label_col]
    inputs = np.loadtxt(csv_path, delimiter=',', skiprows=1, usecols=x_cols)
    labels = np.loadtxt(csv_path, delimiter=',', skiprows=1, usecols=l_cols)

    if inputs.ndim == 1:
        inputs = np.expand_dims(inputs, -1)

    if add_intercept:
        inputs = add_intercept_fn(inputs)

    return inputs, labels

def load_spam_dataset(tsv_path):
    """Load the spam dataset from a TSV file

    Args:
         csv_path: Path to TSV file containing dataset.

    Returns:
        messages: A list of string values containing the text of each message.
        labels: The binary labels (0 or 1) for each message. A 1 indicates spam.
    """

    messages = []
    labels = []

    with open(tsv_path, 'r', newline='', encoding='utf8') as tsv_file:
        reader = csv.reader(tsv_file, delimiter='\t')

        for label, message in reader:
            messages.append(message)
            labels.append(1 if label == 'spam' else 0)

    return messages, np.array(labels)

def plot(x, y, theta, save_path, correction=1.0):
    """Plot dataset and fitted logistic regression parameters.

    Args:
        x: Matrix of training examples, one per row.
        y: Vector of labels in {0, 1}.
        theta: Vector of parameters for logistic regression model.
        save_path: Path to save the plot.
        correction: Correction factor to apply (Problem 2(e) only).
    """
    # Plot dataset
    plt.figure()
    plt.plot(x[y == 1, -2], x[y == 1, -1], 'bx', linewidth=2)
    plt.plot(x[y == 0, -2], x[y == 0, -1], 'go', linewidth=2)

    # Plot decision boundary (found by solving for theta^T x = 0)
    x1 = np.arange(min(x[:, -2]), max(x[:, -2]), 0.01)
    x2 = -(theta[0] / theta[2] * correction + theta[1] / theta[2] * x1)
    plt.plot(x1, x2, c='red', linewidth=2)

    # Add labels and save to disk
    plt.xlabel('x1')
    plt.ylabel('x2')
    plt.savefig(save_path)


def plot_contour(predict_fn):
    """Plot a contour given the provided prediction function"""
    x, y = np.meshgrid(np.linspace(-10, 10, num=20), np.linspace(-10, 10, num=20))
    z = np.zeros(x.shape)

    for i in range(x.shape[0]):
        for j in range(y.shape[1]):
            z[i, j] = predict_fn([x[i, j], y[i, j]])

    plt.contourf(x, y, z, levels=[-float('inf'), 0, float('inf')], colors=['orange', 'cyan'])

def plot_points(x, y):
    """Plot some points where x are the coordinates and y is the label"""
    x_one = x[y == 0, :]
    x_two = x[y == 1, :]
    
    plt.scatter(x_one[:,0], x_one[:,1], marker='x', color='red')
    plt.scatter(x_two[:,0], x_two[:,1], marker='o', color='blue')

def write_json(filename, value):
    """Write the provided value as JSON to the given filename"""
    with open(filename, 'w') as f:
        json.dump(value, f)

# Spam classification

In [3]:
#svm
# Important note: you do not have to modify this file for your homework.

import numpy as np
np.random.seed(123)


def train_and_predict_svm(train_matrix, train_labels, test_matrix, radius):
    """Train an SVM model and predict the resulting labels on a test set.

    Args: 
        train_matrix: A numpy array containing the word counts for the train set
        train_labels: A numpy array containing the spam or not spam labels for the train set
        test_matrix: A numpy array containing the word counts for the test set
        radius: The RBF kernel radius to use for the SVM

    Return: 
        The predicted labels for each message
    """
    model = svm_train(train_matrix, train_labels, radius)
    return svm_predict(model, test_matrix, radius)


def svm_train(matrix, category, radius):
    state = {}
    M, N = matrix.shape
    Y = 2 * category - 1
    matrix = 1. * (matrix > 0)
    squared = np.sum(matrix * matrix, axis=1)
    gram = matrix.dot(matrix.T)
    K = np.exp(-(squared.reshape((1, -1)) + squared.reshape((-1, 1)) - 2 * gram) / (2 * (radius ** 2)))

    alpha = np.zeros(M)
    alpha_avg = np.zeros(M)
    L = 1. / (64 * M)
    outer_loops = 10

    alpha_avg = 0
    ii = 0
    while ii < outer_loops * M:
        i = int(np.random.rand() * M)
        margin = Y[i] * np.dot(K[i, :], alpha)
        grad = M * L * K[:, i] * alpha[i]
        if margin < 1:
            grad -= Y[i] * K[:, i]
        alpha -= grad / np.sqrt(ii + 1)
        alpha_avg += alpha
        ii += 1

    alpha_avg /= (ii + 1) * M

    state['alpha'] = alpha
    state['alpha_avg'] = alpha_avg
    state['Xtrain'] = matrix
    state['Sqtrain'] = squared
    return state


def svm_predict(state, matrix, radius):
    M, N = matrix.shape

    Xtrain = state['Xtrain']
    Sqtrain = state['Sqtrain']
    matrix = 1. * (matrix > 0)
    squared = np.sum(matrix * matrix, axis=1)
    gram = matrix.dot(Xtrain.T)
    K = np.exp(-(squared.reshape((-1, 1)) + Sqtrain.reshape((1, -1)) - 2 * gram) / (2 * (radius ** 2)))
    alpha_avg = state['alpha_avg']
    preds = K.dot(alpha_avg)
    output = (1 + np.sign(preds)) // 2

    return output

# (a)
Implement code for processing the spam messages into numpy arrays that can be fed into machine learning models. The provided code will then run your functions and save the resulting dictionary into `p06_dictionary` and a sample of the resulting training matrix into `p06_sample_train_matrix`

In [4]:
!mkdir data

In [5]:
!mkdir output

# (b)
Implement a naive Bayes classifier for spam classification with multimodal event model and Laplace smoothing. Find a way to compute Naive Bayes' predicted class labels without explicitly representing very small numbers such as $p(x|y)$

# (c)
Intuitively, some tokens may be particularly indicative of an SMS being in a particular class. We can try to get an informal sense of how indicative token i is for the SPAM class by looking at:

$$log\frac{p(x_j = i|y=1)}{p(x_j = i|y=0)} = log(\frac{P(token_i|email is SPAM)}{P(token_i email is NOTSPAM)})$$

Complete the `get_top_five_naive_bayes_words` function within the provided code using the above formula in order to optain the 5 most indicative tokens.

# (d)
Support vector machines (SVMs) are an alternative machine learning model we discussed in class. We have provided you an SVM implementation (using a radical basis function (RBF) kernel). One important part of training an SVM parameterized by an RBF kernel is choosing an appropriate kernel radius. **Compute the best SVM radius which maximizes accuracy on the validation dataset.**

In [18]:
from numpy.matrixlib import mat
import collections

import numpy as np



def get_words(message):
    """Get the normalized list of words from a message string.

    This function should split a message into words, normalize them, and return
    the resulting list. For splitting, you should split on spaces. For normalization,
    you should convert everything to lowercase.

    Args:
        message: A string containing an SMS message

    Returns:
       The list of normalized words from the message.
    """

    # *** START CODE HERE ***
    #------splitting-----
    splitted = message.split()
    #------normalization-----
    normalized = [x.lower() for x in splitted]
    return normalized
    # *** END CODE HERE ***


def create_dictionary(messages):
    """Create a dictionary mapping words to integer indices.

    This function should create a dictionary of word to indices using the provided
    training messages. Use get_words to process each message. 

    Rare words are often not useful for modeling. Please only add words to the dictionary
    if they occur in at least five messages.

    Args:
        messages: A list of strings containing SMS messages

    Returns:
        A python dict mapping words to integers.
    """
    all_words = []
    # *** START CODE HERE ***
    for message in messages:
      all_words += get_words(message)
    count = 0
    real_dict = {}
    for key,value in collections.Counter(all_words).items():
      if value >= 4:
        real_dict[key] = count#0부터 채움
        count += 1
    return real_dict
    # *** END CODE HERE ***

'''
apple|banana|car|duck|
  2  |   0  | 1 | 0 
'''
def transform_text(messages, word_dictionary):
    """Transform a list of text messages into a numpy array for further processing.

    This function should create a numpy array that contains the number of times each word
    appears in each message. Each row in the resulting array should correspond to each 
    message and each column should correspond to a word.

    Use the provided word dictionary to map words to column indices. Ignore words that 
    are not present in the dictionary. Use get_words to get the words for a message.

    Args:
        messages: A list of strings where each string is an SMS message.
        word_dictionary: A python dict mapping words to integers.

    Returns:
        A numpy array marking the words present in each message.
    """
    # *** START CODE HERE ***
    matrix = []
    for message in messages:
      row = np.zeros(len(word_dictionary.keys()))
      words_list = get_words(message)
      words_freq = collections.Counter(words_list)
      for word, freq in words_freq.items():
        if word in word_dictionary.keys():
          word_to_integer = word_dictionary[word]
          row[word_to_integer] = freq
      matrix.append(row)
    return np.array(matrix)

    # *** END CODE HERE ***


def fit_naive_bayes_model(matrix, labels, dictionary):
    """Fit a naive bayes model.

    This function should fit a Naive Bayes model given a training matrix and labels.

    The function should return the state of that model.

    Feel free to use whatever datatype you wish for the state of the model.

    Args:
        matrix: A numpy array containing word counts for the training data (n_row: n_samples, n_columns: n_words in dict)
        labels: The binary (0 or 1) labels for that training data

    Returns: The trained model
    """

    # *** START CODE HERE ***
    unique, counts = np.unique(labels, return_counts = True)
    assert unique[0] == 0, print(unique[0])
    assert unique[1] == 1, print(unique[1])
    k = len(dictionary.keys())#laplace smoothing
    p0 = counts[0]/np.sum(counts)
    p1 = counts[1]/np.sum(counts)
    phi0_dict = {}
    phi1_dict = {}
    for _ in range(len(matrix[0])):
      phi0_dict[_] = 0
      phi1_dict[_] = 0
    for i, data_x in enumerate(matrix):
      data_y = labels[i]
      for idx, j in enumerate(data_x):#j: word idx 개수
        if j!= 0:
          if data_y == 1:
            #update phi1_dict
            phi1_dict[idx] += j
          elif data_y == 0:
            #update phi0_dict
            phi0_dict[idx] += j
    #add 1 for numerator according to laplace smoothing
    for _ in range(len(matrix[0])):
      phi0_dict[_] += 1
      phi1_dict[_] += 1
    #calculate denominator
    denom_1 = k#|V|
    denom_0 = k#|V|
    for i, data_x in enumerate(matrix):
      if labels[i] == 0:
        denom_0 += np.sum(data_x)
      elif labels[i] == 1:
        denom_1 += np.sum(data_x)
    #divide by denominator
    for _ in range(len(matrix[0])):
      phi0_dict[_] = phi0_dict[_]/denom_0
      phi1_dict[_] = phi1_dict[_]/denom_1
    print(f"p0: {p0}, p1: {p1}")
    return {'p(y=0)':p0,'p(y=1)':p1, 'k':k, 'phi0_dict':phi0_dict, 'phi1_dict':phi1_dict}#model
    # *** END CODE HERE ***


def predict_from_naive_bayes_model(model, matrix):
    """Use a Naive Bayes model to compute predictions for a target matrix.

    This function should be able to predict on the models that fit_naive_bayes_model
    outputs.

    Args:
        model: A trained model from fit_naive_bayes_model
        matrix: A numpy array containing word counts

    Returns: A numpy array containg the predictions from the model
    """
    
    # *** START CODE HERE ***
    p0 = model['p(y=0)']
    p1 = model['p(y=1)']
    k = model['k']
    phi0_dict = model['phi0_dict']
    phi1_dict = model['phi1_dict']
    predictions = []
    for test_x in matrix:
      p0_ = p0
      p1_ = p1
      for word_idx, freq in enumerate(test_x):
        p0_ = np.exp(p0_*phi0_dict[word_idx])
        p1_ = np.exp(p1_*phi1_dict[word_idx])
      if p0_ > p1_:
        predictions.append(0)
      elif p1_ > p0_:
        predictions.append(1)
      else:
        print("underflow")
        return
    assert len(predictions) == matrix.shape[0]
    return np.array(predictions)
    # *** END CODE HERE ***


def get_top_five_naive_bayes_words(model, dictionary):#how indicative token i is for the SPAM class
    """Compute the top five words that are most indicative of the spam (i.e positive) class.

    Ues the metric given in 6c as a measure of how indicative a word is.
    Return the words in sorted form, with the most indicative word first.

    Args:
        model: The Naive Bayes model returned from fit_naive_bayes_model
        dictionary: A mapping of word to integer ids

    Returns: The top five most indicative words in sorted order with the most indicative first
    """
    # *** START CODE HERE ***
    indicator_dict = {}
    phi0_dict = model['phi0_dict']
    phi1_dict = model['phi1_dict']
    for word_idx in phi0_dict.keys():
      indicator_dict[word_idx] = np.log(phi1_dict[word_idx]/phi0_dict[word_idx])
    
    sorted_dict = dict( sorted(indicator_dict.items(),
                           key=lambda item: item[1],
                           reverse=True))
    count = 0
    top5_keys = []
    for key, value in sorted_dict.items():
      if count > 4:
        break
      top5_keys.append(key)
      count+=1

    top5_ = []
    for word_idx in top5_keys:
      for word, idx in dictionary.items():
        if word_idx == idx:
          top5_.append(word)
    assert len(top5_) == 5
    return top5_
    # *** END CODE HERE ***


def compute_best_svm_radius(train_matrix, train_labels, val_matrix, val_labels, radius_to_consider):
    """Compute the optimal SVM radius using the provided training and evaluation datasets.

    You should only consider radius values within the radius_to_consider list.
    You should use accuracy as a metric for comparing the different radius values.

    Args:
        train_matrix: The word counts for the training data
        train_labels: The spma or not spam labels for the training data
        val_matrix: The word counts for the validation data
        val_labels: The spam or not spam labels for the validation data
        radius_to_consider: The radius values to consider
    
    Returns:
        The best radius which maximizes SVM accuracy.
    """
    # *** START CODE HERE ***
    best_val_acc = 0.
    for radius in radius_to_consider:
      val_preds = train_and_predict_svm(train_matrix, train_labels, val_matrix, radius)
      val_acc = np.mean(val_preds == val_labels)
      if val_acc > best_val_acc:
        best_radius = radius
        best_val_acc = val_acc
    print(f"best validation acc:{best_val_acc} (w/ radius {best_radius})")
    return best_radius
    # *** END CODE HERE ***


def main():
    train_messages, train_labels = load_spam_dataset('./data/ds6_train.tsv')
    val_messages, val_labels = load_spam_dataset('./data/ds6_val.tsv')
    test_messages, test_labels = load_spam_dataset('./data/ds6_test.tsv')
    print(">> ... data loaded")
    dictionary = create_dictionary(train_messages)

    write_json('./output/p06_dictionary', dictionary)

    train_matrix = transform_text(train_messages, dictionary)
    # print("train mat shape:",train_matrix.shape)
    np.savetxt('./output/p06_sample_train_matrix', train_matrix[:100,:])

    val_matrix = transform_text(val_messages, dictionary)
    test_matrix = transform_text(test_messages, dictionary)
    print(">> ... word embedding done")

    naive_bayes_model = fit_naive_bayes_model(train_matrix, train_labels, dictionary)
    print(">> ... fitting naive bayes model done")

    naive_bayes_predictions = predict_from_naive_bayes_model(naive_bayes_model, test_matrix)

    np.savetxt('./output/p06_naive_bayes_predictions', naive_bayes_predictions)

    naive_bayes_accuracy = np.mean(naive_bayes_predictions == test_labels)

    print('Naive Bayes (w/ laplace smoothing) had an accuracy of {} on the testing set'.format(naive_bayes_accuracy))

    top_5_words = get_top_five_naive_bayes_words(naive_bayes_model, dictionary)

    print('The top 5 indicative words for Naive Bayes are: ', top_5_words)

    write_json('./output/p06_top_indicative_words', top_5_words)

    optimal_radius = compute_best_svm_radius(train_matrix, train_labels, val_matrix, val_labels, [0.01, 0.1, 1, 10])

    write_json('./output/p06_optimal_radius', optimal_radius)

    print('The optimal SVM radius was {}'.format(optimal_radius))

    svm_predictions = train_and_predict_svm(train_matrix, train_labels, test_matrix, optimal_radius)

    svm_accuracy = np.mean(svm_predictions == test_labels)

    print('The SVM model had an accuracy of {} on the testing set'.format(svm_accuracy, optimal_radius))


if __name__ == "__main__":
    main()

>> ... data loaded
>> ... word embedding done
p0: 0.8629122728292573, p1: 0.13708772717074266
>> ... fitting naive bayes model done
Naive Bayes (w/ laplace smoothing) had an accuracy of 0.8799283154121864 on the testing set
The top 5 indicative words for Naive Bayes are:  ['claim', 'won', 'prize', 'tone', 'urgent!']
best validation acc:0.9443447037701975 (w/ radius 0.1)
The optimal SVM radius was 0.1
The SVM model had an accuracy of 0.967741935483871 on the testing set
