In [1]:
import gensim
import logging
import pickle
import random
import scipy.sparse
import sys

import numpy as np

from pyspark import SparkContext
from sklearn.decomposition import LatentDirichletAllocation

In [2]:
def remove_zero_rows(M):
    '''Function that removes all rows from sparse matrix M that contains only zero.'''
    num_nonzeros = np.diff(M.indptr)
    return M[num_nonzeros != 0]

In [3]:
# Load the full sparse matrix
S = scipy.sparse.load_npz('/dlabdata1/youtube_large/olam/matrices/S_full.npz')

In [4]:
S = remove_zero_rows(S)

In [5]:
# Load dictionnary of tokens
with open('/dlabdata1/youtube_large/olam/list_stemmed_tokens.pickle', 'rb') as f:
    list_stemmed_tokens = pickle.load(f)
f.close()

In [6]:
# Load the set of videos to be considered
with open('/dlabdata1/youtube_large/olam/idx_vid_to_consider.pickle', 'rb') as f:
    idx_vid_to_consider = pickle.load(f)
f.close()

In [7]:
oldidx2newidx = {}

In [8]:
for i, idx_vid in enumerate(idx_vid_to_consider):
    oldidx2newidx[idx_vid] = i

## Prep-process 

### A) Remove non-frequent tokens

In [9]:
# Convert to csc in order to fast check the number of non zero element in each column 
S = S.tocsc()

In [10]:
id_tokens_to_consider = []

In [11]:
# Iterate on the columns
for i in range(S.shape[1]):
    
    if i%1000000 == 0:
        print('Processed : ' + str(i) + ' tokens')
        
    # Check column has more than 20 non zero entries
    if S[:,i].count_nonzero() >= 20:
        id_tokens_to_consider.append(i)

Processed : 0 videos
Processed : 1000000 videos
Processed : 2000000 videos
Processed : 3000000 videos
Processed : 4000000 videos
Processed : 5000000 videos
Processed : 6000000 videos
Processed : 7000000 videos


In [12]:
len(id_tokens_to_consider)

744141

In [13]:
S = S[:,id_tokens_to_consider]

In [14]:
oldtokenid2newtokenid = {}

In [15]:
# Get old token id from the new token id
for i, old_id in enumerate(id_tokens_to_consider):
    oldtokenid2newtokenid[old_id] = i

In [16]:
set_oldid_token_to_consider = set(oldtokenid2newtokenid.keys())

In [17]:
# Get token for each column index
id2word = {}

for i, token in enumerate(list_stemmed_tokens):
    
    if i in set_oldid_token_to_consider:
        id2word[oldtokenid2newtokenid[i]] = token

In [18]:
with open('/dlabdata1/youtube_large/olam/id2word.pickle', 'wb') as f:
    pickle.dump(id2word, f)
f.close()

In [21]:
# Convert back to csr since there is 100 times more rows than columns! (Memory efficient)
S = S.tocsr()

In [22]:
S

<68638982x744141 sparse matrix of type '<class 'numpy.uint8'>'
	with 1393937498 stored elements in Compressed Sparse Row format>

In [23]:
scipy.sparse.save_npz('/dlabdata1/youtube_large/olam/matrices/S_final.npz', S)

In [27]:
S = S.transpose()

In [28]:
corpus = gensim.matutils.Sparse2Corpus(S)

In [21]:
corpus

<gensim.matutils.Sparse2Corpus at 0x7f8732c8f890>

In [22]:
# Set up log to terminal
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

In [24]:
lda_model = gensim.models.LdaMulticore(corpus=corpus,
                                       num_topics=20,
                                       id2word=id2word, 
                                       workers=3,
                                       chunksize=100,
                                       passes=10,
                                       per_word_topics=True)

