In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import pandas as pd

In [2]:
spark = SparkSession.builder.appName("Clustering Analysis").getOrCreate()

In [3]:
# Read in files with pandas
user_artist_data = pd.read_csv('data/user_artist_data_small.txt', names=['user_id', 'artist_id', 'count'], sep=' ')
artist_data = pd.read_csv('data/artist_data_small.txt', names=['artist_id', 'artist_name'], sep='\t')
artist_alias = pd.read_csv('data/artist_alias_small.txt', names=['alias_id', 'artist_id'], sep='\t')

In [4]:
# Create RDD
uad = spark.createDataFrame(user_artist_data)

In [5]:
# Look at uad_df
print(uad.select('artist_id').rdd.min())
print(uad.select('artist_id').rdd.max())

Row(artist_id=1)
Row(artist_id=10788218)


In [6]:
# Create a dictionary of aliases
alias_dict = artist_alias.set_index('alias_id')['artist_id'].to_dict()

In [7]:
def unalias(x):
    if x['artist_id'] in alias_dict:
        x['artist_id'] = alias_dict[x['artist_id']]
    return x

uad = uad.rdd.map(unalias)
uad.take(2)

[Row(user_id=1059637, artist_id=1000010, count=238),
 Row(user_id=1059637, artist_id=1000049, count=1)]

In [20]:
def make_rating(x):
    return Rating(int(x['user_id']), int(x['artist_id']), int(x['count']))

ratings = uad.map(make_rating)

In [21]:
print(type(ratings))
print(type(ratings.take(1)[0]))
ratings = ratings.map(lambda row: (row[0], row[1], row[2]))
ratings.take(2)
print(type(ratings.take(1)[0]))

<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.mllib.recommendation.Rating'>
<class 'tuple'>


In [11]:
ratings_df = ratings.toDF()
ratings_df

DataFrame[_1: bigint, _2: bigint, _3: double]

In [None]:
training, test = rating_df = 

In [13]:
type(ratings)

pyspark.rdd.PipelinedRDD

In [22]:
# Build the recommendation model using Alternating Least Squares
# ALS train arguments: ratings, rank, iterations=5, lambda_=0.01, blocks=-1
rank = 10
iterations = 5
lambda_ = 0.01
blocks = 1.0
model = ALS.trainImplicit(ratings, rank, iterations, alpha=0.01)

Py4JJavaError: An error occurred while calling o453.trainImplicitALSModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 35.0 failed 1 times, most recent failure: Lost task 1.0 in stage 35.0 (TID 44, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-7-16f4e708da9f>", line 3, in unalias
TypeError: 'Row' object does not support item assignment

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:857)
	at org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:255)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainImplicitALSModel(PythonMLLibAPI.scala:519)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-7-16f4e708da9f>", line 3, in unalias
TypeError: 'Row' object does not support item assignment

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [10]:
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

NameError: name 'model' is not defined

In [10]:
# Map other artist names to their correct IDs
def unalias(x):
    if x in alias_dict:
        return alias_dict[x]
    else:
        return x

mapped = uad.select('artist_id').rdd.map(unalias)
print(mapped)
print(mapped.toDF()['artist_id'])
uad_dropped = uad.drop('artist_id')
print(uad_dropped.show())
uad_new = uad_dropped.withColumn('new_artist_id', mapped.toDF()['artist_id'])
mapped.take(4)

PythonRDD[39] at RDD at PythonRDD.scala:48
Column<b'artist_id'>
+-------+-----+
|user_id|count|
+-------+-----+
|1059637|  238|
|1059637|    1|
|1059637|    1|
|1059637|   11|
|1059637|    1|
|1059637|  423|
|1059637|    5|
|1059637|    2|
|1059637|    2|
|1059637|19129|
|1059637|    4|
|1059637|  188|
|1059637|  180|
|1059637|    2|
|1059637|    1|
|1059637|   21|
|1059637|    1|
|1059637|   20|
|1059637|   12|
|1059637|   10|
+-------+-----+
only showing top 20 rows

None


AnalysisException: 'resolved attribute(s) artist_id#42L missing from user_id#23L,count#25L in operator !Project [user_id#23L, count#25L, artist_id#42L AS new_artist_id#45L];;\n!Project [user_id#23L, count#25L, artist_id#42L AS new_artist_id#45L]\n+- Project [user_id#23L, count#25L]\n   +- LogicalRDD [user_id#23L, artist_id#24L, count#25L]\n'

In [96]:
# Explore alias df
print(len(artist_alias['artist_id'].unique()))
print(len(artist_alias['alias_id'].unique()))

486
587


In [74]:
# There are more alias_id records than there are artist_id records
# So lets take the user_artist_data, and change every artist id to it's "un-aliased" id
def unalias(x):
    try:
        unaliased_id = artist_alias[artist_alias['alias_id'] == x]['artist_id'][0]
        return unaliased_id
    except:
        return x

user_artist_data['correct_artist_id'] = user_artist_data['artist_id'].apply(unalias)

In [90]:
# Another way that might be faster
user_artist_data['old_artist_id'] = user_artist_data['artist_id']
unaliased_uad = user_artist_data.set_index('artist_id').join(artist_alias.set_index('alias_id'))
unaliased_uad['artist_id'] = unaliased_uad['artist_id'].fillna(unaliased_uad['old_artist_id'])
unaliased_uad = unaliased_uad.drop('old_artist_id', axis=1)
unaliased_uad

Unnamed: 0,user_id,count,artist_id
1,1046559,147,1.0
1,1052461,3481,1.0
1,1031009,4,1.0
1,2062243,22,1.0
1,1058890,3,1.0
1,2288164,4006,1.0
1,1063644,331,1.0
1,1048402,53,1.0
1,1017610,376,1.0
1,2023686,49,1.0


In [81]:
user_artist_data.head()

Unnamed: 0,user_id,artist_id,count,correct_artist_id
0,1059637,1000010,238,1000010
1,1059637,1000049,1,1000049
2,1059637,1000056,1,1000056
3,1059637,1000062,11,1000062
4,1059637,1000094,1,1000094


In [42]:
user_artist_data = spark.sparkContext.textFile('data/user_artist_data_small.txt').map(lambda x: x.split())
artist_data = spark.sparkContext.textFile('data/artist_data_small.txt').map(seperate_artist_data)

PythonRDD[50] at RDD at PythonRDD.scala:48

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 22, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-38-4ef1c3b2bad4>", line 3, in seperate_artist_data
ValueError: not enough values to unpack (expected 2, got 1)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-38-4ef1c3b2bad4>", line 3, in seperate_artist_data
ValueError: not enough values to unpack (expected 2, got 1)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
