<a href="https://colab.research.google.com/github/ww6623/ABAGAIL/blob/master/ICA_Pyspark_ml_implementation(official).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Part 0: Set up Pyspark environment

In [29]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.0-preview2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install graphframes

spark-3.0.0-preview2-bin-hadoop3.2/
spark-3.0.0-preview2-bin-hadoop3.2/data/
spark-3.0.0-preview2-bin-hadoop3.2/data/streaming/
spark-3.0.0-preview2-bin-hadoop3.2/data/streaming/AFINN-111.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/sample_binary_classification_data.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/sample_kmeans_data.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/sample_multiclass_classification_data.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/sample_lda_libsvm_data.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/iris_libsvm.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/pagerank_data.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/sample_linear_regression_data.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/pic_data.txt
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/als/
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/als/test.data
spark-3.0.0-preview2-bin-hadoop3.2/data/mllib/als/sample_movielens_rati

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview2-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Part 1: Get sample data

In [4]:
# On droid the input to ml model is a pyspark dataframe, so i will need to transform the raw data into 
# install pydrive to load data
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# mount my google drive here to access the data
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [5]:
# read data of google drive, and convert it to pyspark dataframe
!ls "/content/drive/My Drive/MY_ica_pub_data"

citeseer.cites	citeseer.content


# Part 2: ICA implementation (*** IMPORTANT ***)

## File 0: All necessary libraries for model

In [6]:
''' External libraries'''
from collections import defaultdict # from file 1
import numpy as np  # from file 2

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score # all from file 4
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn import preprocessing
from pathlib import Path
from collections import defaultdict
import argparse, os
from pyspark.sql.types import *


from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import RFormula

## File 1: graph.py

In [7]:
'''
The difference between a dict and a defaultdict is that
the defaultdict object will "default" a value if that key has
not been seen yet.  If I use a normal dict, I have to check to
see if that key exists, and if it doesn't, set it to what i want.
'''
class Node(object):
    def __init__(self, node_id, feature_vector = None, label = None):
        '''
        self.node_id: string
            Unique id of a node.

        self.feature_vector(X): list
            A list of feature.

        self.label(Y): string
            Corresponding Y label.
        '''
        self.node_id = node_id # unique identifier of a row
        self.feature_vector = feature_vector
        self.label = label


class Edge(object):
    def __init__(self, from_node, to_node, feature_vector = None, label = None):
        '''
        self.from_node: node object
            Node object where the edge connection starts

        self.to_node: node object
            Node where the edge connection ends or where the arrow points to

        self.feature_vector(optional): list
            Feature info of connected node, default = None

        self.label(optional): string
            Labels of the connected node, default = None
        '''
        self.from_node = from_node
        self.to_node = to_node
        self.feature_vector = feature_vector
        self.label = label


class Graph(object): # this is a generic graph object
    '''The base Graph class'''
    def __init__(self):
        '''
        self.node_list: list
            A list of all node objects in the graph

        self.edge_list: list
            A list of all edge objects in the graph
        '''
        self.node_list = []
        self.edge_list = []

    def add_node(self, node_object):
        '''Add a node object to node_list attribute '''
        self.node_list.append(node_object)

    def add_edge(self, edge_object):
        ''' will be overwritten by child class '''
        raise NotImplementedError

    def get_neighbors(self, node_object):
        ''' will be overwritten by child class '''
        raise NotImplementedError


