# PySpark Demo

In [2]:
import pyspark

sc.stop()
sc = pyspark.SparkContext('local[*]')

rdd = sc.parallelize([1,2,3,4])
rdd.collect()

[1, 2, 3, 4]

# Iterative Programming

In [5]:
# loading data
a = [1,2,3,4,5,6,7,8]

# get even number
b = []
for ele in a:
    if ele % 2 == 0: 
        b.append(ele)
        
# get sum of even number
c = sum(b)
c

20

# Functional Programming

In [6]:
import numpy as np

a = np.array([1,2,3,4,5,6,7,8])

def f(ele):
    return ele % 2 == 0

sum(a[f(a)])

20

# PySpark 語法

In [7]:
rdd = sc.parallelize([1,2,3,4,5], 4)
rdd

ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:175

In [8]:
rdd.collect()

[1, 2, 3, 4, 5]

# Get Data From File

In [4]:
lines = sc.textFile('file:/tmp/trump.txt')
lines.take(3)

[u'Chief Justice Roberts, President Carter, President Clinton, President Bush, fellow Americans and people of the world \u2013 thank you.',
 u'We the citizens of America have now joined a great national effort to rebuild our county and restore its promise for all our people.',
 u'']

# Python Lambda

In [6]:
def addNum(a, b):
    return a + b

addNum(2,3)

addNum2 = lambda a, b : a + b
addNum2(3,4)

7

In [7]:
exp = lambda e: e**2
exp(4)

16

# PySpark Transformation

In [8]:
rdd = sc.parallelize([1,2,3,4])
a = rdd.map(lambda x: x * 2)
a

PythonRDD[7] at RDD at PythonRDD.scala:48

In [9]:
a.collect()

[2, 4, 6, 8]

In [10]:
rdd = sc.parallelize([1,2,3,4])
a = rdd.map(lambda x: x % 2 == 0)
a.collect()

[False, True, False, True]

In [11]:
import numpy as np
a = np.array([1,2,3,4])
a[a % 2 == 0]

array([2, 4])

In [12]:
rdd = sc.parallelize([1,4,2,2,3])
a = rdd.distinct()
a.collect()

[1, 2, 3, 4]

In [13]:
rdd = sc.parallelize([1,2,3])
a = rdd.map(lambda x: [x,x+5])
a.collect()

[[1, 6], [2, 7], [3, 8]]

In [16]:
rdd = sc.parallelize([1,2,3])
a = rdd.flatMap(lambda x:[x,x+5])
a.collect()

[1, 6, 2, 7, 3, 8]

# Spark Action

In [18]:
rdd = sc.parallelize([1,2,3])
rdd.reduce(lambda a,b: a*b)

# 1 2 3
#  2
#    6

6

In [19]:
rdd.take(2)

[1, 2]

In [20]:
rdd.collect()

[1, 2, 3]

In [21]:
rdd = sc.parallelize([5,3,1,2])
rdd.takeOrdered(3, lambda s: -1*s)

[5, 3, 2]

# Key-Value Pair

In [22]:
rdd = sc.parallelize([(1,2), (3,4), (3,6)])
a = rdd.reduceByKey(lambda a,b: a+b)
a.collect()

#RDD: [(1,2), (3,4), (3,6)] → [(1,2), (3,10)]

[(1, 2), (3, 10)]

In [23]:
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
a = rdd2.sortByKey()
a.collect()

#RDD: [(1,'a'), (2,'c'), (1,'b')] → [(1,'a'), (1,'b'), (2,'c')]

[(1, 'a'), (1, 'b'), (2, 'c')]

In [24]:
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
a = rdd2.groupByKey()
a.collect()

[(1, <pyspark.resultiterable.ResultIterable at 0x7f72253b9390>),
 (2, <pyspark.resultiterable.ResultIterable at 0x7f72253e9e50>)]

# Broadcast

In [26]:
broadcastVar = sc.broadcast([1,2,3])

