# <hr style="clear: both" />

# Running Spark in YARN-client mode

This notebook demonstrates how to set up a SparkContext that uses SURFsara's Hadoop cluster: [YARN resourcemanager](http://head05.hathi.surfsara.nl:8088/cluster) (note you will need to be authenticated via kerberos on your machine to visit the resourcemanager link) for executors.

First initialize kerberos via a Jupyter terminal. 
In the terminal execute: <BR>
<i>kinit -k -t data/robertop.keytab robertop@CUA.SURFSARA.NL</i><BR>
Print your credentials:


In [None]:
! klist

In [None]:
print sc

# <hr style="clear: both" />

# Now you can run your code

Pick a clustering algorithm (name of the file that provides a classify(x,y [,threshold]) function)

In [34]:
execfile('../spark-scripts/conventions.py')
execfile('../spark-scripts/eval.py')
execfile('../spark-scripts/implicitPlaylistAlgoFunctions.py')
execfile('../spark-scripts/implicitPlaylistAlgoMain.py')

CLUSTER_ALGO = 'jaccardBase'

execfile('../spark-scripts/' + CLUSTER_ALGO + '.py')


# Reading the conf file

In [35]:
import json
import copy

BASE_PATH = "/mnt/space/mattia"

conf = {}

conf['split'] = {}
conf['split']['reclistSize'] = 100
conf['split']['callParams'] = {}
conf['split']['excludeAlreadyListenedTest'] = True
conf['split']['name'] = 'SenzaRipetizioni_1'
conf['split']['split'] = conf['split']['name']
conf['split']['minEventsPerUser'] = 5
conf['split']['inputData'] = BASE_PATH + '/' + CLUSTER_ALGO + '.split/SenzaRipetizioni_1'
#conf['split']['inputData'] = 's3n://contentwise-research-poli/30musicdataset/newFormat/relations/sessions.idomaar'
conf['split']['bucketName'] = BASE_PATH
conf['split']['percUsTr'] = 0.05
conf['split']['ts'] = int(0.75 * (1421745857 - 1390209860) + 1390209860) - 10000
conf['split']['minEventPerSession'] = 5
conf['split']['onlineTrainingLength'] = 5
conf['split']['GTlength'] = 1
conf['split']['minEventPerSessionTraining'] = 10
conf['split']['minEventPerSessionTest'] = 11
conf['split']['mode'] = 'session'
conf['split']['forceSplitCreation'] = False
conf['split']["prop"] = {'reclistSize': conf['split']['reclistSize']}
conf['split']['type'] = None
conf['split']['out'] = BASE_PATH + '/' + CLUSTER_ALGO + '.split'
conf['split']['location'] = '30Mdataset/relations/sessions'

conf['evaluation'] = {}
conf['evaluation']['metric'] = {}
conf['evaluation']['metric']['type'] = 'recall'
conf['evaluation']['metric']['prop'] = {}
conf['evaluation']['metric']['prop']['N'] = [1,2,5,10,15,20,25,50,100]
conf['evaluation']['name'] = 'recall@N'

conf['general'] = {}
conf['general']['clientname'] = CLUSTER_ALGO + '.split'
conf['general']['bucketName'] = BASE_PATH
conf['general']['tracksPath'] = '30Mdataset/entities/tracks.idomaar.gz'

conf['algo'] = {}
conf['algo']['name'] = CLUSTER_ALGO
conf['algo']['props'] = {}
# ***** EXAMPLE OF CONFIGURATION *****#
conf['algo']['props']["sessionJaccardShrinkage"] = 5
conf['algo']['props']["clusterSimilarityThreshold"] = 0.1
conf['algo']['props']["expDecayFactor"] = 0.7
# ****** END EXAMPLE ****************#

# <hr style="clear: both" />
# Pick the list of songs ad create clusters


In [36]:
import json
import string