class DirectedGraph(Graph):
    '''DirectedGraph class inherited from Graph '''
    def __init__(self):
        super(DirectedGraph, self).__init__()
        '''
        self.out_neighbors: defaultdict
            Store outward neighbor node object of all nodes

        self.in_neighbors: defaultdict
            Store inward neighbor node object of all nodes
        '''
        self.out_neighbors = defaultdict(set)  # there should be no repeated neighbor objects, so use set
        self.in_neighbors = defaultdict(set)

    def add_edge(self, edge_object):
        '''
        Add new edge object into the edge list
        Update out_neighbors dict, use edge's from_node as key, add to_node to it's value set
        Update in_neighbors dict, use edge's to_node as key, add from_node to it's value set
        '''
        self.edge_list.append(edge_object)
        self.out_neighbors[edge_object.from_node].add(edge_object.to_node) # to_node is the out_neighbor of from node
        self.in_neighbors[edge_object.to_node].add(edge_object.from_node) # from_node is the in_neighbor of to_node

    def get_out_neighbors(self, node_object):
        ''' Return outward neighbors of a node '''
        return self.out_neighbors[node_object]

    def get_in_neighbors(self, node_object):
        ''' Return inward neighbors of a node '''
        return self.in_neighbors[node_object]

    def get_neighbors(self, node_object):
        ''' Return both outward and inward neighbors of a node '''
        return self.out_neighbors[node_object].union(self.in_neighbors[node_object])

class UndirectedGraph(Graph): # undirected Graph
    '''UndirectedGraph class inherited from Graph '''
    def __init__(self):
        super(UndirectedGraph, self).__init__()
        '''
        self.neighbors: defaultdict
            Store all neighbor node objects of nodes
        '''
        self.neighbors = defaultdict(set)

    def add_edge(self, edge_object):
        '''
        Add edge object to edge_list
        undirected graph use 2 sided edge, both nodes will be added
        to their respective key value pairs
        '''

        self.edge_list.append(edge_object)
        self.neighbors[edge_object.from_node].add(edge_object.to_node)
        self.neighbors[edge_object.to_node].add(edge_object.from_node)

    def get_neighbors(self, node_object): #input is the node
        '''Return neighbor node objects of a given node '''
        return self.neighbors[node_object]


## File 2: aggregator.py 

In [16]:
class Aggregator(object):

    def __init__(self, domain_labels, directed = False):
        '''
        self.domain_labels: list
            A list of unique labels

        self.directed: boolean
            Whether its directed or undirected graph,
            default: False (无向图)
        '''
        self.domain_labels = domain_labels 
        self.directed = directed 

    def aggregate(self, graph, node, conditional_node_to_label_map):
        '''Overwritten in the child class '''
        raise NotImplementedError


class CountAggregator(Aggregator):
    '''The count aggregate, inherited from Aggregator'''
    def aggregate(self, graph, node, conditional_node_to_label_map):
        '''
        Given a node, its graph structure, and the temp {node: label} mapping
        create and return a relational feature vector for neighbors of this node.
        If a neighbor is not in conditional_node_to_label_map, ignore it.
        If directed = True, create and append two feature vectors;
        one for the out-neighbors and one for the in-neighbors.
        Otherwise only one feature vector is needed
        it returns a count feature vector
        '''
        neighbor_undirected = []
        neighbor_directed_in = []
        neighbor_directed_out = []
        if self.directed: # we have a directed graph
            for x in self.domain_labels: # initiating the count for each labels
                neighbor_directed_in.append(0.0)
                neighbor_directed_out.append(0.0)
            for eachIn in graph.get_in_neighbors(node): # for each in_neighbor of this node
                if eachIn in conditional_node_to_label_map: # check if this neighbor is in the training set node
                    index = self.domain_labels.index(conditional_node_to_label_map[eachIn]) # this will return the index position of an element
                    '''reason why use  conditional_node_to_label_map here is because
                       it's a temporary mapping of {node: label}, because label might be fake label
                       Note: The index() method returns the position at the first occurrence of the specified value.
                       '''
                    neighbor_directed_in[index] += 1.0 # increment on the label's index position
            for eachOut in graph.get_out_neighbors(node): # same for out_neighbor
                if eachOut in conditional_node_to_label_map:
                    index = self.domain_labels.index(conditional_node_to_label_map[eachOut])
                    neighbor_directed_out[index] += 1.0 # increment the count of label's on out neighbor
            return neighbor_directed_in+neighbor_directed_out   #                             INa, INb, OUTa, OUTb
                                                                # count for all neighbor, ex: [1,   2,   2,     0]
        else: # if it's an undirected graph
            for x in self.domain_labels:  # similar to  before, start by initiating count for each label
                neighbor_undirected.append(0.0)
                '''换一种写法 '''
            for i in graph.get_neighbors(node):
                if i in conditional_node_to_label_map.keys(): # if it matches the node object
                    index = self.domain_labels.index(conditional_node_to_label_map[i]) # find it's index
                    neighbor_undirected[index] += 1.0 # increment count
            return neighbor_undirected