In [27]:
broadcastVar.value

[1, 2, 3]

In [28]:
accum = sc.accumulator(0)
rdd = sc.parallelize([1,2,3,4])

def f(x):
    global accum
    accum += x
    
rdd.foreach(f)
accum.value

10

# 電影分析

In [30]:
lines = sc.textFile('file:/tmp/u.data')
lines.take(5)

[u'196\t242\t3\t881250949',
 u'186\t302\t3\t891717742',
 u'22\t377\t1\t878887116',
 u'244\t51\t2\t880606923',
 u'166\t346\t1\t886397596']

In [32]:
movies= lines.map(lambda x : (int(x.split()[1]) , 1) ) 
movies.take(3)

[(242, 1), (302, 1), (377, 1)]

In [37]:
movieCounts = movies.reduceByKey(lambda x,y: x+y)

In [38]:
movieCounts.take(3)

[(2, 131), (4, 209), (6, 26)]

In [39]:
res = movieCounts.sortBy(lambda a: -a[1])

In [40]:
res.take(5)

[(50, 583), (258, 509), (100, 508), (181, 507), (294, 485)]

In [55]:
def loadMovieNames():
    movieNames = {}
    with open('u.item', 'r', encoding='utf-8') as f:
        for line in f.readlines():
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
            #print(fields[0], fields[1])
            #break
    return movieNames

In [56]:
nameDict = sc.broadcast(loadMovieNames())

UnicodeEncodeError: 'ascii' codec can't encode character u'\ufeff' in position 0: ordinal not in range(128)