def my_replace_punct(x):
    ret = ""
    for i in x:
        if i == '+':
            ret += ' '
        else:
            ret += i
    return ret

tracksRDD = sc.textFile(BASE_PATH + '/30Mdataset/entities/tracks.idomaar.gz')
tracksRDD = tracksRDD.map(lambda x: x.split('\t')).map(lambda x: (x[1], json.loads(x[3])['name'].split('/') ) )
tracksRDD = tracksRDD.map(lambda x: (x[0], " ".join( (x[1][0], x[1][2]) ) ))
tracksRDD = tracksRDD.map(lambda x : (x[0], my_replace_punct(x[1])))
tracksRDD = tracksRDD.map(lambda x: (x[0], tokenize_song(x[1]), x[1]))


sampleRDD = tracksRDD.take(5000)
sampleRDD = sc.parallelize(sampleRDD)

tracksIdsRDD = tracksRDD.map(lambda x: (x[0], [x[0]]))

tracksIdsRDD.take(3)


[(u'0', [u'0']), (u'1', [u'1']), (u'2', [u'2'])]

# <hr style="clear: both" />

Reduce the quantity of data by building RDD {word -> songs}.
For each word keep only couples of songs that match.


In [37]:
#Build an RDD with ('word' -> (id, name))
wordsRDD = sampleRDD.flatMap(lambda x: [(i, (x[0], x[2])) for i in x[1]] )
wordsRDD.take(3)

#Group by 'word' and keep only the ones with more then 1 song
wordsRDD = wordsRDD.groupByKey().mapValues(list).filter(lambda x: len(x[1]) > 1)

#Compute a cartesian product for each list of songs with a common word
def filtered_cartesian(x):
    equal_couples = set()
    for i in range(len(x[1])):
        a = x[1][i]
        id_a = x[1][i][0]
        name_a = x[1][i][1]
        
        for j in range(i):
            b = x[1][j]
            id_b = x[1][j][0]
            name_b = x[1][j][1]
            if id_a != id_b:
                if classify(name_a, name_b):
                    equal_couples.add((a,b))
                    
    return (x[0], tuple(equal_couples))

coupleRDD = wordsRDD.map(filtered_cartesian).filter(lambda x: len(x[1]) > 1)
coupleRDD.take(3)

[(u'daydreamer',
  (((u'2852', u'10 Years 11.00 AM (Daydreamer)'),
    (u'2851', u'10 Years 11:00 AM (Daydreamer)')),
   ((u'2851', u'10 Years 11:00 AM (Daydreamer)'),
    (u'2850', u'10 Years 11-00 AM (Daydreamer)')),
   ((u'2852', u'10 Years 11.00 AM (Daydreamer)'),
    (u'2850', u'10 Years 11-00 AM (Daydreamer)')))),
 (u'and',
  (((u'3081', u'112 Peaches And Cream'),
    (u'2997', u'112 112 - Peaches and Cream')),
   ((u'2859', u'10 Years ... And All The Other Colors'),
    (u'2858', u'10 Years ...And All the Other Colors')))),
 (u'stones',
  (((u'4154', u'12 Stones Lie to Me (Acoustic)'),
    (u'4153', u'12 Stones Lie to me - acoustic')),
   ((u'4126', u'12 Stones Bulletproof_'),
    (u'4125', u'12 Stones Bulletproof'))))]

# <hr style="clear: both" />
Flip the dataset and map each song to the couples it belongs to. 
Group by key and for each song you have a cluster!


In [38]:
#Flatmap the list of couples
flattedCoupleRDD = coupleRDD.flatMap(lambda x: [i for i in x[1]])
#For each couple, for each song, yield song->couple
flattedCoupleRDD = flattedCoupleRDD.flatMap(lambda x: ((i[0], (x[0], x[1]) )for i in x) )


#Group by key (song). Each song has now one cluster
def merge_couples(x, y):
    return list(set(x) | set(y))