class ProportionalAggregator(Aggregator):
    '''The proportional aggregate'''

    def aggregate(self, graph, node, conditional_node_to_label_map):
        '''
        This method aggregate feature vector use it's percentage, it will first use
        the count aggregator to compute count of each neighbor node, then if self.directed
        is True, it will compute all the in_neighbor nodes percentage and out_neighbor
        percentage respectively, in neighbor example: (in_neighbor_count(i) =in_neighbor_count(i)/ sum(in_neighbor_count(n)))
        it will return a percentage feature vector
        '''
        cntag = CountAggregator(self.domain_labels,self.directed)
        cnt_agg = cntag.aggregate(graph,node,conditional_node_to_label_map) # [1,2,3,4], get count vector for all neighbor
        if self.directed: # if a directed graph
            in_neighbor_sum=sum(cnt_agg[:len(self.domain_labels)])  # in neighbot count total
            out_neighbor_sum=sum(cnt_agg[len(self.domain_labels):]) # out neighbor count total
            if in_neighbor_sum > 0: # if the sum greater than 0
                for r in range(0,len(self.domain_labels)): #
                    cnt_agg[r]/=in_neighbor_sum  # update on count vector
            if out_neighbor_sum > 0:
                for r in range(len(self.domain_labels),len(cnt_agg)):
                    cnt_agg[r]/=out_neighbor_sum # update on count vector
            p_list = cnt_agg  # now all position of cnt_agg vector has been updated
            return p_list   #                             INa, INb, OUTa, OUTb
                            # count for all neighbor, ex: [1/3,  2/3,  1,   0]  # in fraction
        else: # now if it's an undirected graph
            total_sum = sum(cnt_agg) # sum of the entire list
            if total_sum > 0:  # if over 0
                for r in range(len(cnt_agg)):
                    cnt_agg[r] /= total_sum
            p_list = cnt_agg
            return p_list

class ExistAggregator(Aggregator):
    '''The exist aggregate inherited from the aggregator class'''
    def aggregate(self, graph, node, conditional_node_to_label_map):
        ''' This method utilize the Count aggregator, check each position of the count vector
        using the CountAggregator, if any element is at least 1, upde the value to 1
        meaning there exist at least 1 neighbor, if count is 0, then do nothing.
        it returns a boolean feature vector
        '''
        cntag=CountAggregator(self.domain_labels,self.directed)
        cnt_agg = cntag.aggregate(graph,node,conditional_node_to_label_map)
        for r in range(len(cnt_agg)):
            if cnt_agg[r] >= 1: #if count is greater or equal to 1, then set the cell to 1
                cnt_agg[r] = 1
        ext_list = cnt_agg
        return ext_list


## File 3: classifier.py******* (convert to pyspark ml with pipeline)

In [77]:
# utility function
def get_class(classifier_name):
    parts = classifier_name.split('.')
    module = ".".join(parts[:-1])
    md = __import__( module )
    for comp in parts[1:]:
        md = getattr(md, comp)
    return md  # skleanr input: 'sklearn.ensemble.RandomForestClassifier', output: sklearn.linear_model._logistic.LogisticRegression
               # pyspark ml rf input: "pyspark.ml.classification.RandomForestClassifier"
               # pyspark ml logistic regression input: "pyspark.ml.classification.LogisticRegression"



