In [2]:
import os
import sys

In [3]:
# Run this cell to mount your own Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Please append your own directory after '/content/drive/MyDrive/'
### ========== TODO : START ========== ###
# Only extend the import path once, so re-running this cell does not pile up
# duplicate entries in sys.path.
if '/content/drive/MyDrive/HW3/HW3-code' not in sys.path:
    sys.path += ['/content/drive/MyDrive/HW3/HW3-code']
### ========== TODO : END ========== ###

In [5]:
"""
Author      : Yi-Chieh Wu, Sriram Sankararaman
Description : Twitter
"""

from string import punctuation

import numpy as np
import matplotlib.pyplot as plt
# !!! MAKE SURE TO USE LinearSVC.decision_function(X), NOT LinearSVC.predict(X) !!!
# (this makes ''continuous-valued'' predictions)
from sklearn.svm import LinearSVC
from sklearn.model_selection import StratifiedKFold
from sklearn import metrics

# Problem 3: Twitter Analysis Using SVM

In [6]:
######################################################################
# functions -- input/output
######################################################################

def read_vector_file(fname):
    """
    Load a numeric vector from a text file.

    Parameters
    --------------------
        fname  -- string, filename (passed straight to np.genfromtxt)

    Returns
    --------------------
        labels -- numpy array as parsed by np.genfromtxt
    """
    labels = np.genfromtxt(fname)
    return labels


def write_label_answer(vec, outfile):
    """
    Save a vector of predicted scores to a text file.

    Nothing is written unless the vector has exactly 70 rows; in that case
    two error lines are printed instead.

    Parameters
    --------------------
        vec     -- numpy array of shape (n,) or (n,1), predicted scores
        outfile -- string, output filename
    """

    # for this project, exactly 70 labels are expected
    if vec.shape[0] == 70:
        np.savetxt(outfile, vec)
    else:
        print("Error - output vector should have 70 rows.")
        print("Aborting write.")
    

In [9]:
######################################################################
# functions -- feature extraction
######################################################################

def extract_words(input_string):
    """
    Tokenizes input_string: every punctuation character becomes its own
    token, the text is lowercased, and the result is split on whitespace.

    Parameters
    --------------------
        input_string -- string of characters

    Returns
    --------------------
        words        -- list of lowercase "words"
    """

    # pad each punctuation mark with spaces in a single pass
    padding = {ord(mark): ' ' + mark + ' ' for mark in punctuation}
    spaced = input_string.translate(padding)
    return spaced.lower().split()


def extract_dictionary(infile):
    """
    Builds a vocabulary from a text file: every distinct token produced by
    extract_words is assigned the next free index, in order of first
    appearance.

    Parameters
    --------------------
        infile    -- string, filename

    Returns
    --------------------
        word_list -- dictionary, (key, value) pairs are (word, index)
    """

    word_list = {}
    with open(infile, 'r') as fid :
        for line in fid:
            for token in extract_words(line):
                # a new token gets the current vocabulary size as its index
                word_list.setdefault(token, len(word_list))
    return word_list


def extract_feature_vectors(infile, word_list):
    """
    Produces a bag-of-words representation of a text file specified by the
    filename infile based on the dictionary word_list.

    The file is read once inside a `with` block, so no file handle is left
    open. Tokens that are not keys of word_list are ignored, which lets a
    dictionary built from one file be applied to another.

    Parameters
    --------------------
        infile         -- string, filename
        word_list      -- dictionary, (key, value) pairs are (word, index)

    Returns
    --------------------
        feature_matrix -- numpy array of shape (n,d)
                          boolean (0,1) array indicating word presence in a string
                            n is the number of lines in the text file
                              (a blank line yields an all-zero row)
                            d is the number of entries in word_list
    """

    with open(infile, 'r') as fid :
        lines = fid.readlines()

    feature_matrix = np.zeros((len(lines), len(word_list)))

    # process each line to populate feature_matrix
    for i, input_string in enumerate(lines):
        for word in extract_words(input_string):
            col = word_list.get(word)
            if col is not None:
                feature_matrix[i, col] = 1.0

    return feature_matrix

In [14]:
######################################################################
# functions -- evaluation
######################################################################

