In [1]:
import numpy as np
import requests
import time

from pyspark import SQLContext, SparkContext
from pyspark import SparkConf

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.linalg import Vectors

from sklearn.cluster import KMeans
from sklearn import metrics
from sklearn.model_selection import StratifiedKFold

from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors

In [2]:
# Read the saved NumPy arrays from the working directory:
#   x - the dataset (indexed by column as one feature per column further down)
#   y - the labels that belong to the rows of x
x = np.load(file='NBx.npy')
y = np.load(file='NBy.npy')

In [3]:
# Create the Spark entry points used by the cross-validation cell.
# getOrCreate() reuses a SparkContext that is already running in this kernel,
# so executing this cell a second time no longer fails with
# "Cannot run multiple SparkContexts at once" as a plain SparkContext(...) would.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)  # used later for spark.createDataFrame(...)

In [4]:
# Calculate the rank of each feature.
# Every feature is clustered on its own with k-means (one cluster per class)
# and the homogeneity of the clusters with respect to the labels is used as
# that feature's rank.
n_classes = np.unique(y).shape[0]  # loop-invariant, so computed only once
R = []
for h in range(x.shape[1]):
    # A fixed random_state makes the k-means++ initialisation, and therefore
    # the resulting feature ranking, repeatable between runs.
    kmeans = KMeans(init='k-means++', n_clusters=n_classes, random_state=0)
    ff = kmeans.fit_predict(x[:, h].reshape(-1, 1))  # KMeans needs a 2-D (n, 1) input
    r = metrics.homogeneity_score(y, ff)  # Use the homogeneity score as the rank of the feature
    R.append(r)

In [5]:
# Arrange the feature indices according to their ranks, in ascending order
# (the lowest-scoring feature comes first).
Rnk = np.asarray(R).argsort()

In [6]:
# Initiate the 5-fold stratified cross-validation splitter.
# With shuffle=True and no seed, every call to kfolds.split() produces a
# different partition, so each feature subset would be scored on different
# folds and no run could be repeated. An integer random_state makes split()
# return the same folds on every call.
kfolds = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)

In [None]:
# For each set of ranks, use cross-validation to calculate accuracy.
# Iteration j keeps the features Rnk[j:], i.e. it drops the j lowest-ranked
# features, trains a Spark MLlib random forest on every fold and records:
#   - the mean accuracy over the folds,
#   - the time spent collecting the predictions, per object, in microseconds.
# The (label, prediction) pairs of all folds are written to 'F<j>.csv'.

def _to_labeled_rdd(features, labels):
    """Build an RDD of MLlib LabeledPoint objects from a NumPy matrix and its labels.

    features: 2-D array, one row per object.
    labels:   array with one label per row of `features`; each label is
              converted with int(float(label)).
    Returns the RDD obtained by mapping a Spark DataFrame with columns
    ["label", "features"] to LabeledPoint(label, mllib_vector).
    """
    rows = [(int(float(label)), Vectors.dense(row))
            for row, label in zip(features, np.ravel(labels))]
    frame = spark.createDataFrame(rows, schema=["label", "features"])
    # RandomForest lives in pyspark.mllib, so the pyspark.ml vectors are converted.
    return frame.rdd.map(lambda rec: LabeledPoint(rec.label, MLLibVectors.fromML(rec.features)))


n_classes = np.unique(y).shape[0]  # The number of classes in the dataset (loop-invariant)
smr = []
for j in range(Rnk.shape[0]):
    fd = x[:, Rnk[j:]]
    pp = 0
    # The timer is reset for every feature subset. Previously it was initialised
    # once before this loop, so each reported time also contained the time of
    # all earlier subsets.
    et = 0
    lpa = np.zeros((0, 2))
    for train, test in kfolds.split(fd, y):
        TrD = _to_labeled_rdd(fd[train], y[train])
        TsD = _to_labeled_rdd(fd[test], y[test])
        model = RandomForest.trainClassifier(TrD, numClasses=n_classes,
                                             categoricalFeaturesInfo={}, numTrees=100)
        # predict() on an RDD is lazy; the work happens when the result is collected below.
        predictions = model.predict(TsD.map(lambda lp: lp.features))
        st = time.time()
        labelsAndPredictions = TsD.map(lambda lp: lp.label).zip(predictions)
        fold_lp = np.asarray(labelsAndPredictions.toDF().toPandas())
        lpa = np.vstack((lpa, fold_lp))
        et += time.time() - st
        # Accuracy is computed from the already collected pairs instead of
        # re-running the prediction job with filter().count() and TsD.count().
        acc = float(np.mean(fold_lp[:, 0] == fold_lp[:, 1]))
        pp = pp + acc
    pp = pp / kfolds.n_splits
    np.savetxt('F%d.csv' % j, lpa, delimiter=',')
    # Every object is in exactly one test fold, so dividing by x.shape[0]
    # gives the time required to predict a label per object, in uS.
    smr.append([j, pp, et * 1000000 / x.shape[0]])

In [8]:
# Display the collected summary; the cross-validation loop appends one
# [j, mean accuracy, prediction time per object in uS] row per feature subset.
smr

[[78.0, 0.9254, 7.685],
 [77.0, 0.9258, 7.5517],
 [76.0, 0.9255, 7.7915],
 [75.0, 0.9254, 7.6001],
 [74.0, 0.9257, 7.6812],
 [73.0, 0.9256, 7.6089],
 [72.0, 0.9255, 7.6214],
 [71.0, 0.9257, 7.5825],
 [70.0, 0.9255, 7.5605],
 [69.0, 0.9254, 7.5314],
 [68.0, 0.9255, 7.5239],
 [67.0, 0.9256, 7.4595],
 [66.0, 0.9256, 7.5068],
 [65.0, 0.9253, 7.5029],
 [64.0, 0.9255, 7.5092],
 [63.0, 0.926, 7.4624],
 [62.0, 0.9255, 7.4734],
 [61.0, 0.9253, 7.4213],
 [60.0, 0.9253, 7.4582],
 [59.0, 0.9255, 7.562],
 [58.0, 0.9256, 7.3456],
 [57.0, 0.9262, 7.3502],
 [56.0, 0.9257, 7.3494],
 [55.0, 0.9257, 7.3937],
 [54.0, 0.9257, 7.31],
 [53.0, 0.9259, 7.2893],
 [52.0, 0.9258, 7.2689],
 [51.0, 0.9255, 7.2481],
 [50.0, 0.9259, 7.2697],
 [49.0, 0.9256, 7.2737],
 [48.0, 0.9257, 7.2836],
 [47.0, 0.9256, 7.205],
 [46.0, 0.9258, 7.2247],
 [45.0, 0.9254, 7.2173],
 [44.0, 0.9253, 7.1468],
 [43.0, 0.9256, 7.1579],
 [42.0, 0.9255, 7.1741],
 [41.0, 0.9255, 7.1775],
 [40.0, 0.9256, 7.1734],
 [39.0, 0.9259, 7.141],
 [38.0,