In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
import numpy as np
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.ml import Pipeline
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
from time import time


In [2]:
# read training files and test files, initialize spark and some parameters
spark = SparkSession \
    .builder \
    .appName('Cloud Computing Assignment2') \
    .getOrCreate()

train_datafile = 'knn1.0/Train-label-28x28.csv'
test_datafile = 'knn1.0/Test-label-28x28.csv'
# train_datafile = 'hdfs://soit-hdp-pro-1.ucc.usyd.edu.au/share/MNIST/Train-label-28x28.csv'
# test_datafile = 'hdfs://soit-hdp-pro-1.ucc.usyd.edu.au/share/MNIST/Test-label-28x28.csv'
num_train_samples = 60000
num_test_samples = 10000

train_data = spark.read.csv(train_datafile,header=False,inferSchema="true")
test_data = spark.read.csv(test_datafile,header=False,inferSchema="true")

In [3]:
def read_file(filename):
    data = spark.read.csv(filename,header=False,inferSchema="true")
    assembler = VectorAssembler(inputCols=data.columns[1:],outputCol="features")
    return assembler.transform(data).select("_c0","features")
train_vectors = read_file(train_datafile)
train_vectors
# assembler2 = VectorAssembler(inputCols=test_data.columns[1:],outputCol="features")
# test_vectors = assembler2.transform(test_data).select(test_data["_c0"].alias("test_label"),"features")

DataFrame[_c0: int, features: vector]

In [29]:
matrix = np.array([0]*100)
partitioned = train_vectors.repartition("_c0")
def f(x):
    global matrix
    for i in x:
        matrix[int(i[0])] += 1
    return x
# l = partitioned.foreachPartition(f).collect()
partitioned = partitioned.rdd.mapPartitions(f)
print(matrix)


