In [1]:
import pyspark

import re
import ast
import time
import numpy as np
#import pandas as pd


sc = pyspark.SparkContext()


# Click Through Rate (CTR) Prediction for Advertisement Pricing

## Goal of the Analysis

The initial goal of our analysis is to build a model to predict ClickThroughRate (CTR), i.e. given a set of features of both an advertisement(advertisment length, type, duration, etc) and a set of features of a user (webpage visited, web-page history), what is the probability that he or she will click on that given advertisement? The larger goal is to build a model that will perform at scale using concepts taught in this class.

## What Questions Do We Seek to Answer?

Some of the questions that we seek to answer are:

(1) Which variables are more important in determining the click-through rate prediction?

(2) We will be exploring the use of Logistic Regression for Click Through Rate prediction, but we will also discuss improvements and expected accuracy increase with using other Machine Learning techniques.

(3) More importantly we will be discussing how the eventual predictive modelling approach is expected to scale to the entire 10GB dataset. 

## Why do people perform this kind of analysis?

Click-through rate (CTR) prediction is critical to many web applications including web search, recommender systems, sponsored search, and display advertising. Search advertising, known as sponsored search, refers to advertisers identifying relevant keywords based on their product or service for advertising. When the user retrieves the keyword purchased by the advertiser, the corresponding advertisement is triggered and displayed. In the cost-per-click model, the advertiser pays the web publisher only when a user clicks their advertisements and visits the advertiser's site. The CTR prediction is defined to estimate the ratio of clicks to impressions of advertisements that will be displayed.

## What level of performance should the model be to achieve practical use?

Based on a literature search, the model would have to have an `areaUnderROC` value of close to 75% and a `logloss` of 0.025 for it to be considered a high-performing model (Wang et al., 2018). Entropy or mutual information is another metric that is useful for CTR prediction algorithms. Juan et al., 2017 used a `normalized logloss` to measure performance of their model as well as a `Utility` metric which allows to model offline the potential change in profit due to a prediction model change.

In [2]:
# imports
import re
import ast
import time
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt

In [3]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "hw5_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

In [4]:
from pyspark.sql import SQLContext

In [3]:
trainRDD = sc.textFile('s3://breastcancerfile/e-AKVLH55FZCUE4SN1JHTY7DWU4/train.txt')

VBox()

In [5]:
trainRDD = sc.textFile('data/train.txt')

## Section 1.0: Checking the Distribution of the Labels in the Dataset

The code below is used for finding the count of "1"s and "0"s in the entire dataset.
The count shows that we have 34,095,179 "0"s (no-clicks) and 11,745,438 "1"s (clicks). The dataset is unbalanced, but not by a lot, so it is not necessary that we do subsampling of the dataset.

In [9]:
def CountNumberLabel(toyRDDLine):
    
    """ Takes the Count of Number of 1 and 0's in the Label for the entire dataset """
    Values = toyRDDLine.split('\t')
    T1 = Values[0]
    return (T1, 1)

trainRDD.map(CountNumberLabel).reduceByKey(lambda x,y: x+y).collect()

[('0', 34095179), ('1', 11745438)]

## Section 1.1: Creating a Training and Testing Dataset from the Original 10GB DataSet

The training dataset was created from the original dataset using the `randomSplit()` function. `randomSplit()` enables us to generate a random subset of any dataset. We decided to go with an 1% split of the dataset that represents a count of 458,906 rows called `TrainRDD` - enough for one to train a Logistic Regression function. The testing dataset was created from the 99% remainder of the split (called `TrainRDD2`). The test dataset was used for validating the weights from the Logistic Regression Model used as a model for click prediction.

In [7]:
TrainRDD, TrainRDD2 = trainRDD.randomSplit([0.01,0.99], seed = 2018)

Prior to doing exploratory data analysis, we needed to know what the distribution of the number of clicks (1) and no-click (0) are in the entire dataset. This enabled us to understand if we have an unbalanced dataset and if subsampling of the dataset would be necessary to ensure that we have an even distribution between the features. 

In [8]:
TestRDD2, TestRDD = TrainRDD2.randomSplit([0.001,0.999], seed = 2018)

The count of the number of rows of data in the Train and Test dataset is shown below. 458,455 rows are present in the TrainRDD while 45,355 are present in the TestRDD dataset.

In [9]:
print("The number of rows in the training dataset: ", TrainRDD.count())

The number of rows in the training dataset:  458455


In [10]:
print("The number of rows in the test dataset: ", TestRDD2.count())

The number of rows in the test dataset:  45355


## Section 1.2: DataSet Contents and Context

The dataset consists of 14 numeric columns or features and 25 categorical columns or features. The first numerical column in the dataset represents the binary "Yes"/"No" click event represented as either a "1" or a "0" respectively. 

To better understand the dataset, we divided the dataset exploratory section into a numerical exploratory section and a categorical exploratory section. 

In the numerical exploratory section, our main goals were to:

(1) Find the minimum and maximum values of each numerical feature in the column(s) from 1-14.

(2) Find the mean and standard deviation of each of the numerical features contained in the entire dataset.

The distribution within each dataset is discussed in the Exploratory Data Analysis section.

The questions we intended to investigate in the numerical data introduction section were:

(1) What were the types of Labels and what was the the distribution of the Labels (1 and 0)? 
- The distribution of 1 and 0 i.e click vs no-click was shown to be roughly 1/3(section 1.0), indicating an unbalanced dataset but not severely unbalanced.

(2) What were the minimum and maximum values of all the numerical features within each numerical column? 
- This can be done by going through the entire dataset. This is a computationally expensive process but is required. The values need to be saved into an array since the resultant matrix can easily be saved to disk/cached so that we do not have to compute that every time.

(3) What were the means and standard deviations of the numerical features within each numerical column? 
- This is also a computationally expensive process, but the resultant matrix can also be saved to disk/cached so that computation is only done once.

### Some Core Course Concepts Utilized in this Section:
*Caching and Broadcasting the Matrix containing Mean and Standard Deviations* -- Finding the mean and standard deviation for the trainRDD (10GB dataset) involves going through the entire dataset. However, mean and standard deviation are supposed to be utilized for mean-normalizing only the TrainRDD and the TestRDD2. Thus an important concept here is to *first* do the mean and standard deviation calculation and then *cache* these values. Because the matrix of the mean and standard deviation is small, one can broadcast these values without incurring much performance penalty for use in mean-normalizing the TrainRDD and the TestRDD2.

*Mean Normalization for SGD* -- Mean normalization of the numerical variables is very important for Gradient Descent because non-normalized features would take a longer time to converge. To ensure that we converge faster we use mean normalization of the features using the array of mean and standard deviation values.

In [15]:
def ExtractNumerals(toyRDDline):
    
    YY = toyRDDline.split('\t')
    
    if YY[ii] == '':   ## There are empty values/NAN - They are replaced with 0's for this initial Mean, Std calculation
        ReturnValue = 0
    else:
        ReturnValue = float(YY[ii])
    return ReturnValue

In [16]:
Node = []
NodeMeanMax = []
    
for ii in range(1,14):

    featureRDD = trainRDD.map(ExtractNumerals)\
                         .cache()
    feature = featureRDD.mean()
    featureStdev = np.sqrt(featureRDD.variance())
    
    featureMax = featureRDD.max()
    featureMin = featureRDD.min()
    
    Node.append((featureMeans, featureStdev))
    NodeMeanMax.append((featureMax, featureMin))

NameError: name 'featureMeans' is not defined

In [None]:
Node

In [20]:
Needed = sc.broadcast(Node)

In [16]:
NodeMeanMax

[(5775.0, 0),
 (257675.0, -3.0),
 (65535.0, 0),
 (969.0, 0.0),
 (23159456.0, 0.0),
 (431037.0, 0),
 (56311.0, 0.0),
 (6047.0, 0.0),
 (29019.0, 0.0),
 (11.0, 0),
 (231.0, 0.0),
 (4008.0, 0),
 (7393.0, 0)]

In [None]:
NodeMeanMax = [(5775.0, 0),
 (257675.0, -3.0),
 (65535.0, 0),
 (969.0, 0.0),
 (23159456.0, 0.0),
 (431037.0, 0),
 (56311.0, 0.0),
 (6047.0, 0.0),
 (29019.0, 0.0),
 (11.0, 0),
 (231.0, 0.0),
 (4008.0, 0),
 (7393.0, 0)]