# util for fit method
# input: list of lists (features) and list(label)
# output: pyspark df, with 2 columns dense feature_vector(features) and (label)
def training_data_converter_features_and_label(feature_list_of_lists, label_list):
    np_sample_features = np.array(feature_list_of_lists)
    transpose_2d_np_lable = np.array([label_list]).T
    combined_np = np.hstack((np_sample_features, transpose_2d_np_lable))
    np_mapper = map(lambda x: (Vectors.dense(x[0:-1]), float(x[-1])), combined_np)
    features_label_df = spark.createDataFrame(np_mapper, schema = ["features", "label"])
    
    return features_label_df # return a spark dataframe


# util for predict method
def testing_data_converter_features_only(feature_list_of_lists):
    np_sample_features = np.array(feature_list_of_lists)
    # create mapper for features only
    np_mapper = map(lambda x: (Vectors.dense(x[:]), ), np_sample_features)
    features_only_df = spark.createDataFrame(np_mapper, schema = ["features"])
    return features_only_df

# convert prediction spark df to prediction 1d np array
def converter_prediction_np(spark_prediction):
    np_prediction = np.array(spark_prediction.select("prediction").collect()).flatten()
    return np_prediction


class Classifier(object):
    '''
    The base classifier for Iterative Classification.

    Warning: This class should not be used directly, use derived classes
    instead.
    '''

    def __init__(self, pyspark_ml_classifier_name, **classifier_args):
        '''
        self.clf: classifier object
            A classifier object with passed in arguments
        '''
        classifier_class=get_class(pyspark_ml_classifier_name) # get an pyspark.ml classifier
        self.clf = classifier_class(**classifier_args)     # initialize this classifier with arguments

    def fit(self, graph, train_indices):
        '''
        fit method will be overwritten by child class
        '''
        raise NotImplementedError

    def predict(self, graph, test_indices, conditional_node_to_label_map = None):
        '''
        predict method will be overwritten by child class
        '''
        raise NotImplementedError


class LocalClassifier(Classifier):
    '''
    1st classifier inherits from base classifier
    '''
    def fit(self, graph, train_indices):
        '''
        train with training nodes' original feature vector and ground truth y-labels

        Parameters
        ----------
        graph: graph object
            contains all the node and edge objects
        training_indices: list
            A list of training indices, can be used to query node object
            from graph's node_list attribute
        '''
        feature_list= []  # to store all the feature, this is a list of lists
        label_list=[]  # corresponding y labels
        g= graph   # graph object
        n= g.node_list # all node objects in this graph
        training_nodes=[n[i] for i in train_indices] # reference train_indices and store all training nodes

        for node in training_nodes: # get through each training nodes
            feature_list.append(node.feature_vector) # IS THIS A Mapper object?? append each feature vector,
            label_list.append(node.label)

        ''' CHANGE_1: Starts from here i need to convert this using pyspark ml '''
        # transform features and label to spark_df
        features_label_df = training_data_converter_features_and_label(feature_list, label_list)

        # print("this is what local classifier training data look like")
        # features_label_df.show()
        # print("what is self clf before training? {}".format(self.clf))
        self.clf = self.clf.fit(features_label_df) # call fit method and update the classifier attribute
            # print("what is self clf after training? {}".format(self.clf))

    def predict(self, graph, test_indices, conditional_node_to_label_map = None):
        '''
        Warning: this function should be called only after the fit function is called!
        Predict the labels of test Nodes conditioning on the labels in
        conditional_node_to_label_map (stores temp label of a node).

        Parameters
        ----------
        graph: graph object
            contains all the nodes and edges object
        test_indices: list
            A list of testing indices, can be used to query node object
            from graph's node_list attribute

        Returns
        -------
        y: ndarray of shape (n_samples,) or (n_samples, n_outputs)
            The predicted classes
        '''
        feature_list=[]  # testing set only have features
        g= graph # again get the graph object
        n=g.node_list # get all nodes object in the graph
        testing_nodes = [n[i] for i in test_indices] # only select the ones for testing

        for node in testing_nodes: # iterate through the testing_node
            feature_list.append(node.feature_vector) # update feature list or matrix

        ''' ******CHANGE_3: convert this use pyspark ml ***** '''
        spark_df_feature_only = testing_data_converter_features_only(feature_list)
        # use transform method to get a prediction dataframe
        prediction_df = self.clf.transform(spark_df_feature_only) 
        # conver prediction df to 1d np array
        prediction_np = converter_prediction_np(prediction_df)

        return prediction_np


