In [1]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id 
from math import sqrt
from pyspark.sql.functions import sqrt

# Data 
train_data_labels = 'Train-label-28x28.csv'
test_data_labels = 'Test-label-28x28.csv'

def get_vector(data, col_name):
    assembler = VectorAssembler(inputCols=data.columns, outputCol=col_name)
    return assembler.transform(data).select(col_name)    

# Train Data
train = spark.read.csv(train_data_labels, header=False, inferSchema="true")
train_labels = get_vector(train.select('_c0'), 'train_label')
train_features = get_vector(train.drop('_c0'), 'feature')

# Test Data
test = spark.read.csv(test_data_labels, header=False, inferSchema="true")
test_labels = get_vector(test.select('_c0'), 'test_label')
test_features = get_vector(test.drop('_c0'), 'feature')

In [2]:
from pyspark.ml.feature import PCA

pca = PCA(k=50, inputCol="feature", outputCol="pca_feature")
pca_model = pca.fit(train_features)

# Apply PCA to train / test features
train_features_pca = pca_model.transform(train_features).select("pca_feature")
test_features_pca = pca_model.transform(test_features).select("pca_feature")

In [3]:
# Rename pca feature column values
train_features_pca = train_features_pca.withColumnRenamed("pca_feature", "train_feature")
test_features_pca = test_features_pca.withColumnRenamed("pca_feature", "test_feature")

In [4]:
# Develop a combined dataframe for all data

def combine_features_labels(feature_vector, label_vector, kind='train'):
    features = feature_vector.withColumn('{}_id'.format(kind), monotonically_increasing_id())
    labels = label_vector.withColumn('{}_id'.format(kind), monotonically_increasing_id())
    data = features.join(labels, '{}_id'.format(kind))
    return data

# Create combined train / test data
train_data = combine_features_labels(train_features_pca, train_labels, 'train')
test_data = combine_features_labels(test_features_pca, test_labels, 'test')

In [5]:
train_data.show(4), test_data.show(4), train_data.count(), test_data.count()