def performance(y_true, y_pred, metric="accuracy"):
    """
    Calculates the performance metric based on the agreement between the
    true labels and the predicted labels.

    Continuous predictions are thresholded at zero (a score of exactly 0 is
    treated as +1) for every metric except 'auroc', which uses the raw scores.

    Parameters
    --------------------
        y_true -- numpy array of shape (n,), known labels {1,-1}
        y_pred -- numpy array of shape (n,), (continuous-valued) predictions
        metric -- string, option used to select the performance measure
                  options: 'accuracy', 'f1-score', 'auroc', 'precision',
                           'sensitivity', 'specificity'

    Returns
    --------------------
        score  -- float, performance score

    Raises
    --------------------
        ValueError -- if metric is not one of the options above
    """
    # map continuous-valued predictions to binary labels
    y_label = np.sign(y_pred)
    y_label[y_label==0] = 1

    ### ========== TODO : START ========== ###
    # part 1a: compute classifier performance
    if metric == "accuracy":
      score = metrics.accuracy_score(y_true, y_label)
    elif metric == "f1-score":
      score = metrics.f1_score(y_true, y_label)
    elif metric == "auroc":
      score = metrics.roc_auc_score(y_true, y_pred)
    elif metric == "precision":
      score = metrics.precision_score(y_true, y_label)
    elif metric == "sensitivity":
      score = metrics.recall_score(y_true, y_label)
    elif metric == "specificity":
      # Pin the label order so the matrix is always 2x2, even when only one
      # class shows up in y_true / y_label; otherwise the 4-way unpack fails.
      tn, fp, fn, tp = metrics.confusion_matrix(
          y_true, y_label, labels=[-1, 1]).ravel()
      negatives = tn + fp
      # No true negatives to score: report 0.0 rather than dividing by zero.
      score = tn / negatives if negatives > 0 else 0.0
    else:
      raise ValueError("Enter a valid metric")

    return score
    ### ========== TODO : END ========== ###


def cv_performance(clf, X, y, kf, metric="accuracy"):
    """
    Runs cross-validation over the splits produced by kf: for each split the
    classifier is fit on the training part, scored on the held-out part with
    decision_function, and evaluated via performance(). The per-fold scores
    are averaged.

    Parameters
    --------------------
        clf    -- classifier (instance of LinearSVC)
        X      -- numpy array of shape (n,d), feature vectors
                    n = number of examples
                    d = number of features
        y      -- numpy array of shape (n,), binary labels {1,-1}
        kf     -- model_selection.StratifiedKFold
        metric -- string, option used to select performance measure

    Returns
    --------------------
        score   -- float, average cross-validation performance across k folds
    """

    ### ========== TODO : START ========== ###
    # part 1b: compute average cross-validation performance
    fold_scores = []

    for fit_idx, held_out_idx in kf.split(X, y):
      clf.fit(X[fit_idx], y[fit_idx])
      # continuous-valued scores, not hard labels
      held_out_scores = clf.decision_function(X[held_out_idx])
      fold_scores.append(
          performance(y[held_out_idx], held_out_scores, metric=metric))

    return np.mean(fold_scores)
    ### ========== TODO : END ========== ###


def select_param_linear(X, y, kf, metric="accuracy"):
    """
    Tries C = 10^-3 ... 10^2 for a hinge-loss LinearSVC, scores each value
    with cv_performance, and returns the C with the highest average score
    (the first such C when several tie).

    Parameters
    --------------------
        X      -- numpy array of shape (n,d), feature vectors
                    n = number of examples
                    d = number of features
        y      -- numpy array of shape (n,), binary labels {1,-1}
        kf     -- model_selection.StratifiedKFold
        metric -- string, option used to select performance measure

    Returns
    --------------------
        C -- float, optimal parameter value for linear SVM
    """

    print('Linear SVM Hyperparameter Selection based on ' + str(metric) + ':')
    C_range = 10.0 ** np.arange(-3, 3)

    ### ========== TODO : START ========== ###
    # part 1c: select optimal hyperparameter using cross-validation
    cv_scores = [
        cv_performance(LinearSVC(loss='hinge', random_state=0, C=candidate),
                       X, y, kf, metric=metric)
        for candidate in C_range
    ]

    chosen_C, top_score = None, -float('inf')
    for candidate, cv_score in zip(C_range, cv_scores):
      # strict comparison keeps the earliest candidate on ties
      if cv_score > top_score:
        chosen_C, top_score = candidate, cv_score

    return chosen_C
    ### ========== TODO : END ========== ###


