

```

```

# <b>W261 Team Project</b>
- Youzhi Chloe Wu, Curtis Lin, Eddie Zhu, Kai Qi Lim

# Install packages and import models

In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 13kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 50.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=c6eac382077e54f046637f112d748911967d85014e08e202f83d8f8ecaccbf22
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


## install packages

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark-dist-explore
!pip install -U -q PyDrive

Collecting pyspark-dist-explore
  Downloading https://files.pythonhosted.org/packages/3c/33/2b6c29265413f2b56516caf02b8befbb6a79a1a3516d57bf1b0742a1be40/pyspark_dist_explore-0.1.8-py3-none-any.whl
Installing collected packages: pyspark-dist-explore
Successfully installed pyspark-dist-explore-0.1.8


## import packages

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
from random import sample
import seaborn as sns
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from pyspark.sql.functions import isnan, when, count, col
from pyspark_dist_explore import Histogram, hist, distplot, pandas_histogram
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "hw5_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

In [7]:
spark

In [0]:
import pyspark
# package for loading file in Apache Parquet Format
import pyarrow.parquet as pq

In [0]:
sc = spark.sparkContext
# using SQLContext to read parquet file
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## map google drive for data import

In [10]:
# Authenticate and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.



## Import & Export Data

mini_toy has five fields:
* target variable
* numeric variable
* numeric variable
* categorical
* categorical

In [11]:
%%writefile mini_toy.txt
1 0.9 4 blue this
0 0.7 3 red that
0 0.4 1 red this
1 1.2 5 red that
1 1.0 3 blue this

Writing mini_toy.txt


In [63]:
%%writefile mini_dev.txt
1 0.6 2 red that
0 0.8 4 red this
0 1.7 3 blue this

Overwriting mini_dev.txt


In [0]:
# load data 
data = sc.textFile("mini_toy.txt")  
dev_data = sc.textFile("mini_dev.txt")  

In [13]:
# helper function to (1) split fields and target
# log-transform the numeric fields
num_col = [1,2]
str_col = [3,4]

def parse(line):
    """
    Map str row --> (tuple,of,fields) and log transform numeric fields
    """
    fields = np.array(line.split(" "))
    target = fields[0]
    # initialise all_features with specified numerical fields converted to float
    all_features = [np.log(np.float(x)) for x in fields[num_col]]
    # add on categorical fields to maintain the same structure as original data
    for x in str_col:
        all_features.append(fields[x])
    
    return (all_features, target)

data.map(parse).collect()

[([-0.10536051565782628, 1.3862943611198906, 'blue', 'this'], '1'),
 ([-0.35667494393873245, 1.0986122886681098, 'red', 'that'], '0'),
 ([-0.916290731874155, 0.0, 'red', 'this'], '0'),
 ([0.1823215567939546, 1.6094379124341003, 'red', 'that'], '1'),
 ([0.0, 1.0986122886681098, 'blue', 'this'], '1')]

In [65]:
dev_data.map(parse).collect()

[([-0.5108256237659907, 0.6931471805599453, 'red', 'that'], '1'),
 ([-0.2231435513142097, 1.3862943611198906, 'red', 'this'], '0'),
 ([0.5306282510621704, 1.0986122886681098, 'blue', 'this'], '0')]

In [14]:
# !!! TO-DO: function to collect all unique categories, output length into a dict for specified columns
# assume dict is present with column index as keys, and categories as values for now
cat_dict = {}
cat_dict['2']=('red','blue')
cat_dict['3']=('this','that')
cat_dict

{'2': ('red', 'blue'), '3': ('this', 'that')}

In [15]:
# helper function for one-hot encoding
def onehot(line):
  """
  one-hot encode the categorical fields 
  """
  all_features, target = line
  # retrieve categories
  for col in str_col:
    cat = cat_dict[str(col-1)]
    
    for i in range(len(cat)):
        if all_features[col-1] == cat[i]:
          enc = 1;
        else:
          enc = 0;

        all_features.append(enc)
  # remove str columns
  del all_features[2:4]

  return (np.array(all_features), int(target))

data.map(parse) \
    .map(onehot) \
    .collect()

