# HW 5 - supplemental notebook
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Fall 2018`__

This notebook contains supplemental materials to help you run your HW5 solution to question 8 on Google Cloud Platform (GCP). __Important Note:__ _The graders will not read this notebook. If you use it, be sure to copy any solutions back into the main homework notebook to receive credit for your results._

### Instructions
1. Create your GCP account & apply for credit through the w261 education grant. (see [create_account.md](https://github.com/UCB-w261/Instructors/blob/master/GCP/create_account.md))
2. Set up your project, bucket, service account, access key and virtual environment. (steps 1-15 in [setup.md](https://github.com/UCB-w261/Instructors/blob/master/GCP/setup.md))
3. (OPTIONAL) Review the GCP documentation to become more familiar with the setup steps you've just performed: [key terms & concepts described here](https://cloud.google.com/storage/docs/concepts) 
4. Run this notebook on your local machine using the virtual environment you created in step 2. (_Note that you may have to install Jupyter in that environment in order to do so._)
5. Follow the example below to learn how to write a Spark script & submit it to run on a cluster, all from your Jupyter Notebook.
6. Turn your HW5 Q8 solution into a script that follows the example provided.
7. Copy all the code that writes this script & runs the job back into the HW notebook, where it will be graded.
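
Before moving on from step 4, it can help to confirm that the virtual environment actually contains the packages the submission script imports. The snippet below is only a sketch: `missing_packages` is a hypothetical helper, not part of the course materials, and the module list simply mirrors the third-party imports in `submit_job_to_cluster.py`.

```python
import importlib


def missing_packages(module_names):
    """Return the subset of module_names that cannot be imported."""
    missing = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError:
            missing.append(name)
    return missing


# Third-party modules imported by submit_job_to_cluster.py
required = ["google.cloud.storage", "googleapiclient.discovery"]
print("Missing packages:", missing_packages(required) or "none")
```

If anything is reported as missing, install it into the same virtual environment before running the cells below.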

### Notebook Setup

In [1]:
# global variables
PROJECT_ID = 'w262-245821' # fill in your GCP project id
BUCKET_NAME = 'w261_sj_data' # fill in the name of your GCP bucket
CLUSTER_NAME = 'w261-sj' # choose a cluster name: lowercase letters, digits & hyphens only, starting with a letter
HOME = '/Users/sid'
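
A malformed cluster name only surfaces as an API error once the job is submitted, so a quick local check can save a round trip. The sketch below is an assumption, not the authoritative Dataproc rule: the pattern allows lowercase letters, digits, and hyphens (the example name `w261-sj` contains a hyphen), requires a leading letter, and rejects a trailing hyphen. `is_valid_cluster_name` is a hypothetical helper.

```python
import re

# Assumed rule: starts with a lowercase letter, continues with lowercase
# letters, digits or hyphens, and does not end with a hyphen.
CLUSTER_NAME_PATTERN = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")


def is_valid_cluster_name(name):
    """Return True if name matches the assumed naming rule."""
    return CLUSTER_NAME_PATTERN.fullmatch(name) is not None


for candidate in ["w261-sj", "W261_SJ", "261-cluster"]:
    print(candidate, is_valid_cluster_name(candidate))
```

Check the Dataproc documentation for the exact constraints, including any length limit, before relying on this pattern.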

### Submission Script
The cell below will create a Python script in the current working directory called `submit_job_to_cluster.py`. This script will help you run your own Spark jobs on the cluster. You can read more about it in the [w261-environment](https://github.com/UCB-w261/w261-environment/tree/master/gcp-files/dataproc) repo.

In [2]:
%%writefile submit_job_to_cluster.py
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample command-line program that creates a Google Dataproc cluster,
submits a PySpark job to it and tears the cluster down afterwards"""

import argparse
import os

from google.cloud import storage
import googleapiclient.discovery

DEFAULT_FILENAME = 'pyspark_sort.py'


def get_default_pyspark_file():
    """Gets the PySpark file from this directory"""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    f = open(os.path.join(current_dir, DEFAULT_FILENAME), 'rb')
    return f, DEFAULT_FILENAME


def get_pyspark_file(filename):
    f = open(filename, 'rb')
    return f, os.path.basename(filename)


def upload_pyspark_file(project_id, bucket_name, filename, file):
    """Uploads the PySpark file in this directory to the configured
    input bucket."""
    print('Uploading pyspark file to GCS')
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(filename)
    blob.upload_from_file(file)


def download_output(project_id, cluster_id, output_bucket, job_id):
    """Downloads the output file from Cloud Storage and returns it as a
    string."""
    print('Downloading output file')
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(output_bucket)
    output_blob = (
        'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
        .format(cluster_id, job_id))
    return bucket.blob(output_blob).download_as_string()


# [START create_cluster]
def create_cluster(dataproc, project, zone, region, cluster_name,
                   instance_type, master_nodes, worker_nodes):
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'projectId': project,
        'clusterName': cluster_name,
        'config': {
            'gceClusterConfig': {
                'zoneUri': zone_uri,
                "metadata": {
                    "CONDA_PACKAGES": "\"numpy pandas\"",
                    "MINICONDA_VARIANT": "2"
                }
            },
            "softwareConfig": {
                'properties': {
                    'spark:spark.jars.packages': 'com.databricks:spark-xml_2.11:0.4.1,graphframes:graphframes:0.5.0-spark2.1-s_2.11,com.databricks:spark-avro_2.11:4.0.0'
                }
            },
            'masterConfig': {
                'numInstances': master_nodes,
                'machineTypeUri': instance_type
            },
            'workerConfig': {
                'numInstances': worker_nodes,
                'machineTypeUri': instance_type
            },
            'secondaryWorkerConfig': {
                'numInstances': "2",
                'machineTypeUri': instance_type,
                "isPreemptible": "True"
            },
            "initializationActions": [
                {
                    "executableFile": "gs://dataproc-initialization-actions/conda/bootstrap-conda.sh"
                },
                {
                    "executableFile": "gs://dataproc-initialization-actions/conda/install-conda-env.sh"
                }
            ]
        }
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
        region=region,
        body=cluster_data).execute()
    return result
# [END create_cluster]


def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
    print('Waiting for cluster creation...')

    while True:
        result = dataproc.projects().regions().clusters().list(
            projectId=project_id,
            region=region).execute()
        cluster_list = result['clusters']
        cluster = [c
                   for c in cluster_list
                   if c['clusterName'] == cluster_name][0]
        if cluster['status']['state'] == 'ERROR':
            # `result` is the list response; the error belongs to `cluster`
            raise Exception(
                cluster['status'].get('detail', 'cluster creation failed'))
        if cluster['status']['state'] == 'RUNNING':
            print("Cluster created.")
            break


# [START list_clusters_with_detail]
def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    cluster_list = result['clusters']
    for cluster in cluster_list:
        print("{} - {}"
              .format(cluster['clusterName'], cluster['status']['state']))
    return result
# [END list_clusters_with_detail]


def get_cluster_id_by_name(cluster_list, cluster_name):
    """Helper function to retrieve the ID and output bucket of a cluster by
    name."""
    cluster = [c for c in cluster_list if c['clusterName'] == cluster_name][0]
    return cluster['clusterUuid'], cluster['config']['configBucket']


# [START submit_pyspark_job]
def submit_pyspark_job(dataproc, project, region,
                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
    already been uploaded to `bucket_name`"""
    job_details = {
        'projectId': project,
        'job': {
            'placement': {
                'clusterName': cluster_name
            },
            'pysparkJob': {
                'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, filename)
            }
        }
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
        region=region,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
    return job_id
# [END submit_pyspark_job]


# [START delete]
def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=region,
        clusterName=cluster).execute()
    return result
# [END delete]


# [START wait]
def wait_for_job(dataproc, project, region, job_id):
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=region,
            jobId=job_id).execute()
        # Handle exceptions
        if result['status']['state'] == 'ERROR':
            raise Exception(result['status']['details'])
        elif result['status']['state'] == 'DONE':
            print('Job finished.')
            return result
# [END wait]


# [START get_client]
def get_client():
    """Builds an http client authenticated with the service account
    credentials."""
    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
    return dataproc
# [END get_client]


def main(project_id, zone, cluster_name, bucket_name,
         instance_type, master_nodes, worker_nodes,
         pyspark_file=None, create_new_cluster=True):
    dataproc = get_client()
    region = 'global'
    try:
        if pyspark_file:
            spark_file, spark_filename = get_pyspark_file(pyspark_file)
        else:
            spark_file, spark_filename = get_default_pyspark_file()

        if create_new_cluster:
            create_cluster(
                dataproc, project_id, zone, region, cluster_name,
                instance_type, master_nodes, worker_nodes)
            wait_for_cluster_creation(
                dataproc, project_id, region, cluster_name)

        upload_pyspark_file(
            project_id, bucket_name, spark_filename, spark_file)

        cluster_list = list_clusters_with_details(
            dataproc, project_id, region)['clusters']

        (cluster_id, output_bucket) = (
            get_cluster_id_by_name(cluster_list, cluster_name))

        # [START call_submit_pyspark_job]
        job_id = submit_pyspark_job(
            dataproc, project_id, region,
            cluster_name, bucket_name, spark_filename)
        # [END call_submit_pyspark_job]
        wait_for_job(dataproc, project_id, region, job_id)

        output = download_output(project_id, cluster_id, output_bucket, job_id)
        print('Received job output {}'.format(output))
        return output
    finally:
        if create_new_cluster:
            delete_cluster(dataproc, project_id, region, cluster_name)
        spark_file.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
            '--project_id',
            help='Project ID you want to access.',
            required=True
        )
    parser.add_argument(
            '--zone',
            help='Zone to create clusters in/connect to.',
            required=True
        )
    parser.add_argument(
            '--cluster_name',
            help='Name of the cluster to create/connect to',
            required=True
        )
    parser.add_argument(
            '--gcs_bucket',
            help='Bucket to upload Pyspark file to',
            required=True
        )
    parser.add_argument(
            '--pyspark_file',
            help='Pyspark filename. Defaults to pyspark_sort.py'
        )
    parser.add_argument(
            '--create_new_cluster',
            action='store_true',
            help='States if the cluster should be created'
        )
    parser.add_argument(
            '--key_file',
            help='Location of your key file for service account'
        )
    parser.add_argument(
            '--instance_type',
            help='Instance types used for this cluster',
            default='n1-standard-8'
        )
    parser.add_argument(
            '--master_nodes',
            help='Number of master nodes',
            type=int,
            default=1
        )
    parser.add_argument(
            '--worker_nodes',
            help='Number of worker nodes',
            type=int,
            default=2
        )

    args = parser.parse_args()

    if args.key_file is not None:
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.key_file

    main(
        args.project_id, args.zone, args.cluster_name,
        args.gcs_bucket, args.instance_type, args.master_nodes, args.worker_nodes,
        args.pyspark_file, args.create_new_cluster)

Writing submit_job_to_cluster.py


In [3]:
!chmod a+x submit_job_to_cluster.py
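
`wait_for_cluster_creation` and `wait_for_job` in the script call the API in a tight `while True` loop. One possible refinement, shown here only as a sketch, is to pause between polls and give up after a timeout. `poll_until`, its parameters, and the stand-in `fake_states` iterator are all hypothetical; none of them is part of the script or of any Google library.

```python
import time


def poll_until(fetch_state, done_state, error_state,
               interval_s=5.0, timeout_s=600.0):
    """Call fetch_state() until it returns done_state.

    Raises RuntimeError on error_state and TimeoutError after timeout_s.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = fetch_state()
        if state == error_state:
            raise RuntimeError("resource entered state {}".format(state))
        if state == done_state:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError("gave up after {} seconds".format(timeout_s))
        time.sleep(interval_s)


# Stand-in for a real API call: yields a fixed sequence of states.
fake_states = iter(["PENDING", "RUNNING", "DONE"])
final_state = poll_until(lambda: next(fake_states), "DONE", "ERROR",
                         interval_s=0.01, timeout_s=5.0)
print(final_state)
```

In the script, `fetch_state` could wrap the existing `jobs().get(...).execute()` call and return `result['status']['state']`.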

### Example: how to create and run a Spark job on a cluster using GCP
Run the cell below to create a file called `FM.py` in the current directory. Then run the bash cell to spin up a cluster & submit this job to GCP. (__`Note:`__ _make sure you have all the global variables set up first, including the name of the Spark job file if you change it._)
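
The submission cell further down hard-codes the project, cluster, and bucket names. As a sketch, the same command could be assembled from the values defined in Notebook Setup so that there is a single place to edit. `build_submit_command` is a hypothetical helper; the flags are the ones defined by the argument parser in `submit_job_to_cluster.py`, and the values are those used in this notebook.

```python
import shlex


def build_submit_command(project_id, zone, cluster_name, bucket_name,
                         key_file, pyspark_file, create_new_cluster=True):
    """Return the argv list for submit_job_to_cluster.py."""
    cmd = [
        "python3", "submit_job_to_cluster.py",
        "--project_id={}".format(project_id),
        "--zone={}".format(zone),
        "--cluster_name={}".format(cluster_name),
        "--gcs_bucket={}".format(bucket_name),
        "--key_file={}".format(key_file),
        "--pyspark_file={}".format(pyspark_file),
    ]
    if create_new_cluster:
        cmd.append("--create_new_cluster")
    return cmd


cmd = build_submit_command(
    project_id="w262-245821", zone="us-central1-b", cluster_name="w261-sj",
    bucket_name="w261_sj_data", key_file="/Users/sid/w261.json",
    pyspark_file="FM.py")
# Print a shell-safe version of the command for inspection.
print(" ".join(shlex.quote(part) for part in cmd))
```

Passing `cmd` to `subprocess.run(cmd, check=True)` would launch the job from Python instead of a `!` shell cell.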

In [12]:
%%writefile FM.py
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample pyspark script to be uploaded to Cloud Storage and run on
Cloud Dataproc.

Note this file is not intended to be run directly, but run inside a PySpark
environment.
"""

# [START pyspark]
# imports
import re
import numpy as np
import pandas as pd
from os import path
import time


from pyspark.sql import SparkSession

# create Spark Session
app_name = "final_project"
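# NOTE: a master set in code takes precedence over the one supplied at
# submission time, so "local[*]" runs the job in local mode on the driver.
master = "local[*]"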
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

df_k = spark.read.parquet("train_features_dense")
testRDD = df_k.rdd.map(lambda x: (x[0], tuple(x[1])))

def fmLoss(dataRDD, w, w1,w0) :
    """
    Computes the log loss of the FM model (w, w1, w0) on the data.
    dataRDD - records are tuples of (label, features_array)
    """
    w_bc = sc.broadcast(w)
    w1_bc = sc.broadcast(w1)
    w0_bc = sc.broadcast(w0)
    def probability_value(x,W,W1,W0): 
        xa = np.array([x])
        V =  xa.dot(W)
        V_square = (xa*xa).dot(W*W)
        phi = 0.5*(V*V - V_square).sum() + xa.dot(W1.T) + W0
        return 1.0/(1.0 + np.exp(-phi))
    
    # The log loss below assumes labels in {0, 1}, so the raw labels are used
    # as-is (the {-1, +1} recoding is only needed by the gradient functions).
    loss = dataRDD.map(lambda x:  (probability_value(x[1],w_bc.value, w1_bc.value, w0_bc.value), x[0])) \
        .map(lambda x: (1 - 1e-12, x[1]) if x[0] == 1 else ((1e-12, x[1]) if x[0] == 0  else (x[0],x[1]))) \
        .map(lambda x: -(x[1] * np.log(x[0]) + (1-x[1])*np.log(1-x[0]))).mean()
    
    
    return float(loss)

def fmGradUpdate_v1(dataRDD, w, w1, w0, alpha, regParam, regParam1, regParam0):
    """
    Computes the gradient and updates the model
    """
    
    w_bc = sc.broadcast(w)
    w1_bc = sc.broadcast(w1)
    w0_bc = sc.broadcast(w0)
    rp_bc = sc.broadcast(regParam)
    rp1_bc = sc.broadcast(regParam1)
    rp0_bc = sc.broadcast(regParam0)
    
    #Gradient for interaction term
    
    def row_grad(x, y, W, W1, W0, regParam, regParam1, regParam0):
        xa = np.array([x])
        VX =  xa.dot(W)
        VX_square = (xa*xa).dot(W*W)
        phi = 0.5*(VX*VX - VX_square).sum() + xa.dot(W1.T) + W0
        expnyt = np.exp(y*phi) 
        grad_loss = (-y/(1+expnyt))*(xa.T.dot(xa).dot(W) - np.diag(np.square(x)).dot(W))
        return 2*regParam*W + grad_loss
    
    #Gradient for Linear term
    def row_grad1(x, y, W, W1, W0, regParam, regParam1, regParam0):
        xa = np.array([x])
        VX =  xa.dot(W)
        VX_square = (xa*xa).dot(W*W)
        phi = 0.5*(VX*VX - VX_square).sum() + xa.dot(W1.T) + W0
        expnyt = np.exp(y*phi)
        grad_loss1 = (-y/(1+expnyt))*xa
        return 2*regParam1*W1 + grad_loss1
    
    #Gradient for bias term
    def row_grad0(x, y, W, W1, W0, regParam, regParam1, regParam0):
        xa = np.array([x])
        VX =  xa.dot(W)
        VX_square = (xa*xa).dot(W*W)
        phi = 0.5*(VX*VX - VX_square).sum() + xa.dot(W1.T) + W0
        expnyt = np.exp(y*phi)
        grad_loss0 = (-y/(1+expnyt))*1
        return 2*regParam0*W0 +grad_loss0
    
   
    
    batchRDD = dataRDD.sample(False, 0.00001, 2019)  
    grad = batchRDD.map(lambda x: (x[0],x[1]) if x[0] == 1 else (-1, x[1])).map(lambda x: (1, row_grad(x[1], x[0], w_bc.value, w1_bc.value, w0_bc.value, rp_bc.value,rp1_bc.value,rp0_bc.value))).reduceByKey(lambda x,y: np.add(x,y))
    model = w - alpha * grad.values().collect()[0] 
    
    grad1 = batchRDD.map(lambda x: (x[0],x[1]) if x[0] == 1 else (-1, x[1])).map(lambda x: (1, row_grad1(x[1], x[0], w_bc.value, w1_bc.value, w0_bc.value, rp_bc.value,rp1_bc.value,rp0_bc.value))).reduceByKey(lambda x,y: np.add(x,y))
    model1 = w1 - alpha * grad1.values().collect()[0]
    
    grad0 = batchRDD.map(lambda x: (x[0],x[1]) if x[0] == 1 else (-1, x[1])).map(lambda x: (1, row_grad0(x[1], x[0], w_bc.value, w1_bc.value, w0_bc.value, rp_bc.value,rp1_bc.value,rp0_bc.value))).reduceByKey(lambda x,y: np.add(x,y))
    model0 = w0 - alpha * grad0.values().collect()[0]
    
    return model, model1 ,model0
    
def GradientDescent(trainRDD, testRDD, model, model1, model0, nSteps = 20, 
                    learningRate = 0.01, regParam = 0.01,regParam1 = 0.01,regParam0 = 0.01, verbose = False):
    """
    Perform nSteps iterations of gradient descent on the FM model and 
    track loss on a test and train set every 10 steps. Return lists of
    test/train loss and the models themselves.
    """
    # initialize lists to track model performance
    train_history, test_history, model_history, model1_history, model0_history = [], [], [], [], []
    
    # perform n updates & compute test and train loss every 10 steps,
    # starting from the model, model1 & model0 passed in by the caller
    for idx in range(nSteps): 
        
        ############## YOUR CODE HERE #############
        
        model, model1, model0 = fmGradUpdate_v1(trainRDD, model, model1, model0, learningRate, regParam, regParam1, regParam0)
        if idx %10 == 0:
            training_loss = fmLoss(trainRDD, model, model1, model0) 
            test_loss = fmLoss(testRDD, model, model1, model0) 
        ############## (END) YOUR CODE #############
        # keep track of test/train loss for plotting
        if idx % 10 == 0:
            train_history.append(training_loss)
            test_history.append(test_loss)
        model_history.append(model)
        model1_history.append(model1)
        model0_history.append(model0)
        
        # console output if desired
        if idx % 10 == 0:
            if verbose:
                print("----------")
                print(f"STEP: {idx+1}")
                print(f"training loss: {training_loss}")
                print(f"test loss: {test_loss}")
            #print(f"Model: {[k for k in model]}")
   
    return train_history, test_history, model_history, model1_history, model0_history

def wInitialization(dataRDD, factor):
    nrFeat = len(dataRDD.first()[1])
    np.random.seed(int(time.time())) 
    w =  np.random.ranf((nrFeat, factor))
    w = w / np.sqrt((w*w).sum())
    
    w1 =  np.random.ranf(nrFeat)
    w1 = w1 / np.sqrt((w1*w1).sum())
    
    w0 =  np.random.ranf(1)
    
    return w, w1, w0

train, validation, test = testRDD.randomSplit([0.6, 0.2, 0.2])

start = time.time()
wInit, wInit1, wInit0 = wInitialization(train, 2)
logerr_train, logerr_test, models, model1s, model0s = GradientDescent(train, validation, wInit, wInit1, wInit0, nSteps = 80,
                                                    learningRate = 0.0003, regParam = 0.01, regParam1 = 0.01, regParam0 = 0.01, verbose = True)

print(f"\n... trained {len(models)} iterations in {time.time() - start} seconds")

def fmMakePrediction(dataRDD, w, w1, w0, threshold):
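    """
    Predict labels with the FM model and count confusion-matrix entries.
    Args:
        dataRDD   - records are tuples of (y, features_array)
        w, w1, w0 - interaction factors, linear weights & bias
        threshold - probability above which a record is predicted as 1
    Returns:
        pred - (rdd) tuples of (predicted label, true label)
        ntp, ntn, nfp, nfn - counts of true/false positives/negatives
    """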
    w_bc = sc.broadcast(w)
    w1_bc = sc.broadcast(w1)
    w0_bc = sc.broadcast(w0)
    def predict_fm(x, W, W1, W0):
        xa = np.array([x])
        VX =  xa.dot(W)
        VX_square = (xa*xa).dot(W*W)
        phi = 0.5*(VX*VX - VX_square).sum() + xa.dot(W1.T) + W0
        return 1.0/(1.0 + np.exp(-phi))
    
    # compute prediction
    pred = dataRDD.map(lambda x: (int(predict_fm(x[1],w_bc.value, w1_bc.value, w0_bc.value)>threshold), x[0] ))
    ntp = pred.map(lambda x: int((x[0]*x[1]) == 1)).sum()
    ntn = pred.map(lambda x: int((x[0]+x[1]) == 0)).sum()
    nfp = pred.map(lambda x: int((x[0] == 1) * (x[1] == 0))).sum()
    nfn = pred.map(lambda x: int((x[0] == 0) * (x[1] == 1))).sum()
   
    return pred, ntp, ntn, nfp, nfn

pred, ntp, ntn, nfp, nfn = fmMakePrediction(validation, models[-1], model1s[-1], model0s[-1],0.65)

acc = (ntp+ntn)/(ntp+ntn+nfp+nfn)
prec = (ntp)/(ntp+nfp)
rec = (ntp)/(ntp+nfn)
f1 = 2*prec*rec/(prec+rec)
fpr = nfp/(ntn+nfp)
tpr = ntp/(ntp+nfn)
print('Accuracy is: ', acc)
print('Precision is: ', prec)
print('Recall is: ', rec)
print('F1 score is: ', f1)
print('False positive rate is: ', fpr)
print('True positive rate is: ', tpr)

sc.parallelize(models).saveAsTextFile("gs://w261_sj_data/data/models")
sc.parallelize(model1s).saveAsTextFile("gs://w261_sj_data/data/model1s")
sc.parallelize(model0s).saveAsTextFile("gs://w261_sj_data/data/model0s")

# [End pyspark]

Overwriting LR.py
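
The `phi` expression used throughout `FM.py` relies on the standard factorization machine identity, which replaces the sum over all feature pairs with two matrix products. The sketch below checks that identity numerically on random data. `fm_pairwise_fast` and `fm_pairwise_naive` are hypothetical names, the inputs are random rather than course data, and `x` is kept one-dimensional here whereas `FM.py` wraps it as a `(1, n)` array; `W` has shape `(n, k)` as in `wInitialization`.

```python
import numpy as np


def fm_pairwise_fast(x, W):
    """Interaction term as in FM.py: 0.5 * sum((xW)^2 - (x^2)(W^2))."""
    xa = np.asarray(x, dtype=float)
    vx = xa.dot(W)                    # shape (k,)
    vx_square = (xa * xa).dot(W * W)  # shape (k,)
    return 0.5 * (vx * vx - vx_square).sum()


def fm_pairwise_naive(x, W):
    """Same term as an explicit sum over feature pairs i < j."""
    n = len(x)
    total = 0.0
    for i in range(n):
        for j in range(i + 1, n):
            total += W[i].dot(W[j]) * x[i] * x[j]
    return total


rng = np.random.default_rng(0)
x = rng.random(6)
W = rng.random((6, 2))
fast = fm_pairwise_fast(x, W)
naive = fm_pairwise_naive(x, W)
print(fast, naive, np.isclose(fast, naive))
```

The naive version touches every pair of features, while the fast version needs only two products with `W`, which is why the script can afford to evaluate `phi` once per record.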


In [None]:
!python3 submit_job_to_cluster.py \
    --project_id='w262-245821' \
    --zone=us-central1-b \
    --cluster_name='w261-sj' \
    --gcs_bucket='w261_sj_data' \
    --key_file=$HOME/w261.json \
    --create_new_cluster \
    --pyspark_file=FM.py

Creating cluster...
Waiting for cluster creation...
Cluster created.
Uploading pyspark file to GCS
w261-sj - RUNNING
Submitted job ID 805c03f5-13b6-46c0-ac0a-e513f4fa719c
Waiting for job to finish...