def performance_test(clf, X, y, metric="accuracy"):
    """
    Scores an already-fitted classifier on a test set: predictions come from
    clf.decision_function(X) and are evaluated with performance().

    Parameters
    --------------------
        clf          -- classifier (instance of LinearSVC)
                          [already fit to data]
        X            -- numpy array of shape (n,d), feature vectors of test set
                          n = number of examples
                          d = number of features
        y            -- numpy array of shape (n,), binary labels {1,-1} of test set
        metric       -- string, option used to select performance measure

    Returns
    --------------------
        score        -- float, classifier performance
    """

    ### ========== TODO : START ========== ###
    # part 2b: return performance on test data under a metric.
    return performance(y, clf.decision_function(X), metric=metric)
    ### ========== TODO : END ========== ###

In [17]:
######################################################################
# main
######################################################################

def main() :
    """
    Runs the tweet-classification experiment: builds bag-of-words features,
    holds out everything after the first 560 examples as the test set, picks
    C per metric with 5-fold stratified CV, then reports test performance of
    the classifier tuned for each metric.
    """
    np.random.seed(1234)

    # read the tweets and its labels, change the following two lines to your own path.
    ### ========== TODO : START ========== ###
    file_path = '/content/drive/MyDrive/HW3/HW3-code/data/tweets.txt'
    label_path = '/content/drive/MyDrive/HW3/HW3-code/data/labels.txt'
    ### ========== TODO : END ========== ###
    dictionary = extract_dictionary(file_path)
    X = extract_feature_vectors(file_path, dictionary)
    y = read_vector_file(label_path)

    # first 560 rows: training (+ cross-validation); remaining rows: testing
    X_train, y_train = X[:560], y[:560]
    X_test, y_test = X[560:], y[560:]

    metric_list = ["accuracy", "f1-score", "auroc", "precision", "sensitivity", "specificity"]

    ### ========== TODO : START ========== ###
    # part 1b: create stratified folds (5-fold CV)
    kf = StratifiedKFold(n_splits=5)

    # part 1c: for each metric, select optimal hyperparameter for linear SVM using CV
    best_Cs = []
    for metric in metric_list:
      selected = select_param_linear(X_train, y_train, kf, metric)
      print(selected)
      best_Cs.append(selected)

    # part 2a: train linear SVMs with selected hyperparameters
    # (fit() returns the estimator itself, so it can be collected directly)
    clfs = [
        LinearSVC(loss='hinge', random_state=0, C=selected).fit(X_train, y_train)
        for selected in best_Cs
    ]

    # part 2b: test the performance of your classifiers.
    for metric, clf in zip(metric_list, clfs):
      print(f"Performance on test data for {metric}: {performance_test(clf, X_test, y_test, metric)}")
    ### ========== TODO : END ========== ###


if __name__ == "__main__" :
    main()

Linear SVM Hyperparameter Selection based on accuracy:
1.0
Linear SVM Hyperparameter Selection based on f1-score:
1.0
Linear SVM Hyperparameter Selection based on auroc:
1.0
Linear SVM Hyperparameter Selection based on precision:
10.0
Linear SVM Hyperparameter Selection based on sensitivity:
0.001
Linear SVM Hyperparameter Selection based on specificity:
1.0
Performance on test data for accuracy: 0.7428571428571429
Performance on test data for f1-score: 0.47058823529411764
Performance on test data for auroc: 0.7424684159378038
Performance on test data for precision: 0.6363636363636364
Performance on test data for sensitivity: 1.0
Performance on test data for specificity: 0.8979591836734694


# Problem 4: Boosting vs. Decision Tree

In [19]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.model_selection import cross_val_score, train_test_split

In [20]:
class Data :

    def __init__(self) :
        """
        Container for a feature matrix, a target vector and their names.

        Attributes
        --------------------
            X      -- numpy array of shape (n,d), features
            y      -- numpy array of shape (n,), targets
            Xnames -- feature names, or None
            yname  -- target name, or None
        """

        # n = number of examples, d = dimensionality
        self.X = None
        self.y = None

        self.Xnames = None
        self.yname = None

    def load(self, filename, header=0, predict_col=-1) :
        """
        Load a csv file into X (features) and y (labels).

        header is the number of leading rows skipped when parsing numbers;
        when it is non-zero, the first line of the file is also read as the
        comma-separated column names. predict_col selects the label column
        (None means every column is a feature and there are no labels).
        """

        # numeric payload
        with open(filename, 'r') as fid :
            table = np.loadtxt(fid, delimiter=",", skiprows=header)

        # split into features and labels
        if predict_col is None :
            self.X = table[:,:]
            self.y = None
        elif table.ndim > 1 :
            self.X = np.delete(table, predict_col, axis=1)
            self.y = table[:,predict_col]
        else :
            # one-dimensional data: treat it all as labels
            self.X = None
            self.y = table[:]

        # no header rows -> no names to record
        if header == 0 :
            self.Xnames = None
            self.yname = None
            return

        # column names come from the first line of the file
        with open(filename, 'r') as fid :
            names = fid.readline().rstrip().split(",")

        if predict_col is None :
            self.Xnames = names[:]
            self.yname = None
        elif len(names) > 1 :
            self.Xnames = np.delete(names, predict_col)
            self.yname = names[predict_col]
        else :
            self.Xnames = None
            self.yname = names[0]