[(array([-0.10536052,  1.38629436,  0.        ,  1.        ,  1.        ,
          0.        ]), 1),
 (array([-0.35667494,  1.09861229,  1.        ,  0.        ,  0.        ,
          1.        ]), 0),
 (array([-0.91629073,  0.        ,  1.        ,  0.        ,  1.        ,
          0.        ]), 0),
 (array([0.18232156, 1.60943791, 1.        , 0.        , 0.        ,
         1.        ]), 1),
 (array([0.        , 1.09861229, 0.        , 1.        , 1.        ,
         0.        ]), 1)]

In [66]:
dev_data.map(parse) \
    .map(onehot) \
    .collect()

[(array([-0.51082562,  0.69314718,  1.        ,  0.        ,  0.        ,
          1.        ]), 1),
 (array([-0.22314355,  1.38629436,  1.        ,  0.        ,  1.        ,
          0.        ]), 0),
 (array([0.53062825, 1.09861229, 0.        , 1.        , 1.        ,
         0.        ]), 0)]

In [0]:
# log-transform and one-hot encode minitoy
miniRDDCached = data.map(parse).map(onehot).cache()
miniDevRDD = dev_data.map(parse).map(onehot).cache()

# 4. Algorithm Implementation
develop a ‘homegrown’ implementation of the algorithm, apply it to the training dataset and evaluate your results on the test set.

In [0]:
def normalize(dataRDD):
    """
    Helper function:
    Scale and center data around the mean of each feature.
    dataRDD - each record is a tuple of (features_array, y)
    """
    featureMeans = dataRDD.map(lambda x: x[0]).mean()
    featureStdev = np.sqrt(dataRDD.map(lambda x: x[0]).variance())
    normedRDD = dataRDD.map(lambda x: ((x[0] - featureMeans)/featureStdev, x[1]))
    
    # return featureMeans
    return normedRDD

In [0]:
def AugmentRDD(dataRDD, W):
    """
    Helper function: 
    Augment the dataRDD by adding bias feature of 1 at index 0
    Plug in weights (model, W) to parameter function
    Args:
        dataRDD - each record is a tuple of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    """
    # add a bias 'feature' of 1 at index 0
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))\
                           .map(lambda x: ((1 / (1 + np.exp(-(W.dot(x[0])))), x[0]), x[1]))\
                           .cache()
    
    return augmentedData

In [0]:
def LRLoss(augmentedData, W):
    """
    Helper function:
    Compute loss for logistic regression.
    Args:
        augmentedData - each record is a tuple of ((parameter function values, features_array with bias), y) 
        W             - (array) model coefficients with bias at index 0
    """
    # calculate loss based on formula
    loss = augmentedData.map(lambda x: -(x[1]*x[0][0]+(1-x[1])*x[0][0]))\
                        .mean()

    return loss

In [0]:
def GDUpdate(augmentedData, W, learningRate = 0.1):
    """
    Helper function: 
    Perform one Logistic Regression gradient descent step/update.
    Args:
        augmentedData - records are tuples of ((parameter function values, features_array with bias), y) 
        W       - (array) model coefficients with bias at index 0
    Returns:
        new_model - (array) updated coefficients, bias at index 0
    """
    # calculate gradient
    grad = augmentedData.map(lambda x: (x[0][0] - x[1])*x[0][1])\
                        .sum()
    
    # update model weights by gradient
    new_model = W - learningRate * grad
    
    return new_model

In [0]:
normedRDD = normalize(miniRDDCached).cache()
normedDev = normalize(miniDevRDD).cache()

In [52]:
# mean and variance of the target variable 
meanTarget = miniRDDCached.map(lambda x: x[1]).mean()
varTarget = miniRDDCached.map(lambda x: x[1]).variance()
print(f"Mean: {meanTarget}")
print(f"Variance: {varTarget}")

Mean: 0.6
Variance: 0.24000000000000005


In [0]:
# Structure: meanTarget, features of one-hot encoded array set to 0
BASELINE = np.array([meanTarget, 0, 0, 0, 0, 0, 0])

In [0]:
# augmentedMiniRDD = AugmentRDD(normedRDD, BASELINE)

