Set up sparkcontext, add necessary credentials here for use on your platform of choice

In [None]:
from pyspark import SparkContext
import matplotlib.pyplot as plt

sc = SparkContext(appName="parallel_MofN")

'''

ADD REQUIRED SETUP FOR SPARK FOR YOUR CLOUD SERVICE IF NECESSARY

'''

Get necessary inputs to run the search. Need weights, input layer activations, output layer activations, and the target layer activations. All values and parameters are stored as 2d or 1d arrays in csv files. Convolutional layers are flattened if necessary. To load a layer to test change the respective paths to the location of the required file. More information on the proper format of arrays can be found in the readme

In [None]:
import pandas as pd
import numpy as np

weight_path="https://raw.githubusercontent.com/siodense5/parallel-MofN-search/blob/master/data/cars_fc_1_weights.csv"
input_path="https://raw.githubusercontent.com/siodense5/parallel-MofN-search/blob/master/data/processed_car_data.csv"
layerout_path="https://raw.githubusercontent.com/siodense5/parallel-MofN-search/blob/master/data/cars_fc_1_output.csv"
labels_path="https://raw.githubusercontent.com/siodense5/parallel-MofN-search/blob/master/data/cars_fc_3_output.csv"

w=pd.get_csv(weight_path).values
test_inputs=pd.get_csv(input_path).values
test_lo=pd.get_csv(layerout_path).values
test_labels=pd.get_csv(labels_path).values

Set up the test parameters. For each feature in a convolultional layer we select the neuron with the maximum information
gain to test. If the layer is fully connected feature size will be 1 and every neuron in test_lo will be tested.
We generate the output examples using the splits calculated in the previous step on the selected output neurons.
We then generate splits for the input neurons using the information gain with respect to the output neurons.
Since the split that maximizes the information gain for a given output neuron might not be the split that maximizes
the information gain for a different output neuron we generate a different set of splits for each output neuron.
The output of this is an array of binary output examples generated by the output splits and a list of arrays of
input examples generated by the input splits for each output neuron

In [None]:
import subsample_flatten
import generate_splits

#Change these parameters for each layer, for convolutional layers set subsample=True, for the final layer set Final_Layer=True
#If the layer is convolutional then set num_input_channels to the number of channels in the input and the feature dimension 
#to the size of dimension of the input (we assume that the input is 2 dimensional with the same number of features in each dimension)
Final_Layer=False
subsample=False

num_input_channels=32
feature_dimension=3
feature_size=feature_dimension*feature_dimension


num_hidden=w.shape[1]
num_visible=w.shape[0]
num_examples=test_inputs.shape[0]
num_labels=test_labels.shape[1]

if Final_Layer:
    
    for i in range(test_labels.shape[0]):
        
        #Convert to one-hot when extracting from the final layer
        test_labels[i,:]= (test_labels[i,:]==np.max(test_labels[i,:]))
        outputs=test_labels
        splits=generate_splits.generate_splits_parallel(test_inputs,outputs,2)[:,0,:]
        ins=[test_inputs]*num_hidden
        
else:
    
    test_labels=np.reshape(np.argmax(test_labels,axis=1),[num_examples,1])
    o=generate_splits_parallel(test_lo,test_labels,num_labels)


    #If the layer is convolution we must take subsamples of the input image to test on.
    #To do this, for each filter, we unflatten the input array, use information gain to 
    #select an image patch to test for each filter, for each example input, pad the image 
    #patch if necessary and flatten it. Return the set of example image patches in a 2d array

    if subsample:

        ou=np.array(o)

        best_output_splits=ou[:,0,0]
        information_gains=ou[:,1,0]

        indicies=[]

        #For each filter find the input image patch with maximum information gain. The index is the pixel at the center
        #of this image patch 
        for i in range(num_hidden):
            indicies.append(i*feature_size+np.argmax(information_gains[i*feature_size:i*feature_size+feature_size]))

        outputs=test_lo[:,indicies]>best_output_splits[indicies]

        splits=[]
        literals=[]

        ins=[]
        re_test_inputs=subsample_flatten.unflatten_inputs(test_inputs,[num_examples,feature_dimension,feature_dimension,num_input_channels])

        for i in range(len(indicies)):

            ins.append(subsample_flatten.subsample_flatten_arrays(re_test_inputs,[2,2],[2,2],num_input_channels,new_position=indicies[i]%feature_size))

            splits.append(generate_splits.generate_splits_parallel(ins[-1],np.reshape(outputs[:,i],[num_examples,1]),2)[:,0,0])

        splits=np.array(splits).T    


    #If the layer in not convolution we require no subsampling and generate splits for every input neuron.
    #Because the set of input neurons in this case does not depend on which hidden neuron we are using
    #we can run the search in parallel
    else:

        ou=np.array(o)

        best_output_splits=ou[:,0,0]
        information_gains=ou[:,1,0]

        outputs=test_lo>best_output_splits
        
        splits=generate_splits.generate_splits_parallel(test_inputs,outputs,2)[:,0,:]


Finally we convert the necessary data into lists in order to do perform parallel searches over them using lambda expressions. 
Because some of the examples might have a lot of hidden units we use test_hidden to select only a certain number of hidden units
to test. If we want to test all hidden units at once then set start_rule=0 end_rule=num_hidden

In [None]:
start_rule=0
end_rule=16
num_rules=end_rule-start_rule

test_hidden=[i for i in range(start_rule,end_rule)]

#In the previous step, if we were subsampling, ins contains num_hidden different input arrays,
#if we are not subsampling then each of these arrays is identical which is set up here.
if not subsample:

    ins=[test_inputs]*num_hidden

test_outputs=outputs[:,test_hidden]

weight_list=[w[:,h] for h in test_hidden]