In [57]:
res = movieCounts.sortBy(lambda a: -a[1])
res2 = res.map(lambda e: (nameDict.value.get(e[0]), e[1]))
res2.take(10)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 1 times, most recent failure: Lost task 0.0 in stage 44.0 (TID 184, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-57-ad15de2f1701>", line 2, in <lambda>
NameError: global name 'nameDict' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-57-ad15de2f1701>", line 2, in <lambda>
NameError: global name 'nameDict' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


# Spark SQL and DataFrame

In [58]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [59]:
data_file = 'file:/tmp/ratings.txt'
raw_data = sc.textFile(data_file, 4)

In [60]:
raw_data.take(3)

[u'userid::itemid::rating', u'0::0::4', u'0::1::5']

In [61]:
header = raw_data.first()
header

u'userid::itemid::rating'

In [62]:
skip_data = raw_data.filter(lambda line: line != header)

In [63]:
skip_data.take(3)

[u'0::0::4', u'0::1::5', u'0::7495::3']

In [67]:
csv_data = skip_data.map(lambda l: l.split("::"))
csv_data.take(3)

[[u'0', u'0', u'4'], [u'0', u'1', u'5'], [u'0', u'7495', u'3']]

In [68]:
from pyspark.sql import Row

In [69]:
row_data = csv_data.map(
        lambda p:
            Row(
                userid = p[0],
                itemid = p[1],
                rating = int(p[2])
            )
)

row_data.take(3)

[Row(itemid=u'0', rating=4, userid=u'0'),
 Row(itemid=u'1', rating=5, userid=u'0'),
 Row(itemid=u'7495', rating=3, userid=u'0')]

In [70]:
a = [{'userid':0, 'itemid':0, 'rating':4},
 {'userid':0, 'itemid':1, 'rating':5}
]

#panda df
import pandas
pandas_df = pandas.DataFrame(a)
pandas_df

Unnamed: 0,itemid,rating,userid
0,0,4,0
1,1,5,0


In [71]:
# Spark DataFrame
df = sqlContext.createDataFrame(row_data)

In [72]:
df.take(3)

[Row(itemid=u'0', rating=4, userid=u'0'),
 Row(itemid=u'1', rating=5, userid=u'0'),
 Row(itemid=u'7495', rating=3, userid=u'0')]

In [73]:
df.show()

+------+------+------+
|itemid|rating|userid|
+------+------+------+
|     0|     4|     0|
|     1|     5|     0|
|  7495|     3|     0|
|  7496|     5|     0|
|  7497|     5|     0|
|  7498|     4|     0|
| 23688|     3|     0|
| 23689|     2|     0|
| 23690|     3|     0|
| 23691|     5|     0|
| 23692|     3|     0|
| 23693|     3|     0|
| 23694|     4|     0|
| 23695|     4|     0|
| 23696|     5|     0|
| 46295|     5|     0|
| 46296|     4|     0|
| 46297|     4|     0|
| 46298|     3|     0|
| 46299|     5|     0|
+------+------+------+
only showing top 20 rows



In [74]:
df.select("userid", "rating").groupBy("userid").avg().show()

+------+------------------+
|userid|       avg(rating)|
+------+------------------+
|   296|3.7885714285714287|
|   467| 3.257575757575758|
|   675| 3.193661971830986|
|   691|3.6065573770491803|
|   829|3.9507042253521125|
|  1090| 3.398876404494382|
|  1159|               4.4|
|  1436| 4.101167315175097|
|  1512| 3.466666666666667|
|  1572|             3.895|
|  2069|3.8285714285714287|
|  2088|3.7857142857142856|
|  2136|               3.6|
|  2162| 2.857142857142857|
|  2294|               3.0|
|  2904| 4.222222222222222|
|  3210| 4.229166666666667|
|  3414| 4.416666666666667|
|  3606|               4.0|
|  3959| 4.538461538461538|
+------+------------------+
only showing top 20 rows



In [75]:
df.registerTempTable("ratings")

In [76]:
df.printSchema()

root
 |-- itemid: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- userid: string (nullable = true)



In [77]:
ratings_data = sqlContext.sql("""
    SELECT userid,AVG(rating) FROM ratings GROUP BY userid
""") 
ratings_data.show()

+------+------------------+
|userid|       avg(rating)|
+------+------------------+
|   296|3.7885714285714287|
|   467| 3.257575757575758|
|   675| 3.193661971830986|
|   691|3.6065573770491803|
|   829|3.9507042253521125|
|  1090| 3.398876404494382|
|  1159|               4.4|
|  1436| 4.101167315175097|
|  1512| 3.466666666666667|
|  1572|             3.895|
|  2069|3.8285714285714287|
|  2088|3.7857142857142856|
|  2136|               3.6|
|  2162| 2.857142857142857|
|  2294|               3.0|
|  2904| 4.222222222222222|
|  3210| 4.229166666666667|
|  3414| 4.416666666666667|
|  3606|               4.0|
|  3959| 4.538461538461538|
+------+------------------+
only showing top 20 rows



In [79]:
ratings = sc.textFile('file:/tmp/u.data', 4) 
ratings_data = ratings.map(lambda l:l.split()) 
ratings_row_data = ratings_data.map(lambda p:
    Row( userid=p[0], movieid=p[1], rating=int(p[2]) ) )

ratings_row_data.take(4)

[Row(movieid=u'242', rating=3, userid=u'196'),
 Row(movieid=u'302', rating=3, userid=u'186'),
 Row(movieid=u'377', rating=1, userid=u'22'),
 Row(movieid=u'51', rating=2, userid=u'244')]

In [80]:
df = sqlContext.createDataFrame(ratings_row_data)
df.registerTempTable("ratings")

In [81]:
ratings_data = sqlContext.sql("""
    SELECT movieid, count(1) FROM ratings GROUP BY movieid ORDER BY COUNT(1) DESC LIMIT 10
""") 
ratings_data.show()

+-------+--------+
|movieid|count(1)|
+-------+--------+
|     50|     583|
|    258|     509|
|    100|     508|
|    181|     507|
|    294|     485|
|    286|     481|
|    288|     478|
|      1|     452|
|    300|     431|
|    121|     429|
+-------+--------+



In [82]:
movies = sc.textFile('file:/tmp/u.item', 4)
movies_data = movies.map(lambda l:l.split('|')) 
#movies_data.take(3)
movies_row_data = movies_data.map(lambda p:
    Row(movieid=p[0], moviename=p[1] ) )
movies_row_data.take(4)

[Row(movieid=u'1', moviename=u'Toy Story (1995)'),
 Row(movieid=u'2', moviename=u'GoldenEye (1995)'),
 Row(movieid=u'3', moviename=u'Four Rooms (1995)'),
 Row(movieid=u'4', moviename=u'Get Shorty (1995)')]

In [83]:
movies_df = sqlContext.createDataFrame(movies_row_data)
movies_df.registerTempTable("movies")

In [85]:
best_movies = sqlContext.sql("""
SELECT moviename,avg(rating) as avg_rating, count(1) as cnt 
FROM movies INNER JOIN ratings ON ratings.movieid = movies.movieid 
GROUP BY moviename 
ORDER by AVG(rating) DESC LIMIT 10
""") 
best_movies.show(5)

+--------------------+----------+---+
|           moviename|avg_rating|cnt|
+--------------------+----------+---+
|Someone Else's Am...|       5.0|  1|
|Saint of Fort Was...|       5.0|  2|
|Aiqing wansui (1994)|       5.0|  1|
|Marlene Dietrich:...|       5.0|  1|
|     Star Kid (1997)|       5.0|  3|
+--------------------+----------+---+
only showing top 5 rows



In [86]:
best_movies = sqlContext.sql("""
SELECT moviename,avg(rating) as avg_rating, count(1) as cnt 
FROM movies INNER JOIN ratings ON ratings.movieid = movies.movieid 
GROUP BY moviename 
ORDER by cnt DESC LIMIT 10
""") 
best_movies.show(5)

+--------------------+------------------+---+
|           moviename|        avg_rating|cnt|
+--------------------+------------------+---+
|    Star Wars (1977)|4.3584905660377355|583|
|      Contact (1997)|3.8035363457760316|509|
|        Fargo (1996)| 4.155511811023622|508|
|Return of the Jed...| 4.007889546351085|507|
|    Liar Liar (1997)| 3.156701030927835|485|
+--------------------+------------------+---+
only showing top 5 rows



# ALS

In [87]:
rawData = sc.textFile("/tmp/u.data") 
rawData.first() #讀取第一列資料
#讀取user id, movie id, ratings
rawRatings = rawData.map(lambda e: e.split()) 
rawRatings.take(3)

[[u'196', u'242', u'3', u'881250949'],
 [u'186', u'302', u'3', u'891717742'],
 [u'22', u'377', u'1', u'878887116']]

In [88]:
from pyspark.sql import Row
#將資料轉進ratingsRDD 物件 
ratingsRDD = rawRatings.map(\
    lambda p: Row(userId=int(p[0]), \
                  movieId=int(p[1]), \
                  rating=float(p[2]), \
                  timestamp=int(p[3])))

# 建立 DataFrame
ratings = sqlContext.createDataFrame(ratingsRDD)

# 將資料區分為訓練與測試資料集
(training, test) = ratings.randomSplit([0.8, 0.2])

In [89]:
from pyspark.ml.recommendation import ALS
als = ALS(rank=50, maxIter=10, regParam=0.01, \
          userCol="userId", itemCol="movieId", \
          ratingCol="rating")
model = als.fit(training)

In [90]:
model.userFactors.count()

943

In [91]:
model.itemFactors.count()

1655

In [95]:
userRecs = model.recommendForAllUsers(10)

In [96]:
userRecs.take(5)

AttributeError: 'NoneType' object has no attribute 'setCallSite'

In [97]:
movieRecs = model.recommendForAllItems(10)

In [98]:
movieRecs.take(3)

AttributeError: 'NoneType' object has no attribute 'setCallSite'