In [0]:
# augmentedMiniRDD.collect()

In [0]:
# %%time
# # a few GD steps w/ normalized data  (RUN THIS CELL AS IS)
# nSteps = 5
# model = BASELINE
# print(f"BASELINE:  Loss = {LRLoss(miniRDDCached, model)}")
# for idx in range(nSteps):
#     print("----------")
#     print(f"STEP: {idx+1}")
#     model = GDUpdate(augmentedMiniRDD, model)
#     loss = LRLoss(augmentedMiniRDD, model) 
#     print(f"Loss: {loss}")
#     print(f"Model: {[round(w,3) for w in model]}")

In [0]:
def LRGDFit(normed_trainRDD, wInit, nSteps=20, learningRate=0.1, verbose=False):
    """
    Perform nSteps iterations of Logistic Regression gradient descent 
    Track loss on train set
    Return current model, its corresponding train loss. 
    """
    # initialize lists to track model performance
    train_history, model_history = [], []
    
    # perform n updates & compute test and train loss after each
    model = wInit
    
    for idx in range(nSteps): 
        
        augmentedData = AugmentRDD(normed_trainRDD, model)
        model = GDUpdate(augmentedData, model, learningRate)
        training_loss = LRLoss(augmentedData, model) 
        
        # keep track of train loss and models
        train_history.append(training_loss)
        model_history.append(model)
        
        # console output if desired
        if verbose:
            print("----------")
            print(f"STEP: {idx+1}")
            print(f"training loss: {training_loss}")
            print(f"Model: {[round(w,3) for w in model]}")
    return model, training_loss

In [0]:
def LRGDPredict(normed_dataRDD, W):
    """
    Perform prediction based on Logistic Regression weights.
    Args:
        dataRDD - RDD with test records that are tuples of (features_array, y)
        W       - (array) final model coefficients with bias at index 0
    Returns:
        preds - (array) predicted labels for each test record ((predicted labels, features_array), true label)
    """
    # run AugmentRDD function on dataRDD
    augmentedData = AugmentRDD(normed_dataRDD, W)
    
    # calculate parameterized function based on final weights (W) and then make predictions
    preds = augmentedData.map(lambda x: ((1 if x[0][0]>0.5 else 0), x[1]))\
                         .cache()
    
    # calculate loss based on final weights (W)
    test_loss = LRLoss(augmentedData, W)
    
    return preds, test_loss

In [69]:
# Experiment with different nSteps = 5, 10, 15
model, train_loss = LRGDFit(normedRDD, BASELINE, 10, 0.1, verbose=True)
# The loss stays fixed at around iteration 7 or 8.  

----------
STEP: 1
training loss: -0.6456563062257954
Model: [0.577, 0.209, 0.177, -0.163, 0.163, 0.041, -0.041]
----------
STEP: 2
training loss: -0.6305243765353823
Model: [0.562, 0.361, 0.302, -0.269, 0.269, 0.068, -0.068]
----------
STEP: 3
training loss: -0.6147221967091425
Model: [0.555, 0.478, 0.395, -0.343, 0.343, 0.088, -0.088]
----------
STEP: 4
training loss: -0.6048158663837551
Model: [0.552, 0.574, 0.47, -0.397, 0.397, 0.105, -0.105]
----------
STEP: 5
training loss: -0.599001397122365
Model: [0.553, 0.656, 0.532, -0.439, 0.439, 0.118, -0.118]
----------
STEP: 6
training loss: -0.5956368004396646
Model: [0.555, 0.728, 0.585, -0.474, 0.474, 0.13, -0.13]
----------
STEP: 7
training loss: -0.5937423025200221
Model: [0.558, 0.793, 0.633, -0.502, 0.502, 0.14, -0.14]
----------
STEP: 8
training loss: -0.5927449529387884
Model: [0.562, 0.853, 0.676, -0.526, 0.526, 0.149, -0.149]
----------
STEP: 9
training loss: -0.5923015305145359
Model: [0.565, 0.908, 0.716, -0.547, 0.547, 0.15

In [0]:
preds, test_loss = LRGDPredict(normedDev, model)

In [75]:
preds.collect()

[(0, 1), (1, 0), (1, 0)]