# helper functions
def load_data(filename, header=0, predict_col=-1) :
    """Create a Data object, populate it from a csv file, and return it."""
    loaded = Data()
    loaded.load(filename, header=header, predict_col=predict_col)
    return loaded

In [21]:
# Change the path to your own data directory
### ========== TODO : START ========== ###
titanic = load_data(
    "/content/drive/MyDrive/HW3/HW3-code/data/titanic_train.csv",
    header=1,
    predict_col=0,
)
### ========== TODO : END ========== ###
# unpack features / targets and their names
X, Xnames = titanic.X, titanic.Xnames
y, yname = titanic.y, titanic.yname
n, d = X.shape  # n = number of examples, d =  number of features

In [29]:
def error(clf, X, y, ntrials=100, test_size=0.2) :
    """
    Estimates training and test error of a classifier by repeated random
    train/test splits. Trial i uses random_state=i for the split; clf is
    refit on each training part, and the misclassification rates
    (1 - accuracy) are averaged over all trials.

    Parameters
    --------------------
        clf         -- classifier
        X           -- numpy array of shape (n,d), features values
        y           -- numpy array of shape (n,), target classes
        ntrials     -- integer, number of trials
        test_size   -- proportion of data used for evaluation

    Returns
    --------------------
        train_error -- float, training error
        test_error  -- float, test error
    """

    per_trial_train = []
    per_trial_test = []

    for trial in range(ntrials):
        X_fit, X_held, y_fit, y_held = train_test_split(
            X, y, test_size=test_size, random_state=trial)
        clf.fit(X_fit, y_fit)

        # error on the data the classifier was just fit on
        fit_accuracy = metrics.accuracy_score(y_fit, clf.predict(X_fit), normalize=True)
        per_trial_train.append(1 - fit_accuracy)

        # error on the held-out part
        held_accuracy = metrics.accuracy_score(y_held, clf.predict(X_held), normalize=True)
        per_trial_test.append(1 - held_accuracy)

    return np.mean(per_trial_train), np.mean(per_trial_test)


In [30]:
### ========== TODO : START ========== ###
# Part 4(a): Implement the decision tree classifier and report the training error.
print('Classifying using Decision Tree...')

# Entropy-based decision tree, first fit on the whole dataset
clf = DecisionTreeClassifier(random_state=0, criterion='entropy')
clf.fit(X, y)

# error() refits clf on random splits and returns (training error, test error)
err = error(clf, X, y)

# Report both averaged errors
print(f"Training Error: {err[0]}", f"Test Error: {err[1]}", sep="\n")
### ========== TODO : END ========== ###

Classifying using Decision Tree...
Training Error: 0.011528998242530775
Test Error: 0.24104895104895108


In [31]:
### ========== TODO : START ========== ###
# Part 4(b): Implement the random forest classifier and adjust the number of samples used in bootstrap sampling.
print('Classifying using Random Forest...')
max_samples_values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]

# Best setting seen so far and the errors that came with it
best_max_samples, best_training_error = None, None
best_test_error = float('inf')

for max_samples in max_samples_values:
    # Random forest using this bootstrap sample fraction, fit on the whole dataset
    clf = RandomForestClassifier(max_samples=max_samples, criterion='entropy', random_state=0)
    clf.fit(X, y)

    # Averaged training / test error from the error function
    training_error, test_error = error(clf, X, y)

    # Only a strictly lower test error replaces the current best
    if not test_error < best_test_error:
        continue
    best_max_samples = max_samples
    best_test_error, best_training_error = test_error, training_error

# Report the winning setting and its errors
print(
    f"Best Max Samples: {best_max_samples}",
    f"Test Error: {best_test_error}",
    f"Training Error: {best_training_error}",
    sep="\n",
)
### ========== TODO : END ========== ###

Classifying using Random Forest...
Best Max Samples: 0.3
Test Error: 0.1874825174825175
Training Error: 0.09427065026362039


In [32]:
### ========== TODO : START ========== ###
# Part 4(c): Implement the random forest classifier and adjust the number of features for each decision tree.
print('Classifying using Random Forest...')
max_features_values = list(range(1, 8))