[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [4]:
pca = PCA(k = 97, inputCol="features", outputCol="pca")
trainmodel = pca.fit(train_vectors)
train_vectors = trainmodel.transform(train_vectors)
train_vectors = train_vectors.select(train_vectors["train_label"],
                                     train_vectors["pca"].alias("train_features"))
test_vectors = trainmodel.transform(test_vectors)
test_vectors = test_vectors.select(test_vectors["test_label"],
                                     test_vectors["pca"].alias("test_features"))
# test_vectors = trainmodel.transform(train_vectors).select('train_pca')

AnalysisException: 'Cannot resolve column name "train_label" among (_c0, features, pca);'

In [5]:
train_info = train_vectors.collect()
train_info = [y for x in train_info for y in x]
l = spark.sparkContext.broadcast(np.array(train_info[0::2]))
f = spark.sparkContext.broadcast(np.array(train_info[1::2]))
matrix = [[0]*10]*10
cm = spark.sparkContext.broadcast(matrix)

In [10]:
# start_time = time()
from pyspark.sql.types import IntegerType
num_k = 3
knn_udf = udf(lambda x:int(np.bincount(np.take(l.value, np.argpartition(np.sum((f.value-np.array(x))**2,axis=1),num_k)[:num_k])).argmax()),IntegerType())
knnprediction = test_vectors.withColumn("predict_label",knn_udf("test_features")).select("test_label","predict_label")
def fun(data):
    matrix[int(data[0])][int(data[1])]+=1
knnprediction.foreach(fun)
knnprediction.show(5)
# matrix
# end_time = time()
# print("--- The model Fit used: %2f seconds" %
#           (end_time - start_time))

+----------+-------------+
|test_label|predict_label|
+----------+-------------+
|         7|            7|
|         2|            2|
|         1|            1|
|         0|            0|
|         4|            4|
+----------+-------------+
only showing top 5 rows



In [12]:
def knnprediction(d):
    test_f = np.array(d[1])
    dists = np.sum((train_features-test_f)**2,axis=1)
    combined = np.dstack((train_labels, dists))[0]
    sorteddists = sorted(combined, key = lambda s:float(s[1]))[:3][0].astype(int)
#     nearest_label = np.array(sorteddists).T[0].astype(int)
    p = np.bincount(sorteddists).argmax()
    return (d[0],p)

test_rdd = test_vectors.rdd.map(knnprediction)
test_rdd.take(100)

[(7, 7),
 (2, 2),
 (1, 1),
 (0, 0),
 (4, 9),
 (1, 1),
 (4, 9),
 (9, 9),
 (5, 4),
 (9, 9),
 (0, 0),
 (6, 6),
 (9, 9),
 (0, 0),
 (1, 1),
 (5, 5),
 (9, 9),
 (7, 7),
 (3, 3),
 (4, 4),
 (9, 9),
 (6, 6),
 (6, 6),
 (5, 5),
 (4, 9),
 (0, 0),
 (7, 7),
 (4, 9),
 (0, 0),
 (1, 1),
 (3, 3),
 (1, 1),
 (3, 3),
 (4, 6),
 (7, 7),
 (2, 2),
 (7, 7),
 (1, 1),
 (2, 2),
 (1, 1),
 (1, 1),
 (7, 7),
 (4, 4),
 (2, 1),
 (3, 5),
 (5, 5),
 (1, 1),
 (2, 8),
 (4, 4),
 (4, 4),
 (6, 6),
 (3, 3),
 (5, 5),
 (5, 5),
 (6, 0),
 (0, 0),
 (4, 4),
 (1, 1),
 (9, 9),
 (5, 1),
 (7, 7),
 (8, 2),
 (9, 4),
 (3, 9),
 (7, 7),
 (4, 9),
 (6, 0),
 (4, 4),
 (3, 3),
 (0, 0),
 (7, 7),
 (0, 0),
 (2, 2),
 (9, 8),
 (1, 1),
 (7, 7),
 (3, 3),
 (2, 3),
 (9, 9),
 (7, 7),
 (7, 9),
 (6, 6),
 (2, 2),
 (7, 9),
 (8, 8),
 (4, 4),
 (7, 7),
 (3, 3),
 (6, 6),
 (1, 1),
 (3, 3),
 (6, 6),
 (9, 9),
 (3, 3),
 (1, 1),
 (4, 4),
 (1, 1),
 (7, 7),
 (6, 6),
 (9, 9)]

In [32]:
def knnprediction(d):
    train_l = np.array(train_labels)
    test_f = np.array(d[1])
    dists = np.sum((train_features-test_f)**2,axis=1)
    return (len(train_l),dists)
knnpredictions = test_rdd.map(knnprediction)
knnpredictions.take(2)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 61.0 failed 1 times, most recent failure: Lost task 0.0 in stage 61.0 (TID 37, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-32-a5ec33a43329>", line 3, in knnprediction
NameError: name 'test_f' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-32-a5ec33a43329>", line 3, in knnprediction
NameError: name 'test_f' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [6]:
dataset = train_vectors.crossJoin(test_vectors)
dataset.foreach

[((1, 7), (2280.021767722834, 5)),
 ((2, 2), (2691.11738741113, 5)),
 ((3, 1), (2247.5735321538946, 5)),
 ((4, 0), (2444.9410658716374, 5)),
 ((5, 4), (2575.284898569139, 5)),
 ((6, 1), (2309.1923293118543, 5)),
 ((7, 4), (2513.346755243276, 5)),
 ((8, 9), (2385.2804332099568, 5)),
 ((9, 5), (2366.4508673260743, 5)),
 ((10, 9), (2563.51698881841, 5))]

In [7]:
def sortdistance(d):
    key, value = d
    sorted(value, key = lambda s:s[0])
    return d
def mapdfmethod(d):
    return(d[4],(d[1],d[3],d[0],d[2]))
def edistance(d):
    key, value = d
    edistance = np.sum((value[1]-value[0])*(value[1]-value[0])) ** 0.5
    return ((key, value[3]),(edistance, value[2]))

datapairs = dataset.rdd.map(mapdfmethod)
distances = datapairs.map(edistance)
distances.take(10)

+-----------+--------------------+----------+--------------------+----------+
|train_label|      train_features|test_label|       test_features|test_index|
+-----------+--------------------+----------+--------------------+----------+
|          5|[850.672141291628...|         7|[361.419311328079...|         1|
|          5|[850.672141291628...|         2|[873.283433767456...|         2|
|          5|[850.672141291628...|         1|[-197.20339527162...|         3|
|          5|[850.672141291628...|         0|[1906.37377339178...|         4|
|          5|[850.672141291628...|         4|[834.047132730588...|         5|
|          5|[850.672141291628...|         1|[-295.49175600234...|         6|
|          5|[850.672141291628...|         4|[392.844261113666...|         7|
|          5|[850.672141291628...|         9|[323.845341263129...|         8|
|          5|[850.672141291628...|         5|[968.396980231302...|         9|
|          5|[850.672141291628...|         9|[477.756435129662..