2020-10-02 17:59:34,021 : INFO : using symmetric alpha at 0.05
2020-10-02 17:59:34,023 : INFO : using symmetric eta at 0.05
2020-10-02 17:59:34,133 : INFO : using serial LDA version on this node
2020-10-02 17:59:35,852 : INFO : running online LDA training, 20 topics, 10 passes over the supplied corpus of 68638982 documents, updating every 300 documents, evaluating every ~3000 documents, iterating 50x with a convergence threshold of 0.001000
2020-10-02 17:59:35,856 : INFO : training LDA model using 3 processes
2020-10-02 17:59:36,465 : INFO : PROGRESS: pass 0, dispatched chunk #0 = documents up to #100/68638982, outstanding queue size 1
2020-10-02 17:59:37,790 : INFO : PROGRESS: pass 0, dispatched chunk #1 = documents up to #200/68638982, outstanding queue size 2
2020-10-02 17:59:37,793 : INFO : PROGRESS: pass 0, dispatched chunk #2 = documents up to #300/68638982, outstanding queue size 3
2020-10-02 17:59:37,797 : INFO : PROGRESS: pass 0, dispatched chunk #3 = documents up to #400/6863

  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamulticore.py", line 337, in worker_e_step
    worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
2020-10-02 18:00:06,128 : INFO : PROGRESS: pass 0, dispatched chunk #19 = documents up to #2000/68638982, outstanding queue size 20
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 742, in do_estep
    gamma, sstats = self.inference(chunk, collect_sstats=True)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 680, in inference
    expElogbetad = self.expElogbeta[:, ids]
IndexError: index 753457 is out of bounds for axis 1 with size 744141
Process ForkPoolWorker-25:
2020-10-02 18:00:06,376 : INFO : PROGRESS: pass 0, dispatched chunk #20 = documents up to #2100/68638982, outstanding queue size 21
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
2020-10-02 18

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
2020-10-02 18:00:38,541 : INFO : PROGRESS: pass 0, dispatched chunk #33 = documents up to #3400/68638982, outstanding queue size 34
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 105, in worker
    initializer(*initargs)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamulticore.py", line 337, in worker_e_step
    worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 742, in do_estep
    gamma, sstats = self.inference(chunk, collect_sstats=True)
2020-10-02 18:00:40,706 : INFO : PROGRESS: pass 0, dispatched chunk #34 = documents up to #3500/68638982, outstanding queue size 35
  File "/home/olam/.l

2020-10-02 18:01:12,719 : INFO : PROGRESS: pass 0, dispatched chunk #47 = documents up to #4800/68638982, outstanding queue size 48
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 680, in inference
    expElogbetad = self.expElogbeta[:, ids]
IndexError: index 963665 is out of bounds for axis 1 with size 744141
2020-10-02 18:01:14,077 : INFO : PROGRESS: pass 0, dispatched chunk #48 = documents up to #4900/68638982, outstanding queue size 49
Process ForkPoolWorker-38:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
2020-10-02 18:01:15,251 : INFO : PROGRESS: pass 0, dispatched chunk #49 = documents up to #5000/68638982, outstanding queue size 50
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 105, in worker
    ini

KeyboardInterrupt: 

  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 105, in worker
    initializer(*initargs)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamulticore.py", line 337, in worker_e_step
    worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 742, in do_estep
    gamma, sstats = self.inference(chunk, collect_sstats=True)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 680, in inference
    expElogbetad = self.expElogbeta[:, ids]
IndexError: index 967280 is out of bounds for axis 1 with size 744141
Process ForkPoolWorker-45:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/mu

  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 742, in do_estep
    gamma, sstats = self.inference(chunk, collect_sstats=True)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 680, in inference
    expElogbetad = self.expElogbeta[:, ids]
IndexError: index 773997 is out of bounds for axis 1 with size 744141
Process ForkPoolWorker-53:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 105, in worker
    initializer(*initargs)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamulticore.py", line 337, in worker_e_step
    worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
  File "/home/olam/.local/lib/python3.7

  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 105, in worker
    initializer(*initargs)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamulticore.py", line 337, in worker_e_step
    worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 742, in do_estep
    gamma, sstats = self.inference(chunk, collect_sstats=True)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 680, in inference
    expElogbetad = self.expElogbeta[:, ids]
IndexError: index 1115756 is out of bounds for axis 1 with size 744141
Process ForkPoolWorker-62:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/m

  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 680, in inference
    expElogbetad = self.expElogbeta[:, ids]