+--------+--------------------+-----------+
|train_id|       train_feature|train_label|
+--------+--------------------+-----------+
|       0|[880.731433034386...|      [5.0]|
|       1|[1768.51722024166...|      [0.0]|
|       2|[704.949236329314...|      [4.0]|
|       3|[-42.328192193772...|      [1.0]|
+--------+--------------------+-----------+
only showing top 4 rows

+-------+--------------------+----------+
|test_id|        test_feature|test_label|
+-------+--------------------+----------+
|      0|[424.527675108320...|     [7.0]|
|      1|[777.495806467954...|     [2.0]|
|      2|[-189.22802355912...|     [1.0]|
|      3|[1990.70583089721...|     [0.0]|
+-------+--------------------+----------+
only showing top 4 rows



(None, None, 60000, 10000)

In [8]:
# broadcast training features and examples
train_feature = sc.broadcast(train_data.rdd.map(lambda x: x.train_feature).collect())
train_label = sc.broadcast(train_data.rdd.map(lambda x: x.train_label).collect())

In [12]:
%%time

# WORKS IN < 1 MIN BUT TOO MUCH NUMPY

import numpy as np
import operator
import collections
from pyspark.mllib.linalg import Vectors

def make_prediction(test_feature, train_feature, train_lables, K=3):
    similarity = ((test_feature - train_feature) ** 2).sum(axis=1) ** 0.5
    k_neighbors = train_lables[np.argpartition(similarity, K)[:K]]
    predicted_label = collections.Counter(k_neighbors.ravel()).most_common()[0][0]
    return Vectors.dense(predicted_label)

# Calulate components
lables = np.array(train_label.value)
features = np.array(train_feature.value)

# Compute KNN predictions
predictions = test_data.rdd.map(
    lambda x: (x.test_id, make_prediction(np.array(x.test_feature), features, lables, K=5))).collect()

# Create dataframe of prediction resutlts
prediction_data = sc.parallelize(predictions).toDF(['test_id', 'prediction'])

CPU times: user 2.06 s, sys: 300 ms, total: 2.36 s
Wall time: 1min 17s


In [13]:
# (6000, 1000) => 1.59 s ± 49.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# (60000, 10000) => 1min 13s ± 1.33 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [14]:
prediction_data.show(3)

+-------+----------+
|test_id|prediction|
+-------+----------+
|      0|     [7.0]|
|      1|     [2.0]|
|      2|     [1.0]|
+-------+----------+
only showing top 3 rows



In [15]:
from pyspark.mllib.evaluation import MulticlassMetrics

def prepare_data(actual_data, prediction_data):
    return actual_data.join(prediction_data, 'test_id').rdd \
                        .map(lambda x: (float(x.prediction[0]), float(x.test_label[0])))

def overall_report(actual_data, prediction_data):
    prediction_and_labels = prepare_data(actual_data, prediction_data)
    metrics = MulticlassMetrics(prediction_and_labels)
    return sc.parallelize([(Vectors.dense(round(metrics.precision(), 3)), 
                            Vectors.dense(round(metrics.recall(), 3)), 
                            Vectors.dense(round(metrics.fMeasure(), 3)))]).toDF(['Precision', 'Recall', 'F-Score'])

def classification_report(actual_data, prediction_data):
    prediction_and_labels = prepare_data(actual_data, prediction_data)
    metrics = MulticlassMetrics(prediction_and_labels)
    classes = set(actual_data.rdd.map(lambda x: x.test_label[0]).collect())
    results = [(Vectors.dense(float(c)),
                Vectors.dense(round(metrics.precision(c), 3)), 
                Vectors.dense(round(metrics.recall(c), 3)), 
                Vectors.dense(round(metrics.fMeasure(c), 3))) for c in sorted(classes)]
    return sc.parallelize(results).toDF(['Class', 'Precision', 'Recall', 'F-Score'])

In [16]:
%%time

overall_metrics = overall_report(test_data, prediction_data)
overall_metrics.show()

+---------+-------+-------+
|Precision| Recall|F-Score|
+---------+-------+-------+
|  [0.976]|[0.976]|[0.976]|
+---------+-------+-------+

CPU times: user 60 ms, sys: 16.9 ms, total: 76.9 ms
Wall time: 12.8 s


In [17]:
%%time

classification_report = classification_report(test_data, prediction_data)
classification_report.show()

+-----+---------+-------+-------+
|Class|Precision| Recall|F-Score|
+-----+---------+-------+-------+
|[0.0]|  [0.978]|[0.992]|[0.985]|
|[1.0]|  [0.976]|[0.995]|[0.985]|
|[2.0]|  [0.981]|[0.971]|[0.976]|
|[3.0]|  [0.971]|[0.963]|[0.967]|
|[4.0]|  [0.984]|[0.977]| [0.98]|
|[5.0]|  [0.973]| [0.97]|[0.971]|
|[6.0]|  [0.978]|[0.989]|[0.983]|
|[7.0]|  [0.972]|[0.968]| [0.97]|
|[8.0]|  [0.979]|[0.965]|[0.972]|
|[9.0]|  [0.967]|[0.968]|[0.968]|
+-----+---------+-------+-------+

CPU times: user 85.6 ms, sys: 25.8 ms, total: 111 ms
Wall time: 15.6 s


---
# WORKING
---

In [74]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Calculation of statistics

def summary_statistics(metrics):
    # Overall statistics
    precision = metrics.precision()
    recall = metrics.recall()
    f1Score = metrics.fMeasure()
    print('Precision\tRecall\tF-Score')
    print('{}\t{}\t{}'.format(metrics.precision(), metrics.recall(), metrics.fMeasure()))
    

def label_statistics(metrics, labels):
    print('Class\tPrecision\tRecall\tF-Score')
    for label in sorted(labels):
        print('{}\t{}\t{}\t{}'.format(label, 
                                      round(metrics.precision(label), 3), 
                                      round(metrics.recall(label), 3), 
                                      round(metrics.fMeasure(label), 3)))


def statistics(test_data, prediction_data):
    # Compute raw scores on the test set
    prediction_and_labels = test_data.join(prediction_data, 'test_id').rdd \
                            .map(lambda x: (float(x.prediction[0]), float(x.test_label[0])))

    # Instantiate metrics object
    metrics = MulticlassMetrics(prediction_and_labels)

    # Overall statistics
    print("Summary Statistics\n")
    summary_statistics(metrics)

    # Statistics by class
    print("\nClass Summary Statistics\n")
    label_statistics(metrics, labels)
      

statistics(test_data, prediction_data)

Summary Statistics

Precision	Recall	F-Score
0.937	0.937	0.937

Class Summary Statistics



NameError: name 'labels' is not defined

In [53]:
%%time

# WORKS IN < 1 MIN BUT TOO MUCH NUMPY

import numpy as np
import operator
import collections
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def make_prediction(test_feature, train_features, train_lables, K=3):
    diff = test_feature - train_features.value
    diff_sq = udf(lambda x, y: math.pow(x, y), DoubleType())
#     similarity = diff_sq.sum(axis=1) ** 0.5
#     k_neighbors = train_lables[np.argpartition(similarity, K)[:K]]
#     predicted_label = collections.Counter(k_neighbors.ravel()).most_common()[0][0]
    return diff_sq

# # Calulate components
# lables = np.array(train_labels.value)
# features = np.array(train_features.value)

# Compute KNN predictions
predictions = test_data.rdd.map(
    lambda x: (x.test_id, make_prediction(x.test_feature, train_features, train_labels, K=5))).collect()

# Create dataframe of prediction resutlts
prediction_data = sc.parallelize(predictions).toDF(['test_id', 'prediction'])

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 125, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 376, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 555, in dumps
    return pickle.dumps(obj, protocol)
AttributeError: Can't pickle local object 'make_prediction.<locals>.<lambda>'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor115.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 376, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 555, in dumps
    return pickle.dumps(obj, protocol)
AttributeError: Can't pickle local object 'make_prediction.<locals>.<lambda>'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [59]:
df = sc.parallelize([float(x) for x in test_feature_instance]).toDF()

TypeError: Can not infer schema for type: <class 'float'>

In [None]:
from pyspark.sql.functions import sqrt

for x in features:
    dataTraining = dataTraining.withColumn(x, sqrt(x))

In [57]:
test_feature_instance = test_data.rdd.map(lambda x: x.test_feature).collect()[0]

In [44]:
test_feature_instance

DenseVector([-441.6308, -684.8732, 315.1821, 774.412, -494.188, -254.394, 814.6033, 510.4865, -26.6983, -126.1708, 329.7202, -42.3052, -181.7678, -59.8716, 140.8987, -1.9072, -153.8426, 23.7651, 137.3674, 52.6527, 94.5873, -133.79, 221.4396, -185.8931, 10.516, 112.5185, -74.4593, 1.4782, 19.262, -91.9423, 39.4218, -315.8124, 96.8876, 312.3212, -139.8656, -133.251, 10.4795, -29.5336, -54.8669, 38.6428, -319.6894, -15.2313, 122.6318, 1.6939, 9.661, 115.7194, -62.4649, -142.1218, -104.5901, -60.4343])

In [65]:
from pyspark.sql import Row
from pyspark.sql.functions import pow, col
from pyspark.sql.types import DoubleType


row = Row("col1")
df = sc.parallelize([row(1, 2), row(2, 3), row(3, 3)]).toDF()

In [66]:
type(DoubleType(test_feature_instance[0]))

TypeError: object() takes no parameters

In [None]:
%%time

# WORKS IN < 1 MIN BUT TOO MUCH NUMPY

import numpy as np
import operator
import collections

def make_prediction(test_feature, train_features, train_lables, K=3):
    similarity = ((test_feature - train_features.value) ** 2).sum(axis=1) ** 0.5
    k_neighbors = train_lables[np.argpartition(similarity, K)[:K]]
    predicted_label = collections.Counter(k_neighbors.ravel()).most_common()[0][0]
    return float(predicted_label)

# # Calulate components
# lables = np.array(train_labels.value)
# features = np.array(train_features.value)

# Compute KNN predictions
predictions = test_data.rdd.map(
    lambda x: (x.test_id, make_prediction(x.test_feature, train_features, train_labels, K=5))).collect()

# Create dataframe of prediction resutlts
prediction_data = sc.parallelize(predictions).toDF(['test_id', 'prediction'])

In [8]:
test_feature_instance = test_data.rdd.map(lambda x: x.test_features).collect()[0]

In [15]:
(test_feature_instance - test_feature_instance * 2)

DenseVector([441.6308, 684.8732, -315.1821, -774.412, 494.188, 254.394, -814.6033, -510.4865, 26.6983, 126.1708, -329.7202, 42.3052, 181.7678, 59.8716, -140.8987, 1.9072, 153.8426, -23.7651, -137.3674, -52.6527, -94.5873, 133.79, -221.4396, 185.8931, -10.516, -112.5185, 74.4593, -1.4782, -19.262, 91.9423, -39.4218, 315.8124, -96.8876, -312.3212, 139.8656, 133.251, -10.4795, 29.5336, 54.8669, -38.6428, 319.6894, 15.2313, -122.6318, -1.6939, -9.661, -115.7194, 62.4649, 142.1218, 104.5901, 60.4343])

In [96]:
# Calculate number of total examples
num_examples = len(train_labels.value)

In [97]:
import numpy as np

In [98]:
test_feature_instance = test_data.rdd.map(lambda x: x.test_features).collect()[0]

In [99]:
test_feature_instance

DenseVector([424.5277, -730.5222, 179.2756, 777.6196, 425.8708, -345.3109, 898.7382, 428.7594, -27.4878, 104.6313, 297.9047, 59.9869, -43.0493, -122.371, -231.3658, -14.3027, -146.4749, 89.414, -123.7011, 46.3869, -62.5745, 259.5283, -78.2594, -122.3114, -102.2687, 18.6292, -99.0931, 36.5025, 13.329, -221.9831, 93.7317, 284.3131, 95.1088, 204.8764, -14.7102, 152.582, 95.8835, -103.5235, 30.2718, -92.7304, -271.9503, 84.3579, 142.093, -47.1598, -185.6514, -27.2196, 101.4059, 49.6436, -26.9869, 97.4456])

In [134]:
train_feature_array = np.array(train_features.value)
test_feature_instance_array = np.array(test_feature_instance)
train_feature_array

array([[ 880.73143303,  329.95076555,  197.60632371, ...,   67.03356171,
          14.78866104,  -37.60150312],
       [1768.51722024,  312.13354179,  818.4599365 , ...,  167.92825986,
          95.56321573,   28.90079234],
       [ 704.94923633, -374.89664934,   33.61062603, ...,  -48.31471732,
        -177.90727891,   17.59871842],
       ...,
       [ 578.74539477, -142.80170756,  -35.4927068 , ...,  -17.55709776,
          89.08685093, -176.80453083],
       [ 887.40491646,   22.86843995,  735.97904942, ...,  -25.85748671,
          14.1455919 ,   19.637895  ],
       [ 583.36289193,   41.99530579,  778.13926939, ...,   84.50296752,
         173.89424851,   -7.80712508]])

In [185]:
def run():
    import operator
    import collections
    
#     def calc_distance(train_feature, test_feature):
#         return distance.euclidean(train_feature, test_feature)

#     def majority_vote(neighbors):
#         counts = {label: neighbors.count(label) for label in neighbors}
#         most_common_vote = max(counts.items(), key=operator.itemgetter(1))
#         if most_common_vote[1] > 1:
#             return float(most_common_vote[0])
#         else:
#             return float(closest_instance)

    def majority_vote(neighbors):
        return collections.Counter(neighbors).most_common()[0]     

    labels = np.array(train_labels.value).ravel()
    similarity = ((np.array(test_feature_instance) - np.array(train_feature_array)) ** 2).sum(axis=1) ** 0.5
    
    k_neighbors = labels[np.argpartition(similarity, 5)[:5]]
    predicted_label = majority_vote(k_neighbors)
    return predicted_label

# 2.61 s ± 78 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) [ALL TRAIN DATA]
# 2.44 s ± 28.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) [ALL TRAIN DATA - different majority vote]
# 1.6 s ± 14.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) [ALL TRAIN DATA - using euclidean distance / diff majority vote]
# 327 ms ± 19.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) [ALL TRAIN DATA - custom euclidean distance / diff majority vote]

In [186]:
%timeit run()

291 ms ± 12.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
from pyspark.mllib.linalg import Vectors
import numpy as np
import operator

K = 10

def cosine_distance(vec_one, vec_two):
    vec_one_sq = np.sqrt(vec_one.dot(vec_one))
    vec_two_sq = np.sqrt(vec_two.dot(vec_two))
    vec_one_two_dot = vec_one.dot(vec_two)
    similarity = vec_one_two_dot / (vec_one_sq * vec_two_sq)
    return float(similarity)

def majority_vote(sorted_dist):
    closest_instance = sorted_dist[0][1]
    neighbors = [neighbor[1] for neighbor in sorted_dist]
    counts = {label: neighbors.count(label) for label in neighbors}
    most_common_vote = max(counts.items(), key=operator.itemgetter(1))
    if most_common_vote[1] > 1:
        return float(most_common_vote[0])
    else:
        return float(closest_instance)
    
def compute_label(test_id, test_label, test_feature):
    results = []
    for i in range(num_examples):
        train_feature = train_features.value[i]
        train_label = train_labels.value[i]
        similarity = cosine_distance(train_feature, test_feature)
        results.append((similarity, train_label[0]))
    sorted_dist = sorted(results, reverse=True)[:K]
    prediction = majority_vote(sorted_dist)
    return (test_id, Vectors.dense(prediction))

# Compute KNN predictions
result = test_data.rdd.map(lambda x: compute_label(x.test_id, x.test_labels, x.test_features)).collect()