songClusterRDD = flattedCoupleRDD.reduceByKey(merge_couples).map(lambda x: (x[0], [i[0] for i in x[1]]))
songClusterRDD.take(30)

[(u'2859', [u'2859', u'2858']),
 (u'2648', [u'2639', u'2648']),
 (u'3603', [u'3602', u'3603']),
 (u'3610', [u'3611', u'3610']),
 (u'2858', [u'2859', u'2858']),
 (u'3602', [u'3602', u'3603']),
 (u'3611', [u'3611', u'3610']),
 (u'3612', [u'3613', u'3612']),
 (u'4217', [u'4217', u'4246']),
 (u'3613', [u'3613', u'3612']),
 (u'4216', [u'4245', u'4216']),
 (u'3614', [u'3615', u'3614']),
 (u'2639', [u'2639', u'2648']),
 (u'2961', [u'2959', u'2961']),
 (u'2638', [u'2638', u'2680']),
 (u'3615', [u'3615', u'3614']),
 (u'3775', [u'3775', u'3774']),
 (u'3774', [u'3775', u'3774']),
 (u'2851', [u'2851', u'2850', u'2852']),
 (u'2640', [u'2670', u'2640']),
 (u'4246', [u'4217', u'4246']),
 (u'2868', [u'2868', u'2869']),
 (u'2850', [u'2851', u'2850', u'2852']),
 (u'2869', [u'2868', u'2869']),
 (u'3623', [u'3623', u'3622']),
 (u'2448', [u'2448', u'2449']),
 (u'2852', [u'2851', u'2850', u'2852']),
 (u'4245', [u'4245', u'4216']),
 (u'3622', [u'3623', u'3622']),
 (u'2449', [u'2448', u'2449'])]

# <hr style="clear: both" />
Complete the clusters with all the other songs. 
Then we need to map cluster to songs to have new IDs.

In [39]:
#In this way we obtain a complete RDD with song -> group of songs
def reduce_to_biggest(x, y):
    bigger = x if len(x) > len(y) else y
    result = sorted(bigger)
    return result
           
unionRDD = songClusterRDD.union(tracksIdsRDD).reduceByKey(reduce_to_biggest)
unionRDD.take(100)