# Initialize variables to track the best setting and corresponding error
best_max_features = None
best_test_error = float('inf')
best_training_error = None

# Loop over the max_features values
for max_features in max_features_values:
    #print(f"Max Features: {max_features}")

    # Create a random forest classifier with the best max_samples value from Part b
    clf = RandomForestClassifier(criterion='entropy', random_state=0, max_samples=best_max_samples, max_features=max_features)

    # Train the classifier on the whole dataset
    clf.fit(X, y)

    # Compute the training and test errors using the error function
    training_error, test_error = error(clf, X, y)

    # Print the training and test errors
    #print(f"Training Error: {training_error}")
    #print(f"Test Error: {test_error}")

    # Update the best setting and error if necessary
    if test_error < best_test_error:
        best_max_features = max_features
        best_test_error = test_error
        best_training_error = training_error

# Print the best setting and corresponding error
print(f"Best Max Features: {best_max_features}")
print(f"Test Error: {best_test_error}")
print(f"Training Error: {best_training_error}")
### ========== TODO : END ========== ###

Classifying using Random Forest...
Best Max Features: 3
Test Error: 0.18678321678321677
Training Error: 0.09481546572934976


In [12]:
import numpy as np

# Ten labelled 2-D points, one (x1, x2, y) triple per row, unzipped into
# three parallel lists.
x1, x2, y = map(list, zip(
    (0, 5, -1),
    (1, 4, -1),
    (3, 7, 1),
    (-2, 1, 1),
    (-1, 13, -1),
    (10, 3, -1),
    (12, 7, 1),
    (-7, -1, -1),
    (-3, 12, 1),
    (5, 9, 1),
))

def error(x, y, j, weights):
  """
  Weighted error of the threshold classifier h(v) = sign(v - j).

  Works for any number of samples (the sample count is taken from x rather
  than being fixed at 10).

  NOTE(review): this redefines the earlier error(clf, X, y, ntrials, test_size)
  in this notebook, so cells that need that version must be run before this
  one (or that function re-defined afterwards).

  Parameters
  --------------------
      x       -- sequence of feature values, one per sample
      y       -- sequence of labels, one per sample
      j       -- threshold
      weights -- sequence of sample weights, one per sample

  Returns
  --------------------
      err     -- sum of the weights of the samples whose predicted label
                 differs from y (a sample exactly at the threshold gets
                 sign 0 and therefore always counts as an error)

  Raises
  --------------------
      ValueError -- if x, y and weights do not have the same length
  """
  if not (len(x) == len(y) == len(weights)):
    raise ValueError("x, y and weights must have the same length")

  err = 0
  for value, target, weight in zip(x, y, weights):
    if np.sign(value - j) != target:
      err += weight

  return err
  
# Uniform weights over the ten samples.
w1 = [0.1] * 10
# Re-weighted distribution: samples at index 3 and 4 carry 0.25 each, the rest 0.0625.
w2 = [0.0625] * 3 + [0.25] * 2 + [0.0625] * 5

def find_optimal(data, w):
  """
  Searches integer thresholds in [-100, 100) that do not coincide with any
  value in data and returns the one with the smallest weighted error, as
  computed by error() against the module-level labels y.

  Parameters
  --------------------
      data -- sequence of feature values
      w    -- sequence of sample weights

  Returns
  --------------------
      (min_error, best_i) -- lowest weighted error and the threshold that
                             achieves it (the first one on ties)
  """
  lowest_err, best_threshold = float("inf"), None

  # thresholds equal to a data value are skipped
  thresholds = (t for t in range(-100, 100) if t not in data)
  for threshold in thresholds:
    threshold_err = error(data, y, threshold, w)
    if not threshold_err < lowest_err:
      continue
    lowest_err, best_threshold = threshold_err, threshold

  return lowest_err, best_threshold

# Best threshold on each feature under the uniform weights w1 ...
j1 = find_optimal(x1, w1)[1]
j2 = find_optimal(x2, w1)[1]
# ... and under the re-weighted distribution w2 (primed values).
j1_p = find_optimal(x1, w2)[1]
j2_p = find_optimal(x2, w2)[1]

print(f"Optimal j1: {j1}")
print(f"Optimal j2: {j2}")
print(f"Optimal j1': {j1_p}")
# Label fixed: this line reports the primed threshold j2', not j2.
print(f"Optimal j2': {j2_p}")


Optimal j1: 2
Optimal j2: 6
Optimal j1': 2
Optimal j2: 0