class RelationalClassifier(Classifier):
    '''
    2nd classifier inherits from base classifier
    Use super method here to initialize a different classifier
    '''
    def __init__(self, pyspark_ml_classifier_name, aggregator, use_node_attributes = True, **classifier_args):
        super(RelationalClassifier, self).__init__(pyspark_ml_classifier_name, **classifier_args) # this will overwites the original paramters
        '''
        self.aggregator: Aggregator object
            An aggregator object that creates relational feature

        self.use_node_attributes: Boolean
            A boolean value that indicates whether to use node's original feature vector
        '''
        self.aggregator = aggregator # the aggregator of my choice
        self.use_node_attributes = use_node_attributes # boolean on whether node attribute is used (default = True)


    def fit(self, graph, train_indices):
        '''
        Conditionally combine training nodes' feature vector with relational feature vector. Train the
        2nd classifier with updated feature vector and ground truth label

        Parameters
        ----------
        graph: graph object
            contains all the node and edge objects
        training_indices: list
            A list of training indices, can be used to query node object
            from graph's node_list attribute
        '''
        conditional_map={} # this conditional map is just for training node
        features=[] # list of lists (or matrix), this is the new feature
        label_list=[] # labels list
        for i in train_indices: # create mapping for all training nodes
            conditional_map[graph.node_list[i]]=graph.node_list[i].label
        for i in train_indices: # iterate through every
            self.feature_combination_check(graph,features,i,conditional_map) # append [feature_vector + relational_feature] to features
            label_list.append(graph.node_list[i].label)  # add every training node's label to labels


        '''CHANGE 2: convert features and label list to df'''
        relational_train_features_label_df = training_data_converter_features_and_label(features, label_list)
        self.clf = self.clf.fit(relational_train_features_label_df) # update self.clf as a trained classifier
        
        # print("relational classifer fit method is working")

    def predict(self, graph, test_indices, conditional_node_to_label_map = None):
        '''
        Warning: this method should be called only after the fit method is called!
        Predict the labels of test Nodes conditioning on the labels in conditional_node_to_label_map.
        conditional_node_to_label_map might include the observed and predicted labels.
        This method is NOT iterative; it does NOT update conditional_node_to_label_map.

        Parameters
        ----------
        graph: graph object
            contains all the nodes and edges object
        test_indices: list
            A list of testing indices, can be used to query node object
            from graph's node_list attribute
        conditional_node_to_label_map: dictionary
            A dictionary that contains TEMPORARY mapping of given node objects and their current labels

        Returns
        -------
        y: ndarray of shape (n_samples,) or (n_samples, n_outputs)
            The predicted classes
        '''
        features=[]
        for i in test_indices: # go over each test_indices of the graph object's node list
            self.feature_combination_check(graph,features,i,conditional_node_to_label_map) #append [feature_vector + relational_feature] to features

        # convert features (list of lists) to spark df
        spark_df_feature_only = testing_data_converter_features_only(features)
        # run transform method to get prediction df
        prediction_df = self.clf.transform(spark_df_feature_only)
        # transform prediction_df to 1d np array
        prediction_np = converter_prediction_np(prediction_df)

        return prediction_np # make predictions based on test feature


    def feature_combination_check(self,graph,features,i,conditional_map):
        '''
        helper method, conditionally creates relational feature for a node, and add to the features list

        Parameters
        ----------
        graph: graph object
            contains all the nodes and edges object
        features: list
            Feature list to contain updated feature vectors
        i: integer
            index to query corresponding node object from graph object
        conditional_node_to_label_map: dictionary
            A dictionary that contains TEMPORARY mapping of given node objects and their current labels
        '''
        aggregates=self.aggregator.aggregate(graph,graph.node_list[i],conditional_map)# this will give an array of relational features of one node
        feat_list=np.array([]) # initialize an empty feature vector
        if self.use_node_attributes:  # if node's feature is included
            node_feature_vector = graph.node_list[i].feature_vector
            # print("current node's feature vector's length is {}".format(len(node_feature_vector)))
            feat_list=np.append(node_feature_vector,aggregates) #np append is different than array, it essently append both list to arr
            # print("final feat_list's length is {}, it's type is {}".format(len(feat_list), type(feat_list)))
        else:
            feat_list=np.append(feat_list.tolist(),aggregates)
        features.append(feat_list)