[(u'3187564', [u'3187564']),
 (u'839798', [u'839798']),
 (u'3265519', [u'3265519']),
 (u'4724052', [u'4724052']),
 (u'228052', [u'228052']),
 (u'186098', [u'186098']),
 (u'1767625', [u'1767625']),
 (u'34547', [u'34547']),
 (u'27390', [u'27390']),
 (u'2653961', [u'2653961']),
 (u'4972098', [u'4972098']),
 (u'931855', [u'931855']),
 (u'684237', [u'684237']),
 (u'2294826', [u'2294826']),
 (u'909900', [u'909900']),
 (u'1770605', [u'1770605']),
 (u'1971070', [u'1971070']),
 (u'2103417', [u'2103417']),
 (u'1837140', [u'1837140']),
 (u'1251968', [u'1251968']),
 (u'2413655', [u'2413655']),
 (u'3601843', [u'3601843']),
 (u'1946479', [u'1946479']),
 (u'1902200', [u'1902200']),
 (u'3032732', [u'3032732']),
 (u'3276442', [u'3276442']),
 (u'776530', [u'776530']),
 (u'3127205', [u'3127205']),
 (u'1179403', [u'1179403']),
 (u'1751011', [u'1751011']),
 (u'3426069', [u'3426069']),
 (u'4558204', [u'4558204']),
 (u'2901109', [u'2901109']),
 (u'2462396', [u'2462396']),
 (u'4470544', [u'4470544']),
 (u'248

# <hr style="clear: both" />
We have song -> cluster. We map inversly (cluster -> song) and group by key (cluster).
Finally we zip with index and obtain new IDs.

In [None]:
#Flip the mapping as cluster->song
clusterSongsRDD = unionRDD.map(lambda x: (' '.join(x[1]), x[0])).groupByKey().mapValues(list)
clusterSongsRDD = clusterSongsRDD.zipWithIndex().map(lambda x: (x[1], x[0][1]))
clusterSongsRDD.take(30)

In [None]:
clusterSongsRDD.count()

# <hr style="clear: both" />
Save the RDD (newID, songs) in "/mnt/space/mattia/clusters/[ALGORITHM]"

In [None]:
#Save clustering
clusterSongsRDD.saveAsTextFile(BASE_PATH + '/clusters/' + CLUSTER_ALGO)

Create RDD with song -> clusterID

In [33]:
clusterSongsRDD = sc.textFile(BASE_PATH + '/clusters/' + CLUSTER_ALGO)
songToClusterRDD = clusterSongsRDD.flatMap(lambda x: [(i, x[0]) for i in x[1]] )

clusterSongsRDD.lookup(2)

[]


Split the data

In [None]:
execfile('../spark-scripts/splitCluster2.py')
splitter(conf)

It's time to substitute songs with their cluster

In [255]:
import json
execfile('../spark-scripts/utilsCluster2.py')
train, test = loadDataset(conf)
train.cache()
test.cache()

def flat_map_tracks_ids(x):
    objects = x['linkedinfo']['objects']
    result = []
    for i in range(len(objects)):
        result.append( (str(objects[i]['id']), (i, x)) )
    return result

trainFlat = train.map(lambda x: json.loads(x)).flatMap(flat_map_tracks_ids)
testFlat = test.map(lambda x: json.loads(x)).flatMap(flat_map_tracks_ids)

In [258]:
trainJoin = trainFlat.join(songToClusterRDD)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 395 in stage 1319.0 failed 1 times, most recent failure: Lost task 395.0 in stage 1319.0 (TID 29187, localhost): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:343)
	at org.xerial.snappy.SnappyOutputStream.compressInput(SnappyOutputStream.java:357)
	at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:280)
	at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:115)
	at org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1915)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
	at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor100.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:343)
	at org.xerial.snappy.SnappyOutputStream.compressInput(SnappyOutputStream.java:357)
	at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:280)
	at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:115)
	at org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1915)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
	at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
trainJoin.take(1)

In [None]:
from os import path
basePath = path.join(conf['general']['bucketName'], conf['general']['clientname'])
splitPath = path.join(basePath, conf['split']['name'])

clusterSimList = [0.1]
sessionJaccardShrinkageList = [5]
expDecayList = [0.7]

for exclude in [True]:
    conf['split']['excludeAlreadyListenedTest'] = str(exclude)
    #conf['split']['name'] = 'giroCompletoTestMultipleConfs_exclude%s' % exclude
    #splitter(conf)
    #train, test = loadDataset(conf)
    #train.cache()
    #test.cache()
    
    for sessionJaccardShrinkage in sessionJaccardShrinkageList:
        conf['algo']['props']["sessionJaccardShrinkage"] = sessionJaccardShrinkage
        
        for clusterSim in clusterSimList:
            conf['algo']['props']["clusterSimilarityThreshold"] = clusterSim
            
            playlists = extractImplicitPlaylists(train, conf).cache()
            
            for expDecay in expDecayList:
                conf['algo']['props']["expDecayFactor"] = expDecay
                conf['algo']['name'] = CLUSTER_ALGO + '_ImplicitPlaylist_shk_%d_clustSim_%.3f_decay_%.3f' % \
                    (sessionJaccardShrinkage, clusterSim, expDecay )

                recJsonRDD = executeImplicitPlaylistAlgo(playlists, test, conf)
                try:
                    saveRecommendations(conf, recJsonRDD, overwrite=True)
                    try:
                        computeMetrics(conf)
                    except:
                        print 'Error in computing metrics'
                except:
                    print 'Error in saving recommndations'
                    try:
                        computeMetrics(conf)
                    except:
                        print 'Error in computing metrics'