IndexError: index 832224 is out of bounds for axis 1 with size 744141
Process ForkPoolWorker-70:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 105, in worker
    initializer(*initargs)
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamulticore.py", line 337, in worker_e_step
    worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
  File "/home/olam/.local/lib/python3.7/site-packages/gensim/models/ldamodel.py", line 742, in do_estep
    gamma, sstats = self.inference(chunk, collect_sstats=True)
  File "/home/olam/.local/lib/python3.7

---

# Spark Implementation

In [4]:
sc = SparkContext('local', 'lda')

In [6]:
dok_test = scipy.sparse.dok_matrix((10, 50))

In [8]:
for i in range(10):
    for j in range(50):
        dok_test[i, j] = random.randint(0, 2)

In [10]:
csr_test = dok_test.tocsr()

In [16]:
rdd = sc.parallelize(csr_test.toarray())

In [18]:
rdd.collect()

[array([1., 1., 2., 2., 0., 2., 2., 1., 1., 0., 1., 0., 0., 1., 0., 0., 1.,
        0., 2., 0., 1., 2., 1., 1., 1., 2., 1., 0., 1., 2., 2., 0., 2., 1.,
        2., 1., 2., 2., 0., 0., 1., 0., 2., 0., 1., 0., 0., 1., 0., 1.]),
 array([2., 1., 2., 2., 0., 1., 2., 1., 0., 2., 0., 0., 1., 1., 1., 0., 2.,
        1., 1., 1., 1., 2., 2., 1., 2., 0., 0., 2., 0., 1., 1., 1., 0., 2.,
        2., 0., 1., 1., 0., 2., 1., 0., 1., 1., 0., 0., 0., 0., 0., 0.]),
 array([1., 1., 0., 1., 1., 2., 1., 1., 0., 2., 1., 1., 2., 0., 2., 1., 0.,
        0., 1., 2., 0., 0., 2., 0., 0., 2., 0., 1., 0., 2., 1., 1., 0., 1.,
        1., 2., 0., 0., 0., 1., 0., 2., 0., 1., 1., 2., 2., 2., 0., 1.]),
 array([0., 0., 1., 2., 0., 2., 1., 2., 1., 1., 2., 2., 1., 0., 1., 2., 2.,
        0., 0., 1., 0., 2., 2., 2., 2., 0., 1., 1., 2., 1., 0., 0., 2., 1.,
        2., 2., 1., 2., 0., 2., 1., 0., 2., 2., 0., 2., 2., 1., 2., 2.]),
 array([0., 0., 2., 2., 1., 0., 0., 1., 1., 1., 1., 1., 0., 0., 2., 0., 0.,
        1., 1., 1., 

In [65]:
rdd1 = sc.parallelize(csr_test[:3,:].toarray())

In [66]:
rdd2 = sc.parallelize(csr_test[3:6,:].toarray())

In [67]:
rdd3 = sc.parallelize(csr_test[6:,:].toarray())

In [68]:
union12 = rdd1.union(rdd2)

In [69]:
union12.collect()

[array([1., 1., 2., 2., 0., 2., 2., 1., 1., 0., 1., 0., 0., 1., 0., 0., 1.,
        0., 2., 0., 1., 2., 1., 1., 1., 2., 1., 0., 1., 2., 2., 0., 2., 1.,
        2., 1., 2., 2., 0., 0., 1., 0., 2., 0., 1., 0., 0., 1., 0., 1.]),
 array([2., 1., 2., 2., 0., 1., 2., 1., 0., 2., 0., 0., 1., 1., 1., 0., 2.,
        1., 1., 1., 1., 2., 2., 1., 2., 0., 0., 2., 0., 1., 1., 1., 0., 2.,
        2., 0., 1., 1., 0., 2., 1., 0., 1., 1., 0., 0., 0., 0., 0., 0.]),
 array([1., 1., 0., 1., 1., 2., 1., 1., 0., 2., 1., 1., 2., 0., 2., 1., 0.,
        0., 1., 2., 0., 0., 2., 0., 0., 2., 0., 1., 0., 2., 1., 1., 0., 1.,
        1., 2., 0., 0., 0., 1., 0., 2., 0., 1., 1., 2., 2., 2., 0., 1.]),
 array([0., 0., 1., 2., 0., 2., 1., 2., 1., 1., 2., 2., 1., 0., 1., 2., 2.,
        0., 0., 1., 0., 2., 2., 2., 2., 0., 1., 1., 2., 1., 0., 0., 2., 1.,
        2., 2., 1., 2., 0., 2., 1., 0., 2., 2., 0., 2., 2., 1., 2., 2.]),
 array([0., 0., 2., 2., 1., 0., 0., 1., 1., 1., 1., 1., 0., 0., 2., 0., 0.,
        1., 1., 1., 

In [70]:
union123 = union12.union(rdd3)

In [71]:
union123.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 19, 192.168.1.4, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


## SkLearn implementation

In [2]:
S = scipy.sparse.load_npz('/dlabdata1/youtube_large/olam/matrices/S_final.npz')

In [3]:
S

<68638982x744141 sparse matrix of type '<class 'numpy.uint8'>'
	with 1393937498 stored elements in Compressed Sparse Row format>

In [6]:
lda = LatentDirichletAllocation(n_components=20,
                                batch_size=128,
                                n_jobs=4,
                                verbose=1,
                                random_state=0)

In [7]:
lda.fit(S)

error: 'i' format requires -2147483648 <= number <= 2147483647

In [13]:
lda.components_ / lda.components_.sum(axis=1)[:, np.newaxis]

array([[3.89431457e-10, 3.89431456e-10, 1.93689538e-08, ...,
        3.89431456e-10, 3.89431456e-10, 1.10061402e-09],
       [1.69037910e-07, 4.11797598e-10, 4.11797598e-10, ...,
        4.11797598e-10, 4.11797598e-10, 5.82636610e-10],
       [2.76393768e-10, 2.76393765e-10, 2.76393766e-10, ...,
        1.55702996e-09, 4.27726029e-09, 2.76394169e-10],
       ...,
       [4.30232373e-10, 4.30232373e-10, 5.91031833e-09, ...,
        4.30232377e-10, 4.30232373e-10, 4.36399047e-10],
       [6.16629379e-08, 3.16846687e-10, 2.20204666e-08, ...,
        3.16846687e-10, 3.16846687e-10, 3.16846872e-10],
       [1.30087122e-09, 5.29652383e-10, 5.29652384e-10, ...,
        5.32241178e-10, 8.39788292e-07, 5.29652400e-10]])

In [16]:
lda.doc_topic_prior_

0.05

In [17]:
lda.topic_word_prior_

0.05

In [18]:
with open('/dlabdata1/youtube_large/olam/lda_model_3iter.pickle', 'wb') as f:
    pickle.dump(lda, f)
f.close()

In [19]:
with open('/dlabdata1/youtube_large/olam/lda_model_3iter.pickle', 'rb') as f:
    lda_test = pickle.load(f)
f.close()

In [21]:
lda_test.components_ / lda_test.components_.sum(axis=1)[:, np.newaxis]

array([[3.89431457e-10, 3.89431456e-10, 1.93689538e-08, ...,
        3.89431456e-10, 3.89431456e-10, 1.10061402e-09],
       [1.69037910e-07, 4.11797598e-10, 4.11797598e-10, ...,
        4.11797598e-10, 4.11797598e-10, 5.82636610e-10],
       [2.76393768e-10, 2.76393765e-10, 2.76393766e-10, ...,
        1.55702996e-09, 4.27726029e-09, 2.76394169e-10],
       ...,
       [4.30232373e-10, 4.30232373e-10, 5.91031833e-09, ...,
        4.30232377e-10, 4.30232373e-10, 4.36399047e-10],
       [6.16629379e-08, 3.16846687e-10, 2.20204666e-08, ...,
        3.16846687e-10, 3.16846687e-10, 3.16846872e-10],
       [1.30087122e-09, 5.29652383e-10, 5.29652384e-10, ...,
        5.32241178e-10, 8.39788292e-07, 5.29652400e-10]])