In [6]:
NodeMeans = [(1.9136993727636809, 7.184627898705608),
 (105.84841979766556, 391.45781841729996),
 (21.13629851448115, 352.8574390110648),
 (5.735263227368864, 8.346464252535688),
 (18060.51214960742, 68556.28645274039),
 (90.10443053155232, 340.53335300283976),
 (15.62662976373116, 64.6908374702787),
 (12.510823839914709, 16.68706965443228),
 (101.51997332409383, 216.54476824937402),
 (0.33741472109766746, 0.59176564119444),
 (2.6146237734976343, 5.115681630473718),
 (0.2328159762771082, 2.7454850748632738),
 (6.436072751813085, 14.741644511057435)]

Needed = sc.broadcast(NodeMeans)

VBox()

## Section 1.3: Mean-Normalization of the Numerical Variable -- TrainRDD

As discussed above, mean-normalization of the numerical variable is key to enabling fast gradient descent optimization.
We can mean normalize the dataset by splitting the data into Numerical and Categorical variables and then working only on the numerical variable in this section. 
However, that would result to creating additional data and that is O(space) inefficient. A better idea to cater to scalability would be to work with the dataset as is and make modifications to the dataset. This would lead to an O(1) space complexity and thus enable faster computation of the mean normalization. 
Recall, we are broadcasting the Mean and Standard deviation array to all the partitions for computing the mean-normalization. This computation process has  O(n) time complexity and O(1) space complexity, while is ideal for scaling and also ideal for use in a map-reduce paradigmn or environment since it now represents an embarassingly parallel computation.

During the computation of the mean normalization, we are also replacing all NaN or empty values with the calculated mean for that numerical column and replacing all the NaN or empty values with 0 for the categorical columns. 

In [21]:
def NumericValuesOnly(LineRDD):
    """Function takes the RDD Mean normalizes the Numerical Column in the RDD and replaces 'NAN' and '' with mean for 
       Numerical values and '0' for Categorical values"""
    
    Node = Needed.value
    Values = LineRDD.split('\t')
    ZZ = []
    for ii in range(0,40):
        
        if ii < 14:
            if Values[ii] == '':
                ZZ.append(Node[ii-1][0])
            else:
                if (ii == 0):
                    ZZ.append(float(Values[ii]))
                else:
                    ZZ.append((float(Values[ii])- float(Node[ii-1][0]))/float(Node[ii-1][1]))
        else:
            if Values[ii] == '':
                ZZ.append(str(0))
            else:
                ZZ.append(str(Values[ii]))
    
    return (((str(Values[14])+ str(Values[15]) + str(Values[16])), ZZ))

In [23]:
NonNumericRDD = TrainRDD.map(NumericValuesOnly).cache()
NonNumericRDD.take(5)