test_ins=[ins[i] for i in test_hidden]


Set up the list of rules to test and the functions used to compute the loss of each rule. Functions computing the error by finding the coordinates of positive and negative literals in a rule. For each input, count the positive literals whose input is greater than the split at that coordinate and the negative literals whose input is less than the split at that coordinate. If the sum is at least M then the rule outputs True and false otherwise. The error is percentage of rule outputs that are different than the outputs generated by the network. 

The complexity is the normalized length of the DNF of the rule

In [None]:
#set up list of rules
sort_indicies = lambda w : [j[0] for j in sorted(enumerate(abs(w)), key=lambda x:x[1], reverse=True)]
sorted_indicies=[sort_indicies(w) for w in weight_list]
                                           
rule_bodies_M=[[N,M,i] for i in range(num_rules) for N in range(num_visible_test+1) for M in range(0,N+2)]  

#functions to compute the error of a rule positive_indicies returns input indicies corresponding to
#positive(unnegated) literals in the rule and negative_indicies returns the indicies corresponding to
#negative(negated) literals in the rule. test_rule_output takes a rule and a set of input examples, 
#counts the number of positive indicies in an example that are greater than their corresponding splits
#and the number of negative indicies less than their splits, adds the two and compares them to the value
#of M for the rule. If the value is greater than or equal to M then the output is 1 and 0 otherwise
positive_indicies = lambda rule: [sorted_indicies[rule[2]][i] for i in np.where(weight_list[rule[2]][sorted_indicies[rule[2]][0:rule[0]]]>0)[0].tolist()]
negative_indicies = lambda rule: [sorted_indicies[rule[2]][i] for i in np.where(weight_list[rule[2]][sorted_indicies[rule[2]][0:rule[0]]]<0)[0].tolist()]

compute_test_rule_output = lambda rule: np.count_nonzero((test_ins[rule[2]][:,positive_indicies(rule)]>=splits[positive_indicies(rule),rule[2]])==1,axis=1) +np.count_nonzero((test_ins[rule[2]][:,negative_indicies(rule)]<splits[negative_indicies(rule),rule[2]])==1,axis=1) >= rule[1]
compute_test_error = lambda test_rules_output, test_output: np.abs(test_rules_output-test_output).sum(0)/num_examples


#functions to compute the complexity of a rule. The unnormalized complexity of a rule is the length of
#its DNF which is M*(N choose M). Max complexity calculates the maximum of this value assuming n input
#neurons, compute_complexity calculates the length of the DNF and divides by max_complexity to normalize
#The logarithm is taken in the numerator and denominator to control for growth
N_choose_M=lambda N,M: (np.math.factorial(N)//(np.math.factorial(N-M)*np.math.factorial(M)))

max_complexity_M=np.ceil((num_visible+1)/2)
max_complexity=np.math.log(int(max_complexity_M)* N_choose_M(num_visible,max_complexity_M))
compute_complexity = lambda N,M : 0 if (M==0 or M > N) else np.math.log(int(M)*N_choose_M(N,M))/max_complexity 



Parallelize and run the search using the previously defined functions to compute the complexity and accuracy

In [None]:
rdd1=sc.parallelize(rule_bodies_M,5)

rdd2=rdd1.map(lambda rule: [rule, compute_test_error(compute_test_rule_output(rule),(test_outputs.T[rule[2]])), compute_complexity(rule[0],rule[1])])

ecrules=rdd2.collect()

Next we need to compute the loss using a set of complexity penalty values and return the rule with the best loss for each penalty value. In our implementation we use the following function adapted from
https://gist.github.com/Juanlu001/562d1ec55be970403442

In [None]:
"""Reduce by key.
Equivalent to the Spark counterpart
Inspired by http://stackoverflow.com/q/33648581/554319
1. Sort by key
2. Group by key yielding (key, grouper)
3. For each pair yield (key, reduce(func, last element of each grouper))
"""

def reduceByKey(func, iterable):

    get_first = lambda p: p[0]
    get_second = lambda p: p[1]
    # iterable.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce(func)))
    return map(
        lambda l: (l[0], reduce(func, map(get_second, l[1]))),
        groupby(sorted(iterable, key=get_first), get_first)
    )

In [None]:


#Change values in penalty_list to test loss functions of different weights, some trial and error might be required to find 
#a set of penalty values that shows the relationship between complexity and accuracy
penalty_list=[0, 0.1,0.2,1,5,1000]
best_rules=[]
for penalty in penalty_list:
    
    #compute the loss of each rule using a weighted sum of the error and complexity weighted by penalty, select the rule with minimum loss for each target neuron
    prules=list(map(lambda rule: (rule[0][2],[rule[0],rule[1],rule[2],rule[1]+penalty*rule[2]]),ecrules))
    prules=list(reduceByKey(lambda a,b : a if (a[3]<=b[3]) else b,prules))
    prules=[rule[1] for rule in prules]

    #append the set of optimal rules for the parameter penalty
    best_rules.append(prules)


complexity=[]
error=[]
#for each set of optimal rules given for a value of penalty, average the error and complexity of the optimal rules
for rules in best_rules:    
    average_error=0
    average_complexity=0     
    
    for rule in rules:
        average_error+=rule[1]
        average_complexity+=rule[2]
    
    #list the averge error and complexity for each optimal rule
    error.append(average_error/num_rules)
    complexity.append(average_complexity/num_rules)

print("Average Error/Average Complexity", error,complexity)

Plot the average error of the rules as a function of the average complexity. This allows us to see the relationship between how complex the rules are and how accurately they explain the neurons in the target layer. 

In [None]:
plt.plot(complexity,error)
plt.yaxis("average error")
plt.xaxis("average complexity")
plt.show()