class ICA(Classifier):
    '''
    Iterative Classification Algorithm
    '''
    def __init__(self, local_classifier, relational_classifier, max_iteration = 10):
        '''
        self.local_classifier: classifier object
            1st classifier object, also known as LocalClassifier object

        self.relational_classifier: classifier object
            2nd classifier object, also known as RelationalClassifier object

        self.max_iteration: int
            maximum number of iterations for termination, default is 10
        '''
        self.local_classifier = local_classifier
        self.relational_classifier = relational_classifier
        self.max_iteration = max_iteration  # this is set for convergence

    def fit(self, graph, train_indices):
        # might need to update documentation
        '''
        ICA training phase:
        local_classifier: Training data => train with original feature_vector and ground truth label
        relational_classifier: Training data = > train with [feature_vector + relaitonal_vector] and ground truth label

        Parameters
        ----------
        graph: graph object
            contains all the nodes and edges object
        training_indices: list
            A list of training indices, can be used to query node object
            from graph's node_list attribute
        '''
        self.local_classifier.fit(graph, train_indices)
        self.relational_classifier.fit(graph, train_indices)

    def predict(self, graph, test_indices, conditional_node_to_label_map = None):
        '''
        Warning: This function should be called only after the fit function is called!
        Step_1: predict test nodes use trained local_classifer to get temp(fake) labels.
        Use temp labels to update the conditional_node_to_label_map on test nodes
        Step_2: Begin Iterative phase, set a termination iteration number and start the (outer loop)
        step_3: Inner loop, iterate through each test node, predict it's label with updated feature vector
        use relational_clf, again use the predicted label to update conditional_node_to_label_map
        step_4: Once exceed the self.max_iteration, exit outter loop and record labels in
        conditional_node_to_label_map for all test nodes

        Parameters
        ----------
        graph: graph object
            contains all the nodes and edges object
        test_indices: list
            A list of testing indices, can be used to query node object
            from graph's node_list attribute
        conditional_node_to_label_map: dictionary
            A dictionary that contains TEMPORARY mapping of given node objects and their current labels

        Returns
        -------
        y: list
            The predicted labels
        '''
        # refer to notes for each step
        predictclf=self.local_classifier.predict(graph,test_indices) # **** Step_2: This is an array, get fake labels for test set
        # predictclf is a ndarray, looks like ['DB', 'HCI'... 'ML']
        self.cond_mp_upd(graph,conditional_node_to_label_map,predictclf,test_indices) # this mapping already comes with mappings for training nodes
                                                                                      # update conditional_map's test indices with fake labels
                                                                                      # Ex: {test_node_object: predicted_temp_label}
        relation_predict=[] # store prediction
        temp=[] # temp to hold training index,

        for eachTrail in range(self.max_iteration): # iterate max_iteration of times

            for x in test_indices: # this has to be done node by node, because it needs to update the relational mapping
                temp.append(x) # store test indices
                rltn_pred=list(self.relational_classifier.predict(graph,temp,conditional_node_to_label_map)) # ***Step3+ Step3b: predict one test node with updated fake label                                                                                             # (feature_vector + relational_vector)
                self.cond_mp_upd(graph,conditional_node_to_label_map,rltn_pred,temp) # update one node mapping {test-object: new predicted-y}, based on the newly predicted rltn_pred
                temp.remove(x) # get rid of this this index

        '''Once the outter loop complete, we get the final predicted y-label for all test node '''
        for ti in test_indices:  # iterate through all test node index
            relation_predict.append(conditional_node_to_label_map[graph.node_list[ti]])  #quey conditional_map[test_node_obj] => to get the final label
        return relation_predict # this is a list of labels
        '''This is the final predicted labels for all test nodes, Done '''

    def cond_mp_upd(self,graph,conditional_map,pred,indices): # stores all {test_node_object_A: label_A, }
        '''
        helper method: On conditional_node_to_label_map, update given nodes' label with predicted labels

        Parameters
        ----------
        graph: graph object
            contains all the nodes and edges object
        conditional_map: dictionary
            A dictionary that contains TEMPORARY mapping of given node objects and their current labels
        pred: ndarray
            The predicted labels
        indices: list
            A list of node indices, can be used to query node object from graph's node_list attribute
        '''
        for x in range(len(pred)):# iteration number depends on size of the prediction list
            conditional_map[graph.node_list[indices[x]]]=pred[x]  # its a dict with key size is test set
                                                                  # the mapping looks like this
                                                                  # {test_node_object: predicted_fake_label}