[('68fd1e64287130e071e126ad',
  [0.0,
   -0.26636026245819294,
   -0.2729500211023082,
   0.00811574638625955,
   -0.6871488397768515,
   -0.052139815829603434,
   0.9217762862301186,
   -0.1488097872925819,
   -0.6298783463831777,
   -0.24253632977921483,
   -0.5701830211308282,
   -0.12014503987823798,
   0.2328159762771082,
   -0.3009211589986225,
   '68fd1e64',
   '287130e0',
   '71e126ad',
   '7846ae91',
   '4cf72387',
   '6f6d9be8',
   'f6ce794a',
   '0b153874',
   'a73ee510',
   '70962768',
   'ab066900',
   'e95d3160',
   '5d4198ed',
   'f7c1b33f',
   '42793602',
   'fc877894',
   'e5ba7672',
   '891589e7',
   '55dd3565',
   'a458ea53',
   'c8d017f7',
   '0',
   '32c7478e',
   '7d290d33',
   'c243e98b',
   '71848e87']),
 ('68fd1e64083aa75b',
  [0.0,
   1.9136993727636809,
   -0.2652863601435636,
   -0.03722834511098185,
   0.9902081315594536,
   -0.09341976470709805,
   -0.12951574388422543,
   -0.08697722867347693,
   0.3888745174837627,
   -0.2563902779685573,
   0.3374147210

## Section 1.4: Categorical Variables

In the categorical exploratory section our main goals were to:

(1) Find the minimum and maximum occurence of each categorical feature in the full dataset as well as the distict set of categorical features present in the dataset.

(2) We were also interested in finding the minimum and maximum count of the categorical feature in each categorical column.

Note: These two methods are computationally expensive but they give an underlying overview of the numerical distribution within the categorical dataset.

More information on the underlying distrbution of the categorical features is in the EDA section.

(3) More interesting and useful here is that we will need to implement an hashing algorithm in this section.

- The need for hashing is borne out of the curse of dimensionality that we would have had if we were to one-hot encode all the categorical features in the dataset. We would probably have had tens of thousands of columns resulting in a very sparse matrix. That would not only be computationally expensive but also a challenge for shuffling through a computing network.

To ensure we have a resulting dense matrix, we used a home-grown hashing function to reduce the dimensionality of each categorical variable from >100 to just 10. This eventually reduced the cardinality of the resultant architecture to 300 rather than in the order of thousands.

### Some Core Course Concepts Utilized in this Section:
*Hashing* -- Although we did not touch on this concept in depth in class, the hashing function as used here is the function that enables us to reduce the cardinality of the categeorical variables from several thousands to only 10 or as many as we deem fit. 

Hashing here was implemented using an home grown algorithm.

Home-Grown Hashing Function Algorithm:

The hashing function uses the co-occurence count of each feature within a column with the click outcome "1" divided by the total count of coccurence of the feature. This results in what we call a "co-occurence count of importance". Because this is normalized by the overall count, the values are always between 0 and 1. With 1 showing that the feature is very important (meaning everytime this feature occured we also had the occurence of a count) and 0 meaning the feature is not important at all (meaning everytime this feature occured we did not see the occurence of a count).

The hashing function enables us to reduce cardinality and also go from a sparse distribution to a denser representation of the categorical variables.

In [24]:
""" This for loop loops through the rows in the RDD containing categorical features and converts the categorical features
    based on feature importance into a more condensed representation between 1 and 10 """


StartFromA = 14
for ii in range(14, 40):
    Node = ii
    def ValuesNonNumericFeatures(toyRDDLine):

        """ Take the node value from a broadcast variable that is sent to the function """
        Values = toyRDDLine.split('\t')
        T14 = Values[Node]
        return (T14, 1)


    def NonNumericFeatures(toyRDDLine):

        """ Take the node value from a broadcast variable that is sent to the function """
        Values = toyRDDLine.split('\t')
        T14 = Values[Node]
        T0 = Values[0]
        return (((T14, T0), 1))

    def ConvertForMerge(toyRDDLine):

        YY = toyRDDLine[0][0]
        YYY = toyRDDLine[0][1]
        YYZ = toyRDDLine[1]

        return ((YY,(YYY,YYZ)))


    def MovingOn(trainRDD):

        Key = trainRDD[0]
        Value = trainRDD[1]
        
        ValueKey = Value[0][0]
        ValueValue = Value[0][1]
        ValueDivide = Value[1]
        
        return ((Key, (ValueKey, float(ValueValue/ValueDivide))))


    def HasherFunction(trainRDD):
    
        """ The Hashing Function converts all categorical variables based on feature importance to values between 
            1 and 10"""
        Key = trainRDD[0]
        Value = trainRDD[1][1]
        
        if (Value >0.9):
            FinalValue = 1
        elif (Value <=0.9) & (Value >0.8):
            FinalValue = 2
        elif (Value <=0.8) & (Value >0.7):
            FinalValue = 3
        elif (Value <=0.7) & (Value>0.6):
            FinalValue = 4
        elif (Value <=0.6) & (Value >0.5):
            FinalValue = 5
        elif (Value <=0.5) & (Value >0.4):
            FinalValue = 6
        elif (Value <=0.4) & (Value >0.3):
            FinalValue = 7
        elif (Value <=0.3) & (Value >0.2):
            FinalValue =8
        elif (Value <=0.2) & (Value >0.1):
            FinalValue = 9
        elif (Value <= 0.1):
            FinalValue = 10
            
        return ((Key, FinalValue))


#def ConvertToSignificance(toyRDDLine):
#print(trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).takeOrdered(100,lambda x: -x[1]))
    
    YY = TrainRDD.map(ValuesNonNumericFeatures)\
                .reduceByKey(lambda x,y: x+y)\
                .cache()

    ZZ = TrainRDD.map(NonNumericFeatures)\
                 .reduceByKey(lambda x,y: x+y)\
                 .map(ConvertForMerge)\
                 .filter(lambda x: x[1][0] == '1')\
                 .leftOuterJoin(YY)\
                 .map(MovingOn).map(HasherFunction).cache()
    TT = ZZ.collect()

    ZZ.unpersist()
    YY.unpersist()

    #NamingDict = "HashDictionary" + str(Node)
    from collections import defaultdict

    NamingDict = defaultdict(list)

    for ii in TT:
        NamingDict[ii[0]] = ii[1]
        

    """ The dictionary containing feature importance is broadcasted to all partitions for use in conversion 
        of the categorical variables to features with lower cardinality - denser representation"""  
    
    YYY = sc.broadcast(NamingDict)
    
    
    def MappingChangesWithDictionary(trainRDD):


        """ Mapping Changes with What the Dictionary has for feature importance """
        Dictionary = YYY.value

        """ Taking in all the Key/Value Components """
        FinalKey = trainRDD[0]
        Value = trainRDD[1]
        Value[Node] = Dictionary.get(Value[Node], 0)

        return (FinalKey,Value)



    NonNumericRDD = NonNumericRDD.map(MappingChangesWithDictionary)\
                    .cache()
    
    
NonNumericRDD.take(2)

[('68fd1e64287130e071e126ad',
  [0.0,
   -0.26636026245819294,
   -0.2729500211023082,
   0.00811574638625955,
   -0.6871488397768515,
   -0.052139815829603434,
   0.9217762862301186,
   -0.1488097872925819,
   -0.6298783463831777,
   -0.24253632977921483,
   -0.5701830211308282,
   -0.12014503987823798,
   0.2328159762771082,
   -0.3009211589986225,
   8,
   8,
   0,
   0,
   8,
   7,
   8,
   8,
   8,
   8,
   8,
   0,
   8,
   8,
   9,
   0,
   7,
   8,
   7,
   8,
   0,
   0,
   8,
   0,
   7,
   0]),
 ('68fd1e64083aa75b',
  [0.0,
   1.9136993727636809,
   -0.2652863601435636,
   -0.03722834511098185,
   0.9902081315594536,
   -0.09341976470709805,
   -0.12951574388422543,
   -0.08697722867347693,
   0.3888745174837627,
   -0.2563902779685573,
   0.33741472109766746,
   -0.3156224116605392,
   0.2328159762771082,
   0.852274469022974,
   8,
   9,
   0,
   0,
   8,
   0,
   8,
   8,
   8,
   8,
   8,
   0,
   8,
   8,
   9,
   0,
   7,
   9,
   8,
   8,
   0,
   0,
   8,
   0,
   8,

### Section 1.4.1: Minimum and Maximum Co-Occurence of Each Categorical Feature

In [None]:
""" This function caalculates the minimum and maximum counts of distinct features in each categorical column """
for ii in range(14,40):
    """ Nodes and their meanings """
    Node = ii
    def ValuesNonNumericFeatures(toyRDDLine):

        """ Take the node value from a broadcast variable that is sent to the function """
        Values = toyRDDLine.split('\t')
        T14 = Values[Node]
        return (T14, 1)
    
    Feature = trainRDD.map(ValuesNonNumericFeatures)\
                    .reduceByKey(lambda x,y: x+y)\
                    .values()\
                    .collect()
    print("This is the minumum value for :", Node, "Minimum: =", min(Feature),
          "This is the maximum value for :", Node, "Maximum: =", max(Feature))
    
    
    
    

### Section 1.4.2: The Features with the Minimum and Maximum Occurence for Each of the 40 Columns

In [None]:
""" This for loop calculates count of the features within each column pickts the top 5-features"""
for ii in range(14,40):
    """ Nodes and their meanings """
    Node = ii
    def ValuesNonNumericFeatures(toyRDDLine):

        """ Take the node value from a broadcast variable that is sent to the function """
        Values = toyRDDLine.split('\t')
        T14 = Values[Node]
        return (T14, 1)
    
    print(trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).takeOrdered(5,lambda x: -x[1]))

## Section 5:  Core Course Concepts Used for Algorithm and Data-Preparation

(1) *Hashing* -- Although we did not touch on this concept indepth in class, the hashing function as used here is the function that enables us to reduce the cardinality of the categorical variables from several thousands to only 10 or as many as we deem fit. 

Hashing here was implemented using an home grown algorithm.

Home-Grown Hashing Function Algorithm:

The hashing function uses the co-occurence count of each feature within a column with the click outcome "1" divided by the total count of coccurence of the feature. This results in what we call a "co-occurence count of importance". Because this is normalized by the overall count, the values are always between 0 and 1. With 1 showing that the feature is very important (meaning everytime this feature occured we also had the occurence of a count) and 0 meaning the feature is not important at all (meaning everytime this feature occured we did not see the occurence of a count). The hashing function enables us to reduce cardinality and also go from a sparse distribution to a denser representation of the categorical variables.

By choosing an hashing from 1-10 we went from a very high dimensional (cardinal) problem to a lower dimensionality. One can argue that even 10 categories for every categorical colun is still a lot, will it be more practical to have the categories be 5 instead of 10. If computation resource is a bottleneck and given that we know there is some similarity amongst some of the features within a column, we can definitely divide the categorical varaibles into 5 classes. Also note that hashing, as discussed previously, enables us to make the data less sparse.

(2) *Caching and Broadcasting the Matrix containing Mean and Standard Deviations* -- Finding the mean and standard deviation for the trainRDD (10GB dataset) involves going through the entire dataset. However, mean and standard deviation are supposed to be utilized for mean-normalizing only the TrainRDD and the TestRDD2. Thus an important concept here is to *first* do the mean and standard deviation calculation and then *cache* these values. Because the matrix of the mean and standard deviation is small, one can broadcast these values without incurring much performance penalty for use in mean-normalizing the TrainRDD and the TestRDD2.

In addition, generating the dictionary containing, as keys the categroical feature, and as value the co-occuring feature importance is a  computationally expensive, the resulting dictionary ( a default dict from python) is not a large data for each column. This data is thus broadcasted for use in the transofrmation of each column to a hashed value between 1 and 10.

Caching was also used multiple times in the initial analysis, this is because some RDDs calculated at an initial time t1 still had multiple uses downstream at times t+1 and t+2. If the RDD were not cached, pyspark would need to reevaluate the RDD everytime a call was made to that RDD. Caching is especially useful when RDD computation is expensive (NonNumericRDDs - These are computationally expensive calculations). Without caching, every call to these RDDs would mean reevaluating them again.

While Caching is good, it also takes away computational resource as some part of memory is allocated to the RDD. When all memory is used for Caching the unused RDDs are spilled-to and stored on disk. Although having RDDs in memry ensures faster computation/evaluation than having RDDs on disk, it is still faster re-evaluating RDDs from memory then recomputing/re-evaluating the RDD. Because of the spill-to-disk problem it is definitely worth it storing in memory only RDDs that are computationally expensive to calculate. Once we are done with the RDD one should ensure to unpersist() the RDD to free memory space for other RDDs.

To ensure scalability one needed to ensure that RDDs were unpersisted() from memory once they were no longer in use.If the RDDs were not unpersisted() then it is totally concievable that one would run out of memory space for computation and eventually RDDs would spill to disk.

(3) *Normalization* -- Normalization of features (numerical features) is extremely important especially for gradient descent. Numerical features can have widely varying ranges. The values can from -inf to +inf. This range of values can result in Stochastic Gradient Descent taking a very long time to converge. To prevent this from occuring, Normalization of the Numerical variables is done.

The method utilized for normalizing here was deployed on the 10GB dataset and it appears to be scalable. The process involves taking a pass through the dataset to calculate the mean and standard deviation of each numerical column and storing each of these values in a list as a set (mean, standard deviation).
Because this set is small one can then broadcast this set to the RDD partitions and do another loop through the dataset this time only doing the mean normalization computation ($x_i - mean)/sd$.

(4) *Mean Normalization for SGD* -- Mean normalization of the numerical variables is very important for Gradient Descent because non-normalized features would take a longer time to converge. To ensure that we converge faster we use mean normalization of the features using the array of mean and standard deviation values.

## Logistic Regression
For the CTR prediction problem, the dependent variable is binary - either the customer clicks on an advertisement or they don't. It is therefore a classification problem, as the dependent variable (click/no-click) is in categorical form. Linear Regression is typically not used for classification problems since it does not perform well when there are outliers. This because the Linear Regression best fit line may not give accurate decision boundaries. 

Instead, for classification problems we use Logistic Regression. Logistic Regression is a classification algorithm used to assign observations to a discrete set of classes. Unlike Linear Regression which outputs continuous number values, Logistic Regression transforms its output using the logistic sigmoid function which maps any real value into another value between 0 and 1 to return a probability value, which can then be mapped to two or more discrete classes. 

### The Sigmoid Function
The sigmoid function can be written as:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $s(z) = \frac{1}{1-e^{-z}}$ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;...(1)

where:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $s(z)$ is the output, i.e. the probability estimate, between 0 and 1 

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $z$ is the input to the function, i.e. the algorithm’s prediction: $\beta_0 + \beta_1x_1 + \beta_2x_2 + ...$
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(where $x_i = i$th feature or predictor and $\beta_i$ is the $i$th coefficient of the model)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $e$ is the base of natural log

The following is a plot of the sigmoid function:

<img src="sigmoid.png" width=200>

### The Loss Function
All predictive algorithms use a loss function to be minimized in order to arrive at a prediction. For Linear Regression, we typically use Mean Squared Error (MSE) as the loss or cost function. However, we cannot use MSE as the loss function for Logistic Regression. This is because the prediction function is non-linear (due to sigmoid transform). Squaring this prediction as we do in MSE results in a non-convex function with many local minimums. If our cost function has many local minimums, gradient descent may not find the optimal global minimum.

Instead of Mean Squared Error, we use a cost function called Cross-Entropy, also known as Log Loss. Cross-entropy loss can be divided into two separate cost functions: one for y=1 and one for y=0.

Combining the two cost functions, the loss function (which is to be minimized in Logistic Regression) can be written compactly as:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $J(\theta) = -\frac{1}{m}\sum_{i=1}^{m}[y^{(i)}log(h_{\theta}(x^{(i)}))+(1 - y^{(i)})(1 - log(h_{\theta}(x^{(i)}))]$&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;...(2)

where $h_{\theta}(x)$ is the model expression.

Vectorizing this equation we have:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $h = g(X\theta)$

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $J(\theta) = \frac{1}{m}\cdot(-y^{T}log(h)-(1 - y)^{T}log(1-h)$&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;...(3)

where $X$ is the input feature vector, $\theta$ is the parameter vector and $y$ is the observation vector.

### Gradient Descent
To minimize the cost we use Gradient Descent. We know from calculus that to find the minimum of a function, we take the derivative and solve for the derivative equal to zero. 

Taking the derivative of the cost function, we have:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $C' = x(s(z)-y)$&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;...(4)

where

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$C′$ is the derivative of cost with respect to weights

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$y$ is the actual class label (0 or 1)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$s(z)$ is the model’s prediction

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$x$ is the feature vector.

Steps to be implented include:

  1. Calculate gradient average
  2. Multiply by learning rate
  3. Subtract from weights 

### The Implementation
In the code below, first we define the Sigmoid function. It takes a vector input and returns the Sigmoid value. 

The `predict()` method takes a single row of the RDD as input. We know that our dataset comprises 14 numeric features and  25 categorical features (see above). Also, our categorical features have been indexed and hashed earlier (see above) hence they have been transformed into numbers.  

We first separately calculate the linear combination of the model weights and the numeric variables and the categorical variables. 

We then combine the transformed numerical and categorical features to create the vector to be input to the sigmoid function. This implements equation (1) above.  

Next, we iterate through the features, updating the weights by adding the product of the learning rate and the squared error, thus implementing equation (3) above. 

At the end of the iteration we have the predicted value. 

In [19]:
def sigmaFunction(z):
    yhat = 1.0/(1.0 + np.exp(-1 * z))
    return (yhat)

In [12]:
initialWeights = (np.ones(300))

VBox()

In [32]:
lrate = 0.7
initialWeights = list(np.ones(300))
def predict(RDDRow):
    
    """ The first Coefficient is always the intercept or the Bias term"""
    Key = RDDRow[0]
    Value = RDDRow[1]
    realY = Value[0]
    RestY = Value[1:14]
    RestYCat = Value[14:]
    #RestY.insert(1,0)
    initialWeights = initWeight.value
    Y = initialWeights[0]
    
    for i in range(len(RestY)-1):
        
        Y+= initialWeights[i+1] *RestY[i]
    
    for index,value in enumerate(RestYCat):
        Y += initialWeights[14+(value)+(index*11)]
    
        
    ZZ = sigmaFunction(Y)
    
    Error = (realY - ZZ)
    ErrorSquared = Error**2
    
    
    for index,value in enumerate(initialWeights):

        if index == 0:
            initialWeights[index] = initialWeights[index] + (lrate * Error * ZZ * (1.0 - ZZ))
        else:
            
            if index <= 14:
                initialWeights[index] = initialWeights[index] + ((lrate* Error * ZZ * (1.0 - ZZ))*Value[index-1])
    
    for indices, values in enumerate(RestYCat):
        
        
        IndexToUpdate = 14+values +(indices*11)
        initialWeights[IndexToUpdate] = initialWeights[IndexToUpdate] + ((lrate* Error * ZZ * (1.0 - ZZ)))
                
                                            
    return(initialWeights)

VBox()

In [14]:
PredictionCalc = NonNumericRDD.map(predict).cache()
Y = PredictionCalc.collect()[-1]
Y

VBox()

[0.9999999995924044, 1.0, 1.000000000062389, 1.000000000097784, 1.0000000000097127, 1.0000000001410299, 1.000000000073139, 1.000000000029065, 1.0000000000740206, 1.000000000192697, 1.000000000099937, 1.0000000001310472, 1.0000000001001772, 0.9999999999075673, 1.0000000000276408, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999995924044, 0.9999999999879455, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999996044588, 0.9999999997636152, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999998287892, 0.9999999999191619, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999996732425, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999995924044, 0.9999999999035522, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999996888522, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999995924044, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999995924044, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999995924044, 0.9999999999547172, 1.0, 1.0, 1.0, 

In [22]:
lrate = 0.7
initialWeights = Y
PredictionCalc = NonNumericRDD.map(predict).cache()

Y = PredictionCalc.collect()[-1]
Y

VBox()

[0.9999999991848088, 1.0, 1.0000000001247777, 1.000000000195568, 1.0000000000194251, 1.0000000002820597, 1.000000000146278, 1.00000000005813, 1.0000000001480411, 1.000000000385394, 1.000000000199874, 1.000000000262094, 1.0000000002003544, 0.9999999998151345, 1.000000000055281, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999991848088, 0.9999999999758911, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999992089177, 0.9999999995272304, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999996575784, 0.9999999998383238, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.999999999346485, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999991848088, 0.9999999998071043, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999993777045, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999991848088, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999991848088, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999991848088, 0.9999999999094344, 1.0, 1.0, 1.0, 1.0

In [23]:
lrate = 0.9
initialWeights = Y
PredictionCalc = NonNumericRDD.map(predict).cache()

Y = PredictionCalc.collect()[-1]
Y

VBox()

[0.9999999987772131, 1.0, 1.0000000001871665, 1.000000000293352, 1.0000000000291376, 1.0000000004230896, 1.0000000002194172, 1.000000000087195, 1.0000000002220617, 1.000000000578091, 1.0000000002998108, 1.0000000003931406, 1.0000000003005316, 0.9999999997227018, 1.000000000082921, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999987772131, 0.9999999999638366, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999988133765, 0.9999999992908456, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999994863675, 0.9999999997574857, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999990197275, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999987772131, 0.9999999997106565, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999990665567, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999987772131, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999987772131, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.9999999987772131, 0.9999999998641517, 1.0, 1.0, 1.0

In [33]:
def realLoss(toyRDD):
    """ The first Coefficient is always the intercept or the Bias term"""
    Key = toyRDD[0]
    Value = toyRDD[1]
    realY = Value[0]
    RestY = Value[1:14]
    RestYCat = Value[14:]
    #RestY.insert(1,0)
    
    initialWeights = initWeight.value
    Y = initialWeights[0]
    
    for i in range(len(RestY)-1):
        
        Y+= initialWeights[i+1] *RestY[i]
    
    for index,value in enumerate(RestYCat):
        Y += initialWeights[14+(value)+(index*11)]
    
        
    ZZ = sigmaFunction(Y)
    
    Error = (realY - ZZ)
    return Error

VBox()

In [None]:
LossShift = []
initWeights = list(np.ones(300))
initWeight = sc.broadcast(initWeights)

for ii in range(40):
    lrate = 0.9
    PredictionCalc = NonNumericRDD.map(predict).cache()
    Y = PredictionCalc.collect()[-1]
    print("This is Run: ", ii)
    Y
    initWeight = sc.broadcast(Y)
    Looser = NonNumericRDD.map(realLoss).mean()
    LossShift.append(Looser)

VBox()

In [31]:
LossShift

VBox()

[-0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929, -0.7438821893789929]

In [20]:
def RunningEpochs(Epoch, lrate, initialWeights):
    Epoch = Epoch
    lrate = lrate

########################################################################
#################### Initial Start of the Function #####################

    def predict(RDDRow):
    
        """ The first Coefficient is always the intercept or the Bias term"""
        Key = RDDRow[0]
        Value = RDDRow[1]
        realY = Value[0]
        RestY = Value[1:14]
        RestYCat = Value[14:]
        #RestY.insert(1,0)

        Y = initialWeights[0]

        for i in range(len(RestY)-1):

            Y+= initialWeights[i+1] *RestY[i]

        for index,value in enumerate(RestYCat):
            Y += initialWeights[14+(value)+(index*11)]


        ZZ = sigmaFunction(Y)

        Error = (realY - ZZ)
        ErrorSquared = Error**2


        for index,value in enumerate(initialWeights):

            if index == 0:
                initialWeights[index] = initialWeights[index] + (lrate * Error * ZZ * (1.0 - ZZ))
            else:

                if index <= 14:
                    initialWeights[index] = initialWeights[index] + ((lrate* Error * ZZ * (1.0 - ZZ))*Value[index-1])

        for indices, values in enumerate(RestYCat):

            IndexToUpdate = 14+values +(indices*11)
            initialWeights[IndexToUpdate] = initialWeights[IndexToUpdate] + ((lrate* Error * ZZ * (1.0 - ZZ)))
            
        return(initialWeights)
    
################################################################################ 
##################### Final End of The Function ################################

    for ii in range(Epoch):
        
        
        PredictnewWeight= NonNumericRDD.map(predict).collect()[-1]
        initialWeights = PredictionCalc
    return initialWeights  



VBox()

In [21]:
YY = list(np.ones(300))
RunningEpochs(20, 0.89, YY)

VBox()

Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
Traceback (most recent call last):
  File "<stdin>", line 56, in RunningEpochs
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 814, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2472, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2405, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, 

In [10]:
def MakingRDDDataFrames(rowRDD):
    
    """Getting the RDD in the format of Sets and Lists"""
    Key = rowRDD[0]
    ####################
    String1 = str(Key[0])
    String2 = str(Key[1])
    String3 = str(Key[2])
    FinalKey = String1 + String2 + String3
    
    Values = rowRDD[1]
    
    ### Getting each individual Value WoW
    Num1 = Values[0]
    Num2 = Values[1]
    Num3 = Values[3]
    Num4 = Values[4]
    Num5 = Values[5]
    Num6 = Values[6]
    Num7 = Values[7]
    Num8 = Values[8]
    Num9 = Values[9]
    Num10 = Values[10]
    Num11 = Values[11]
    Num12 = Values[12]
    Num13 = Values[13]
    
    return(FinalKey, Num1, Num2, Num3, Num4, Num5, Num6, Num7, Num8, Num9, Num10, Num11, Num12, Num13)

VBox()

An error was encountered:
Invalid status code '404' from https://172.31.44.68:18888/sessions/2 with error payload: "Session '2' not found."


## Working with a Random Split 70:30 Toy Example -- The Best Splitting Criterion

In [8]:
NumericRDD = TrainRDD.map(NumericValuesOnly).map(MakingRDDDataFrames).cache()
NumericRDD.take(10)

VBox()

name 'MakingRDDDataFrames' is not defined
Traceback (most recent call last):
NameError: name 'MakingRDDDataFrames' is not defined



In [10]:
NumericRDD1 = TrainRDD.map(NumericValuesOnly).cache()
NumericRDD1.take(10)

VBox()

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 47956)
----------------------------------------
[('68fd1e6480e26c9bfb936136', [0.0, -0.1271742093878368, -0.26784091379647845, -0.04573036226671461, -0.6871488397768515, -0.24328202434221455, -0.25285168037808703, -0.009686530399595706, -0.6298783463831777, 0.36703739055185386, 1.119675142958533, -0.12014503987823798, 0, -0.3009211589986225]), ('68fd1e64f0cf00246f67f7e5', [0.0, 0.01201184368251934, -0.27039546744939336, 0.06479586075781131, -0.567337627538544, -0.261952813940516, -0.24110540071200498, -0.21064234591168687, -0.6298783463831777, -0.4503455526193519, 1.119675142958533, -0.3156224116605392, 0, -0.16525108511372877]), ('8cf07265ae46a29dc81688bb', [0.0, 0.15119789675287548, -0.2729500211023082, 0, -0.6871488397768515, -0.2634114693778833, -0.2645979600441691, -0.19518420625691063, -0.7497316244851707, -0.4688174835384752, 1.119675142958533, -0.3156224116605392, 0, -0.4

In [9]:
from pyspark.sql import Row
from pyspark.sql.types import *

schema = StructType([StructField("Keys", StringType(), False),
                    StructField("Num1", DoubleType(), False),
                    StructField("Num2", DoubleType(), False),
                    StructField("Num3", DoubleType(), False),
                    StructField("Num4", DoubleType(), False),
                    StructField("Num5", DoubleType(), False),
                    StructField("Num6", DoubleType(), False),
                    StructField("Num7", DoubleType(), False),
                    StructField("Num8", DoubleType(), False),
                    StructField("Num9", DoubleType(), False),
                    StructField("Num10", DoubleType(), False),
                    StructField("Num11", DoubleType(), False),
                    StructField("Num12", DoubleType(), False),
                    StructField("Num13", DoubleType(), False),])

VBox()

In [13]:
Numeric = NumericRDD.map(lambda y: Row(Keys = y[0], Num1 = float(y[1]), Num2 = float(y[2]), Num3 = float(y[3]),
                                   Num4 = float(y[4]), Num5 = float(y[5]), Num6 = float(y[6]), Num7 = float(y[7]),
                                   Num8 = float(y[8]), Num9 = float(y[9]), Num10 = float(y[10]), Num11 = float(y[11]),
                                   Num12 = float(y[12]), Num13 = float(y[13])))

VBox()

In [14]:
DataFrame = spark.createDataFrame(Numeric, schema)

VBox()

u'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 693, in createDataFrame
    jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'



In [None]:
DataFrame.head(20)

In [11]:
def NonNumericValuesOnly(LineRDD):
    """Function takes the RDD and subsets the RDD"""
    Values = LineRDD.split('\t')
    ZZ = []
    RealKey = str(Values[14]) + str(Values[15]) + str(Values[16])
    for ii in range(14,40):
        if Values[ii] == '':
            ZZ.append(str(0))
        else:
            ZZ.append(str(Values[ii]))
    
    return (RealKey, ZZ)


def MakingRDDDataFrames(rowRDD):
    
    """Getting the RDD in the format of Sets and Lists"""
    Key = rowRDD[0]
    ####################
    
    Values = rowRDD[1]
    
    ### Getting each individual Value WoW
    Str14 = Values[0]
    Str15 = Values[1]
    Str16 = Values[3]
    Str17 = Values[4]
    Str18 = Values[5]
    Str19 = Values[6]
    Str20 = Values[7]
    Str21 = Values[8]
    Str22 = Values[9]
    Str23 = Values[10]
    Str24 = Values[11]
    Str25 = Values[12]
    Str26 = Values[13]
    Str27 = Values[14]
    Str28 = Values[15]
    Str29 = Values[16]
    Str30 = Values[17]
    Str31 = Values[18]
    Str32 = Values[19]
    Str33 = Values[20]
    Str34 = Values[21]
    Str35 = Values[22]
    Str36 = Values[23]
    Str37 = Values[24]
    Str38 = Values[25]
    Str39 = Values[26]
    
    return(FinalKey, Str14, Str15, Str16, Str17, Str18, Str19, Str20, Str21, Str22, Str23, Str24, Str25, Str26, Str27, Str28, Str29,
          Str30, Str31, Str32, Str33, Str34, Str35, Str36, Str37, Str38, Str39)

VBox()

In [12]:
NonNumericRDD = TrainRDD.map(NonNumericValuesOnly).cache()
NonNumericRDD.take(20)


VBox()

[('68fd1e6480e26c9bfb936136', ['68fd1e64', '80e26c9b', 'fb936136', '7b4723c4', '25c83c98', '7e0ccccf', 'de7995b8', '1f89b562', 'a73ee510', 'a8cd5504', 'b2cb9c98', '37c9c164', '2824a5f6', '1adce6ef', '8ba8b39a', '891b62e7', 'e5ba7672', 'f54016b9', '21ddcdc9', 'b1252a9d', '07b5194c', '0', '3a171ecb', 'c5c50484', 'e8b83407', '9727dd16']), ('68fd1e64f0cf00246f67f7e5', ['68fd1e64', 'f0cf0024', '6f67f7e5', '41274cd7', '25c83c98', 'fe6b92e5', '922afcc0', '0b153874', 'a73ee510', '2b53e5fb', '4f1b46f3', '623049e6', 'd7020589', 'b28479f6', 'e6c5b5cd', 'c92f3b61', '07c540c4', 'b04e4670', '21ddcdc9', '5840adea', '60f6221e', '0', '3a171ecb', '43f13e8b', 'e8b83407', '731c3655']), ('8cf07265ae46a29dc81688bb', ['8cf07265', 'ae46a29d', 'c81688bb', 'f922efad', '25c83c98', '13718bbd', 'ad9fa255', '0b153874', 'a73ee510', '5282c137', 'e5d8af57', '66a76a26', 'f06c53ac', '1adce6ef', '8ff4b403', '01adbab4', '1e88c74f', '26b3c7a7', '0', '0', '21c9516a', '0', '32c7478e', 'b34f3128', '0', '0']), ('05db91646c9c9c

## 16

In [16]:
##### 16

""" Nodes and their meanings """
Node = 16
def ValuesNonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    return (T14, 1)


def NonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    T0 = Values[0]
    return (((T14, T0), 1))

def ConvertForMerge(toyRDDLine):
    
    YY = toyRDDLine[0][0]
    YYY = toyRDDLine[0][1]
    YYZ = toyRDDLine[1]
    
    return ((YY,(YYY,YYZ)))

def GetImportance(toyRDDLine):
    """Get the ToyRDD and then divide by the count of occurence"""
    PayLoad = toyRDDLine
    Key = toyRDDLine[0]
    PayLoads = PayLoad[1]
    PayLoads2 = PayLoads[0]  ##This is the real payload to divide through
    Value = float(PayLoads[1])   ##This is the numerical estimate value
    
    if type(PayLoads2) == list:
        Lister= []
        for ii in PayLoads2:
            if (ii[0] == "1"):
                if (ii[1]/float(Value) >0.9):
                    Lister.append(1)
                elif (ii[1]/float(Value) <=0.9) & (ii[1]/float(Value) >0.8):
                    Lister.append(2)
                elif (ii[1]/float(Value) <=0.8) & (ii[1]/float(Value) >0.7):
                    Lister.append(3)
                elif (ii[1]/float(Value) <=0.7) & (ii[1]/float(Value) >0.6):
                    Lister.append(4)
                elif (ii[1]/float(Value) <=0.6) & (ii[1]/float(Value) >0.5):
                    Lister.append(5)
                elif (ii[1]/float(Value) <=0.5) & (ii[1]/float(Value) >0.4):
                    Lister.append(6)
                elif (ii[1]/float(Value) <=0.4) & (ii[1]/float(Value) >0.3):
                    Lister.append(7)
                elif (ii[1]/float(Value) <=0.3) & (ii[1]/float(Value) >0.2):
                    Lister.append(8)
                elif (ii[1]/float(Value) <=0.2) & (ii[1]/float(Value) >0.1):
                    Lister.append(9)
                elif (ii[1] <= 0.1):
                    Lister.append(10)
            else:
                continue
                
                
    else:
        Lister =[]
        if (PayLoads2[0] == "1"):
            if (PayLoads2[1]/float(Value) >0.9):
                Lister.append(1)
            elif (PayLoads2[1]/float(Value) <=0.9) & (PayLoads2[1]/float(Value) >0.8):
                Lister.append(2)
            elif (PayLoads2[1]/float(Value) <=0.8) & (PayLoads2[1]/float(Value) >0.7):
                Lister.append(3)
            elif (PayLoads2[1]/float(Value) <=0.7) & (PayLoads2[1]/float(Value) >0.6):
                Lister.append(4)
            elif (PayLoads2[1]/float(Value) <=0.6) & (PayLoads2[1]/float(Value) >0.5):
                Lister.append(5)
            elif (PayLoads2[1]/float(Value) <=0.5) & (PayLoads2[1]/float(Value) >0.4):
                Lister.append(6)
            elif (PayLoads2[1]/float(Value) <=0.4) & (PayLoads2[1]/float(Value) >0.3):
                Lister.append(7)
            elif (PayLoads2[1]/float(Value) <=0.3) & (PayLoads2[1]/float(Value) >0.2):
                Lister.append(8)
            elif (PayLoads2[1]/float(Value) <=0.2) & (PayLoads2[1]/float(Value) >0.1):
                Lister.append(9)
            elif (PayLoads2[1] <= 0.1):
                Lister.append(10)
        
    return (Key, Lister)


    
    


#def ConvertToSignificance(toyRDDLine):
    

#print(trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).takeOrdered(100,lambda x: -x[1]))
    
YY = trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).sortBy(lambda x: -x[1]).cache()

ZZ = trainRDD.map(NonNumericFeatures)\
             .reduceByKey(lambda x,y: x+y)\
             .map(ConvertForMerge)\
             .union(YY)\
             .reduceByKey(lambda x,y: [x] + [y])\
             .map(GetImportance).filter(lambda x: len(x[1]) != 0 ).cache()
TT = ZZ.collect()

ZZ.unpersist()
YY.unpersist()

#NamingDict = "HashDictionary" + str(Node)
from collections import defaultdict

NamingDict = defaultdict(list)

for ii in TT:
    NamingDict[ii[0]] = ii[1]

VBox()

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.0 failed 4 times, most recent failure: Lost task 1.3 in stage 29.0 (TID 3616, ip-172-31-13-1.us-west-2.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1544211040330_0001/container_1544211040330_0001_01_000002/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/mnt1/yarn/usercache/livy/appcache/application_1544211040330_0001/container_1544211040330_0001_01_000002/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/livy/appcache/application_1544211040330_0001/container_1544211040330_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.is

In [17]:
Node = 2
YYY = sc.broadcast(NamingDict)
def MappingChangesWithDictionary(trainRDD):
    
    
    Dictionary = YYY.value
    
    """ Taking in all the Key/Value Components """
    FinalKey = trainRDD[0]
    
    
    
    Value = trainRDD[1]
    Value[Node] = Dictionary[Value[Node]]
    
    return (FinalKey,Value[Node])


Hashing_1 = NonNumericRDD.map(MappingChangesWithDictionary).cache()
Hashing_1.sample(False, 0.0000001, None).collect()

VBox()

name 'NamingDict' is not defined
Traceback (most recent call last):
NameError: name 'NamingDict' is not defined



In [18]:
##### 16

""" Nodes and their meanings """
Node = 17
def ValuesNonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    return (T14, 1)


def NonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    T0 = Values[0]
    return (((T14, T0), 1))

def ConvertForMerge(toyRDDLine):
    
    YY = toyRDDLine[0][0]
    YYY = toyRDDLine[0][1]
    YYZ = toyRDDLine[1]
    
    return ((YY,(YYY,YYZ)))

def GetImportance(toyRDDLine):
    """Get the ToyRDD and then divide by the count of occurence"""
    PayLoad = toyRDDLine
    Key = toyRDDLine[0]
    PayLoads = PayLoad[1]
    PayLoads2 = PayLoads[0]  ##This is the real payload to divide through
    Value = float(PayLoads[1])   ##This is the numerical estimate value
    
    if type(PayLoads2) == list:
        Lister= []
        for ii in PayLoads2:
            if (ii[0] == "1"):
                if (ii[1]/float(Value) >0.9):
                    Lister.append(1)
                elif (ii[1]/float(Value) <=0.9) & (ii[1]/float(Value) >0.8):
                    Lister.append(2)
                elif (ii[1]/float(Value) <=0.8) & (ii[1]/float(Value) >0.7):
                    Lister.append(3)
                elif (ii[1]/float(Value) <=0.7) & (ii[1]/float(Value) >0.6):
                    Lister.append(4)
                elif (ii[1]/float(Value) <=0.6) & (ii[1]/float(Value) >0.5):
                    Lister.append(5)
                elif (ii[1]/float(Value) <=0.5) & (ii[1]/float(Value) >0.4):
                    Lister.append(6)
                elif (ii[1]/float(Value) <=0.4) & (ii[1]/float(Value) >0.3):
                    Lister.append(7)
                elif (ii[1]/float(Value) <=0.3) & (ii[1]/float(Value) >0.2):
                    Lister.append(8)
                elif (ii[1]/float(Value) <=0.2) & (ii[1]/float(Value) >0.1):
                    Lister.append(9)
                elif (ii[1] <= 0.1):
                    Lister.append(10)
            else:
                continue
                
                
    else:
        Lister =[]
        if (PayLoads2[0] == "1"):
            if (PayLoads2[1]/float(Value) >0.9):
                Lister.append(1)
            elif (PayLoads2[1]/float(Value) <=0.9) & (PayLoads2[1]/float(Value) >0.8):
                Lister.append(2)
            elif (PayLoads2[1]/float(Value) <=0.8) & (PayLoads2[1]/float(Value) >0.7):
                Lister.append(3)
            elif (PayLoads2[1]/float(Value) <=0.7) & (PayLoads2[1]/float(Value) >0.6):
                Lister.append(4)
            elif (PayLoads2[1]/float(Value) <=0.6) & (PayLoads2[1]/float(Value) >0.5):
                Lister.append(5)
            elif (PayLoads2[1]/float(Value) <=0.5) & (PayLoads2[1]/float(Value) >0.4):
                Lister.append(6)
            elif (PayLoads2[1]/float(Value) <=0.4) & (PayLoads2[1]/float(Value) >0.3):
                Lister.append(7)
            elif (PayLoads2[1]/float(Value) <=0.3) & (PayLoads2[1]/float(Value) >0.2):
                Lister.append(8)
            elif (PayLoads2[1]/float(Value) <=0.2) & (PayLoads2[1]/float(Value) >0.1):
                Lister.append(9)
            elif (PayLoads2[1] <= 0.1):
                Lister.append(10)
        
    return (Key, Lister)


    
    


#def ConvertToSignificance(toyRDDLine):
    

#print(trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).takeOrdered(100,lambda x: -x[1]))
    
YY = trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).sortBy(lambda x: -x[1]).cache()

ZZ = trainRDD.map(NonNumericFeatures)\
             .reduceByKey(lambda x,y: x+y)\
             .map(ConvertForMerge)\
             .union(YY)\
             .reduceByKey(lambda x,y: [x] + [y])\
             .map(GetImportance).filter(lambda x: len(x[1]) != 0 ).cache()
TT = ZZ.collect()

ZZ.unpersist()
YY.unpersist()

#NamingDict = "HashDictionary" + str(Node)
from collections import defaultdict

NamingDict1 = defaultdict(list)

for ii in TT:
    NamingDict1[ii[0]] = ii[1]

VBox()

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 4 times, most recent failure: Lost task 0.3 in stage 38.0 (TID 4804, ip-172-31-2-59.us-west-2.compute.internal, executor 8): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1544211040330_0001/container_1544211040330_0001_01_000020/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/mnt1/yarn/usercache/livy/appcache/application_1544211040330_0001/container_1544211040330_0001_01_000020/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/livy/appcache/application_1544211040330_0001/container_1544211040330_0001_01_000020/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.is

In [19]:
YYY.unpersist()

VBox()

name 'YYY' is not defined
Traceback (most recent call last):
NameError: name 'YYY' is not defined



In [20]:
Node = 3
YYY = sc.broadcast(NamingDict1)
def MappingChangesWithDictionary(trainRDD):
    
    
    Dictionary = YYY.value
    
    """ Taking in all the Key/Value Components """
    FinalKey = trainRDD[0]
    
    
    
    Value = trainRDD[1]
    Value[Node] = Dictionary[Value[Node]]
    
    return (FinalKey,Value[Node])


Hashing_2 = NonNumericRDD.map(MappingChangesWithDictionary).cache()
Hashing_2.sample(False, 0.0000001, None).collect()

VBox()

name 'NamingDict1' is not defined
Traceback (most recent call last):
NameError: name 'NamingDict1' is not defined



In [21]:
Hashing3 = Hashing_1.leftOuterJoin(Hashing_2).cache()
Hashing3.take(20)

VBox()

name 'Hashing_1' is not defined
Traceback (most recent call last):
NameError: name 'Hashing_1' is not defined



## 19

In [None]:
##### 19

""" Nodes and their meanings """
Node = 19
def ValuesNonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    return (T14, 1)


def NonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    T0 = Values[0]
    return (((T14, T0), 1))

def ConvertForMerge(toyRDDLine):
    
    YY = toyRDDLine[0][0]
    YYY = toyRDDLine[0][1]
    YYZ = toyRDDLine[1]
    
    return ((YY,(YYY,YYZ)))

def GetImportance(toyRDDLine):
    """Get the ToyRDD and then divide by the count of occurence"""
    PayLoad = toyRDDLine
    Key = toyRDDLine[0]
    PayLoads = PayLoad[1]
    PayLoads2 = PayLoads[0]  ##This is the real payload to divide through
    Value = float(PayLoads[1])   ##This is the numerical estimate value
    
    if type(PayLoads2) == list:
        Lister= []
        for ii in PayLoads2:
            if (ii[0] == "1"):
                if (ii[1]/float(Value) >0.9):
                    Lister.append(1)
                elif (ii[1]/float(Value) <=0.9) & (ii[1]/float(Value) >0.8):
                    Lister.append(2)
                elif (ii[1]/float(Value) <=0.8) & (ii[1]/float(Value) >0.7):
                    Lister.append(3)
                elif (ii[1]/float(Value) <=0.7) & (ii[1]/float(Value) >0.6):
                    Lister.append(4)
                elif (ii[1]/float(Value) <=0.6) & (ii[1]/float(Value) >0.5):
                    Lister.append(5)
                elif (ii[1]/float(Value) <=0.5) & (ii[1]/float(Value) >0.4):
                    Lister.append(6)
                elif (ii[1]/float(Value) <=0.4) & (ii[1]/float(Value) >0.3):
                    Lister.append(7)
                elif (ii[1]/float(Value) <=0.3) & (ii[1]/float(Value) >0.2):
                    Lister.append(8)
                elif (ii[1]/float(Value) <=0.2) & (ii[1]/float(Value) >0.1):
                    Lister.append(9)
                elif (ii[1] <= 0.1):
                    Lister.append(10)
            else:
                continue
                
                
    else:
        Lister =[]
        if (PayLoads2[0] == "1"):
            if (PayLoads2[1]/float(Value) >0.9):
                Lister.append(1)
            elif (PayLoads2[1]/float(Value) <=0.9) & (PayLoads2[1]/float(Value) >0.8):
                Lister.append(2)
            elif (PayLoads2[1]/float(Value) <=0.8) & (PayLoads2[1]/float(Value) >0.7):
                Lister.append(3)
            elif (PayLoads2[1]/float(Value) <=0.7) & (PayLoads2[1]/float(Value) >0.6):
                Lister.append(4)
            elif (PayLoads2[1]/float(Value) <=0.6) & (PayLoads2[1]/float(Value) >0.5):
                Lister.append(5)
            elif (PayLoads2[1]/float(Value) <=0.5) & (PayLoads2[1]/float(Value) >0.4):
                Lister.append(6)
            elif (PayLoads2[1]/float(Value) <=0.4) & (PayLoads2[1]/float(Value) >0.3):
                Lister.append(7)
            elif (PayLoads2[1]/float(Value) <=0.3) & (PayLoads2[1]/float(Value) >0.2):
                Lister.append(8)
            elif (PayLoads2[1]/float(Value) <=0.2) & (PayLoads2[1]/float(Value) >0.1):
                Lister.append(9)
            elif (PayLoads2[1] <= 0.1):
                Lister.append(10)
        
    return (Key, Lister)


    
    


#def ConvertToSignificance(toyRDDLine):
    

#print(trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).takeOrdered(100,lambda x: -x[1]))
    
YY = trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).sortBy(lambda x: -x[1]).cache()

ZZ = trainRDD.map(NonNumericFeatures)\
             .reduceByKey(lambda x,y: x+y)\
             .map(ConvertForMerge)\
             .union(YY)\
             .reduceByKey(lambda x,y: [x] + [y])\
             .map(GetImportance).filter(lambda x: len(x[1]) != 0 ).cache()
TT = ZZ.collect()

ZZ.unpersist()
YY.unpersist()

#NamingDict = "HashDictionary" + str(Node)
from collections import defaultdict

NamingDict2 = defaultdict(list)

for ii in TT:
    NamingDict2[ii[0]] = ii[1]

In [None]:
YYY.unpersist()

In [None]:
Node = 5
YYY = sc.broadcast(NamingDict2)
def MappingChangesWithDictionary(trainRDD):
    
    
    Dictionary = YYY.value
    
    """ Taking in all the Key/Value Components """
    FinalKey = trainRDD[0]
    
    
    
    Value = trainRDD[1]
    Value[Node] = Dictionary[Value[Node]]
    
    return (FinalKey,Value[Node])


Hashing_4 = NonNumericRDD.map(MappingChangesWithDictionary).cache()
Hashing_4.sample(False, 0.0000001, None).collect()

In [None]:
Hashing_1.unpersist()
Hashing_2.unpersist()

In [None]:
Hashing5 = Hashing_3.leftOuterJoin(Hashing_4).cache()
Hashing5.take(20)

## 20

In [None]:
##### 20

""" Nodes and their meanings """
Node = 20
def ValuesNonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    return (T14, 1)


def NonNumericFeatures(toyRDDLine):

    """ Take the node value from a broadcast variable that is sent to the function """
    Values = toyRDDLine.split('\t')
    T14 = Values[Node]
    T0 = Values[0]
    return (((T14, T0), 1))

def ConvertForMerge(toyRDDLine):
    
    YY = toyRDDLine[0][0]
    YYY = toyRDDLine[0][1]
    YYZ = toyRDDLine[1]
    
    return ((YY,(YYY,YYZ)))

def GetImportance(toyRDDLine):
    """Get the ToyRDD and then divide by the count of occurence"""
    PayLoad = toyRDDLine
    Key = toyRDDLine[0]
    PayLoads = PayLoad[1]
    PayLoads2 = PayLoads[0]  ##This is the real payload to divide through
    Value = float(PayLoads[1])   ##This is the numerical estimate value
    
    if type(PayLoads2) == list:
        Lister= []
        for ii in PayLoads2:
            if (ii[0] == "1"):
                if (ii[1]/float(Value) >0.9):
                    Lister.append(1)
                elif (ii[1]/float(Value) <=0.9) & (ii[1]/float(Value) >0.8):
                    Lister.append(2)
                elif (ii[1]/float(Value) <=0.8) & (ii[1]/float(Value) >0.7):
                    Lister.append(3)
                elif (ii[1]/float(Value) <=0.7) & (ii[1]/float(Value) >0.6):
                    Lister.append(4)
                elif (ii[1]/float(Value) <=0.6) & (ii[1]/float(Value) >0.5):
                    Lister.append(5)
                elif (ii[1]/float(Value) <=0.5) & (ii[1]/float(Value) >0.4):
                    Lister.append(6)
                elif (ii[1]/float(Value) <=0.4) & (ii[1]/float(Value) >0.3):
                    Lister.append(7)
                elif (ii[1]/float(Value) <=0.3) & (ii[1]/float(Value) >0.2):
                    Lister.append(8)
                elif (ii[1]/float(Value) <=0.2) & (ii[1]/float(Value) >0.1):
                    Lister.append(9)
                elif (ii[1] <= 0.1):
                    Lister.append(10)
            else:
                continue
                
                
    else:
        Lister =[]
        if (PayLoads2[0] == "1"):
            if (PayLoads2[1]/float(Value) >0.9):
                Lister.append(1)
            elif (PayLoads2[1]/float(Value) <=0.9) & (PayLoads2[1]/float(Value) >0.8):
                Lister.append(2)
            elif (PayLoads2[1]/float(Value) <=0.8) & (PayLoads2[1]/float(Value) >0.7):
                Lister.append(3)
            elif (PayLoads2[1]/float(Value) <=0.7) & (PayLoads2[1]/float(Value) >0.6):
                Lister.append(4)
            elif (PayLoads2[1]/float(Value) <=0.6) & (PayLoads2[1]/float(Value) >0.5):
                Lister.append(5)
            elif (PayLoads2[1]/float(Value) <=0.5) & (PayLoads2[1]/float(Value) >0.4):
                Lister.append(6)
            elif (PayLoads2[1]/float(Value) <=0.4) & (PayLoads2[1]/float(Value) >0.3):
                Lister.append(7)
            elif (PayLoads2[1]/float(Value) <=0.3) & (PayLoads2[1]/float(Value) >0.2):
                Lister.append(8)
            elif (PayLoads2[1]/float(Value) <=0.2) & (PayLoads2[1]/float(Value) >0.1):
                Lister.append(9)
            elif (PayLoads2[1] <= 0.1):
                Lister.append(10)
        
    return (Key, Lister)


    
    


#def ConvertToSignificance(toyRDDLine):
    

#print(trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).takeOrdered(100,lambda x: -x[1]))
    
YY = trainRDD.map(ValuesNonNumericFeatures).reduceByKey(lambda x,y: x+y).sortBy(lambda x: -x[1]).cache()

ZZ = trainRDD.map(NonNumericFeatures)\
             .reduceByKey(lambda x,y: x+y)\
             .map(ConvertForMerge)\
             .union(YY)\
             .reduceByKey(lambda x,y: [x] + [y])\
             .map(GetImportance).filter(lambda x: len(x[1]) != 0 ).cache()
TT = ZZ.collect()

ZZ.unpersist()
YY.unpersist()

#NamingDict = "HashDictionary" + str(Node)
from collections import defaultdict

NamingDict2 = defaultdict(list)

for ii in TT:
    NamingDict2[ii[0]] = ii[1]

In [None]:
YYY.unpersist()

In [None]:
Node = 6
YYY = sc.broadcast(NamingDict2)
def MappingChangesWithDictionary(trainRDD):
    
    
    Dictionary = YYY.value
    
    """ Taking in all the Key/Value Components """
    FinalKey = trainRDD[0]
    
    
    
    Value = trainRDD[1]
    Value[Node] = Dictionary[Value[Node]]
    
    return (FinalKey,Value[Node])


Hashing_6 = NonNumericRDD.map(MappingChangesWithDictionary).cache()
Hashing_6.sample(False, 0.0000001, None).collect()

In [None]:
Hashing7 = Hashing_5.leftOuterJoin(Hashing_6).cache()
Hashing7.take(20)

In [None]:
Hashing_5.unpersist()
Hashing_6.unpersist()