## File 4: utils.py (This is a very import first step, to store the dataframe into a graph object)

In [78]:

'''Internal libraries, no need to import '''
# from graph import DirectedGraph, UndirectedGraph, Node, Edge
# from aggregator import CountAggregator, ProportionalAggregator, ExistAggregator



'''atom util_1, get class, moved to classifier.py'''


'''atom util_2 '''
def pick_aggregator(agg,domain_labels,directed):
    if agg=='count':
        aggregator=CountAggregator(domain_labels,directed)
    elif agg=='prop':
        aggregator=ProportionalAggregator(domain_labels,directed)
    elif agg=='exist':
        aggregator=ExistAggregator(domain_labels,directed)
    else:
        raise ValueError('Invalid argument')

    return aggregator

'''atom util_3 '''
def create_map(graph,train_indices):
    conditional_map={}
    for i in train_indices:
        conditional_map[graph.node_list[i]]=graph.node_list[i].label
    return conditional_map



'''IGNORE THIS STEP FOR DROID IMPLEMENTATION '''
def load_data_and_build_spark_df(content_file):
    # convert this file to a spark dataframe
    sc = spark.sparkContext
    all_lines = []
    with open(content_file, 'r') as node_file:
        for line in node_file:
            line_info = line.split('\n')[0].split('\t') # line looks like this ['454077', '0', '0',....'DB']
            # print(line_info), successfully load
            all_lines.append(line_info)
        df = sc.parallelize(all_lines).toDF() #when exit the for loop, convert all_lines to spark df
    return df 


'''
IMPORTANT***, I can insert the Droid Registration Spark Dataframe here along with edge connection file 
to generate the graph data structure
'''
def build_graph_with_DF(df, cites_file):
    # input: dataframe, with id + features + label

    undirected_device_graph = UndirectedGraph()  # initiate a direct graph object
    unique_labels=[]             # unique labels
    id_obj_map={}

   
    ''' 
    Collect (Action) - Return all the elements of the dataset as an array at the driver program. 
    This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
    '''
    string_label_to_int_dictionary = {"Agents":0, "IR":1, "DB":2, "AI":3, "HCI":4, "ML": 5}
    for row in df.rdd.collect(): # iterate through each row of the dataframe
        '''
            if not use mapper, this will return tuple of strings
            for droid implementation I want to return a list of floats?
        '''

        '''change 1 '''
        label_int = string_label_to_int_dictionary[row[-1]] 
        n = Node(row[0], list(map(float,row[1:-1])), label_int) # every row creates a new node object,
        undirected_device_graph.add_node(n)   # add new node object into the graph object
        if label_int not in unique_labels:    # if row's label does not exist
            unique_labels.append(label_int)   # create a list of unique label
        id_obj_map[row[0]] = n              # {id: node_object} reference table

    # ******now get all the edge connections for the undirected graph*******
    with open(cites_file,'r') as edge_file:  # read the edge files
        for line in edge_file:
            line_info=line.split('\n')[0].split('\t') # this is edge connection, line_info: ['100157', '100157'], a list of strings, for Droid it will 
                                                      # list of tuples [('email','ip','device_id'), ('email','ip','device_id')]
            if line_info[0] in id_obj_map.keys() and line_info[1] in id_obj_map.keys(): # if both ids are in id_maps, updated undirected graph
                # print("true")
                from_node=id_obj_map[line_info[1]]  # store from node object
                to_node=id_obj_map[line_info[0]]  # store to node object
                undirected_device_graph.add_edge(Edge(from_node, to_node)) # store all edge objects in the graph

    return undirected_device_graph, unique_labels

## File 5: run_experiment.py

In [79]:
# parser = argparse.ArgumentParser()
# parser.add_argument('-content_file', type = Path, default = "/content/drive/My Drive/MY_ica_pub_data/citeseer.content", help = 'The path to the content file.')
# parser.add_argument('-cites_file', type = Path, default = "/content/drive/My Drive/MY_ica_pub_data/citeseer.cites",help='The path to the cites file.')
# args = parser.parse_args()
df = load_data_and_build_spark_df("/content/drive/My Drive/MY_ica_pub_data/citeseer.content")
df.show()

+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+

In [80]:
# show size of the df
print("The num of rows for this df is {}".format(df.count())) # this is same is num of nodes
print("The num of columns for this df is {}".format(len(df.columns))) # this number -2 = feature_number

The num of rows for this df is 3312
The num of columns for this df is 3705


In [81]:
# now I stored pyspark dataframe in a graph object
graph, domain_labels = build_graph_with_DF(df,"/content/drive/My Drive/MY_ica_pub_data/citeseer.cites") 

In [82]:
# peek first node's id, feature and LABEL
print("first node's id is {}".format(graph.node_list[0].node_id))
print("first node's feature is {}".format(graph.node_list[0].feature_vector))
print("first node's label is {}".format(graph.node_list[0].label))

first node's id is 100157
first node's feature is [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,

In [83]:
print ("domain labels look like {}".format(domain_labels))
print("how many nodes total? {}".format(len(graph.node_list)))
print("original nodes feature_vector length is {}".format(len(graph.node_list[0].feature_vector)))

domain labels look like [0, 1, 2, 3, 4, 5]
how many nodes total? 3312
original nodes feature_vector length is 3703


# Original non-pipeline version

In [84]:
'''Same output from atom '''


# *******n is the index list for all nodes object*****
n=range(len(graph.node_list))
# ****************************
ica_accuracies = defaultdict(list)   # use accuracy to measure predicted and ground truth
 
train, test = train_test_split(n, train_size=0.8, random_state=0) 

y_true=[graph.node_list[t].label for t in test] # obtain true label for test nodes

#'pyspark.ml.classification.LogisticRegression'
local_clf=LocalClassifier('pyspark.ml.classification.RandomForestClassifier', numTrees = 10, labelCol = "label") # initialize local_classifer with


# Get aggregator
agg=pick_aggregator('exist',domain_labels,False) # pick my aggregator, here False means undirected
relational_clf=RelationalClassifier('pyspark.ml.classification.LogisticRegression', agg, True, maxIter = 10, labelCol = "label") # initialize relational classifier, questionable input
ica=ICA(local_clf,relational_clf) 
ica.fit(graph,train) 

# print("what is the feature importance for relational classifier use logistic regression? {}".format(ica.relational_classifier.clf.featureImportances))



'''run later, transform does not work '''
conditional_node_to_label_map=create_map(graph,train) 
ica_predict=ica.predict(graph,test,conditional_node_to_label_map) 
ica_accuracy=accuracy_score(y_true,ica_predict) 
print("accuracy is {}".format(ica_accuracy))


'''for classification problem , evaluation use confusion matrix '''

accuracy is 0.7375565610859729


'for classification problem , evaluation use confusion matrix '

# Implement a pipeline version of the algorithm (Use this one for the actual data)