In [None]:
!pip install nltk

In [1]:
import string
import time
from nltk.corpus import stopwords

import numpy as np
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA
from pyspark.mllib.linalg import Vectors as MLlibVectors
from pyspark.mllib.random import RandomRDDs

In [2]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/mobod2021/mob2021031/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [3]:
def read_txt(data_path, min_token_tf, max_token_tf, min_token_length, min_doc_length=50, is_vw_format=False):
    time_start = time.time()
    sqlContext = SQLContext(sc)

    def parse_file(kv):
        line = kv if is_vw_format else kv[1]
        line = line.replace('\n', ' ').replace('\t', ' ')
        
        if is_vw_format:
            line_list = []
            for token in line.split(' ')[1: ]:
                lst = token.split(':')
                if len(lst) == 1:
                    line_list.append(token)
                else:
                    line_list += [lst[0]] * int(float(lst[1]))
            line = ' '.join(line_list)
        
        for p in string.punctuation:
            line = line.replace(p, ' ')
        
        tokens = [e.strip().lower() for e in line.strip().split(' ') if len(e) > 0]
        if is_vw_format:
            return tokens
        else:
            return (kv[0], tokens)


    def filter_token(kv):
        token = kv[0]
        value = kv[1]

        if value > max_token_tf or value < min_token_tf:
            return False

        if len(token) < min_token_length:
            return False

        if token in stopwords_:
            return False

        for i in '0123456789':
            if i in token:
                return False

        return True


    def get_tokens(tokens):
        if is_vw_format:
            return tokens
        return tokens[1]


    def parseVectors(line):
        return [int(line[2]), line[0]]


    if is_vw_format:
        dataset = sc.textFile(data_path)
    else:
        dataset = sc.wholeTextFiles("{}/*".format(data_path))
    dataset = dataset.map(parse_file)
    
    word_counts = (dataset
                   .flatMap(lambda path_with_tokens: ((token, 1) for token in get_tokens(path_with_tokens)))
                   .reduceByKey(lambda cnt_1, cnt_2: cnt_1 + cnt_2)
                   .sortBy(lambda token_with_cnt: -token_with_cnt[1]))

    stopwords_ = set(stopwords.words('english'))

    word_counts = word_counts.filter(filter_token)
    vocab = set([e[0] for e in word_counts.collect()])

    print('Total number of tokens: {0}'.format(len(vocab)))
    
    i = 0
    if is_vw_format:
        dataset = (dataset
                   .map(lambda kv: (i, list(filter(lambda t: t in vocab, kv))))
                   .filter(lambda kv: len(kv[1]) > min_doc_length))
    else:
        dataset = (dataset
                   .map(lambda kv: (kv[0].split('/')[-1], list(filter(lambda t: t in vocab, kv[1]))))
                   .filter(lambda kv: len(kv[1]) > min_doc_length))
    
    print('Total number of documents: {}'.format(dataset.count()))
    
    data_df = sqlContext.createDataFrame(dataset, ['id', 'tokens'])
    
    data_df = data_df.drop("id")
    
    data_df = data_df.withColumn("id", monotonically_increasing_id())
    
#     data_df.show()
    
    cv = CountVectorizer(inputCol="tokens", outputCol="vectors")
    cv_model = cv.fit(data_df)
    df_vect = cv_model.transform(data_df)

    bow = (df_vect
           .select('vectors', 'tokens', 'id')
           .rdd.map(parseVectors)
           .mapValues(MLlibVectors.fromML)
           .map(list))
    
    nnz = sum(bow.map(lambda x: list(x[1].values)).reduce(lambda x, y: x + y))
    print('Total collection size: {}'.format(nnz))

    print('Elapsed time : {} sec.'.format(int(time.time() - time_start)))
    return bow, cv_model, nnz

In [330]:
class TopicModel:
    
    def __init__(self, num_topics, cv_model, nnz, num_document_passes, use_phi_broadcast=True, beta=0.0):
        self.num_topics=num_topics      # число тем в модели
        self.cv_model_vocabulary=cv_model.vocabulary          # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
        self.nnz=nnz                    # общее число словопозиций в коллекции
        self.num_document_passes=num_document_passes     # число проходов по документу на E-шаге
        self.use_phi_broadcast=use_phi_broadcast     # использование бродкастинга матрицы $\Phi$
        self.beta=beta
        self.perplexity_list=[]
        
        phiwt_np = np.random.random((len(self.cv_model_vocabulary), self.num_topics))
        
        if self.use_phi_broadcast:
            self.phiwt = sc.broadcast(phiwt_np)
        else:
            self.phiwt = phiwt_np
                
    def fit(self, bow_data, num_collection_passes=10):

        self.perplexity_list = []
        time_start = time.time()
        for _ in range(num_collection_passes):
            
            def process_document(documents, 
                                 num_topics=self.num_topics, 
                                 num_document_passes=self.num_document_passes,
                                 phiwt=self.phiwt, 
                                 use_phi_broadcast=self.use_phi_broadcast):
                
                theta = np.array([1/num_topics] * num_topics)
                n_dw = np.zeros(documents.size)
                n_dw[documents.indices] = documents.values
                
                phi = phiwt.value if use_phi_broadcast else phiwt
                
                for _ in range(num_document_passes):
                    _ptdw = np.einsum("wt,t->tw", phi, theta)
                    ptdw = _ptdw / np.sum(_ptdw, axis=0, keepdims=True)
                    
                    _theta = np.einsum("t,wt->w", n_dw, ptdw)
                    theta = _theta / np.sum(_theta, axis=0, keepdims=True)
                
                n_wt = np.einsum("w,tw->wt", n_dw, ptdw)
                
                return n_wt, (n_dw * np.log((phi * theta).sum(axis=1))).sum()
        
            def E_part(rows):
                for row in rows:
                    _, documents = row
                    n_wt, perplexity = process_document(documents)
                    yield n_wt, perplexity

            E = bow_data.partitionBy(5).mapPartitions(E_part)

            n_wt, perplexity = E.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))

            phi = self.phiwt.value if self.use_phi_broadcast else self.phiwt
            
            phiwt_tmp = np.maximum(n_wt + 0.1*phi + self.beta, 0)
            phiwt  = phiwt_tmp / phiwt_tmp.sum(axis=0, keepdims=True)

            if self.use_phi_broadcast:
                self.phiwt = sc.broadcast(phiwt)
            else:
                self.phiwt = phiwt

            self.perplexity_list.append(np.exp(-perplexity/ self.nnz))
        
        print("Done")
        print('Elapsed time : {} sec.'.format(int(time.time() - time_start)))
#         with open("time.txt", 'a') as f:
#             f.write("{} {}\n".format(self.num_topics, int(time.time() - time_start)))
        
    def print_perplexity(self):
        print(self.perplexity_list)
    
    def print_topics(self, num_tokens=10):
        indcs = self.PHIwt.argsort(axis=0)[-num_tokens:][::-1]
        for idx, i_w in enumerate(range(indcs.shape[1])):
            print("topic_id: {}".format(idx))
            for i in indcs[:,i_w]:
                print(model.cv_model_vocabulary[i])
            print("--------")

In [5]:
# ssc = StreamingContext(sc, 1)
# conf = SparkConf()
# conf.set("spark.executor.memory", "10g")
# conf.set("spark.driver.memory", "8g")
# conf.set("spark.core.connection.ack.wait.timeout", "1200")
# # conf.set("spark.executor.heartbeatInterval", "10s")
# conf.set("spark.executor.heartbeatInterval","3600s")

# SparkContext.setSystemProperty('spark.executor.memory', '2g')
# SparkContext.setSystemProperty('spark.driver.memory', '2g')
# sc = SparkContext("local", "App")
sc = SparkContext.getOrCreate();
# sc.getConf().getAll()

In [6]:
data_path = "/data/mobod/tm/vw.wiki-en-20K.txt" # /data/mobod/tm/vw.wiki-en-20K.txt
min_token_tf = 10
max_token_tf = 30000
min_token_length = 3

In [7]:
bow, cv_model, nnz = read_txt(data_path, min_token_tf, max_token_tf, min_token_length, min_doc_length=50, is_vw_format=True)

Total number of tokens: 42143
Total number of documents: 16029
Total collection size: 5193241.0
Elapsed time : 90 sec.


Перед проведением экспериментов, проверим, что данные разбиты на достаточное число партиций и что среди них нет вырожденных (существенно меньших по объёму, чем прочие). 

In [8]:
def count_row_in_partitions(rows):
    count = 0
    for _ in rows:
        count += 1
    yield count     

In [9]:
%%time
bow.mapPartitions(count_row_in_partitions).collect()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 6.26 s


[8038, 7991]

In [10]:
%%time
bow.partitionBy(5).mapPartitions(count_row_in_partitions).collect()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 5.06 s


[3206, 3206, 3207, 3205, 3205]

Оценить время работы при num_document_passes=5 и num_collection_passes=10, без broadcast переменной,  с числом тем num_topics = 10, 20, 50

In [11]:
num_topics_list = [10, 20, 50]

In [12]:
for num_topics in num_topics_list:
    print('Num topics: ', num_topics)
    model = TopicModel(num_topics=num_topics,   # число тем в модели
                   cv_model=cv_model,           # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                     # общее число словопозиций в коллекции
                   num_document_passes=10,      # число проходов по документу на E-шаге
                   use_phi_broadcast=False,     # использование бродкастинга матрицы $\Phi$
                   beta=0.0)  
    model.fit1(bow, num_collection_passes=10) 

Num topics:  10
Done
Elapsed time : 4559 sec.
Num topics:  20
Done
Elapsed time : 9360 sec.
Num topics:  50
Done
Elapsed time : 18573 sec.


Оценить время работы при num_document_passes=5 и num_collection_passes=10, с broadcast переменной,  с числом тем num_topics = 10, 20, 50

In [13]:
for num_topics in num_topics_list:
    print('Num topics: ', num_topics)
    model = TopicModel(num_topics=num_topics,  # число тем в модели
                   cv_model=cv_model,          # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                    # общее число словопозиций в коллекции
                   num_document_passes=10,     # число проходов по документу на E-шаге
                   use_phi_broadcast=True,     # использование бродкастинга матрицы $\Phi$
                   beta=0.0)  
    model.fit1(bow, num_collection_passes=10) 

Num topics:  10
Done
Elapsed time : 3970 sec.
Num topics:  20
Done
Elapsed time : 8107 sec.
Num topics:  50
Done
Elapsed time : 18398 sec.


При `num_topics=20` и `num_collection_passes=10` попробовать различные значения `num_document_passes` = 1, 2, 5, 10

In [37]:
model = TopicModel(num_topics=20, # число тем в модели
                   cv_model=cv_model,           # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                     # общее число словопозиций в коллекции
                   num_document_passes=1,      # число проходов по документу на E-шаге
                   use_phi_broadcast=False,   # использование бродкастинга матрицы $\Phi$
                   beta=0.0)   

In [38]:
%%time
model.fit(bow, num_collection_passes=10)

Done
Elapsed time : 745 sec.
CPU times: user 1.13 s, sys: 316 ms, total: 1.44 s
Wall time: 12min 25s


In [39]:
model.print_topics(10)

topic_id: 0
town
party
game
union
king
cup
station
club
art
band
--------
topic_id: 1
river
district
system
church
film
game
album
road
army
version
--------
topic_id: 2
film
song
london
party
album
king
road
building
support
community
--------
topic_id: 3
park
army
album
man
political
award
black
book
research
street
--------
topic_id: 4
film
club
album
record
king
radio
station
works
next
party
--------
topic_id: 5
film
party
river
song
church
game
station
art
building
education
--------
topic_id: 6
party
song
church
town
building
station
road
right
river
full
--------
topic_id: 7
game
club
development
league
park
william
book
system
cup
next
--------
topic_id: 8
game
film
league
party
road
river
album
band
king
club
--------
topic_id: 9
king
system
center
album
army
league
club
england
women
games
--------
topic_id: 10
film
game
league
father
station
album
children
party
night
show
--------
topic_id: 11
film
church
party
club
album
london
king
england
women
power
--------
topic_id: 

In [40]:
model.print_perplexity()

[2.0031400293603694, 11590.876243136421, 11589.343341274445, 11589.343320963952, 11589.343320691012, 11589.343320685926, 11589.343320685866, 11589.343320685866, 11589.343320685866, 11589.343320685988]


In [41]:
model = TopicModel(num_topics=20, # число тем в модели
                   cv_model=cv_model,           # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                     # общее число словопозиций в коллекции
                   num_document_passes=2,      # число проходов по документу на E-шаге
                   use_phi_broadcast=False,   # использование бродкастинга матрицы $\Phi$
                   beta=0.0)   

In [42]:
%%time
model.fit(bow, num_collection_passes=10)

Done
Elapsed time : 1059 sec.
CPU times: user 1.07 s, sys: 308 ms, total: 1.38 s
Wall time: 17min 39s


In [43]:
model.print_topics(10)

topic_id: 0
film
production
center
church
research
system
cause
museum
education
file
--------
topic_id: 1
party
castle
chateau
parties
region
river
building
england
league
democratic
--------
topic_id: 2
league
cup
album
club
football
guitar
vocals
track
games
video
--------
topic_id: 3
station
research
london
book
railway
students
royal
center
council
established
--------
topic_id: 4
king
emperor
china
province
empire
roman
site
dehydrogenase
period
william
--------
topic_id: 5
game
station
version
london
right
published
air
film
church
control
--------
topic_id: 6
game
air
book
station
engine
using
island
even
character
film
--------
topic_id: 7
regiment
foot
raised
film
bus
park
battalion
river
division
championship
--------
topic_id: 8
model
species
town
water
support
power
system
given
similar
units
--------
topic_id: 9
song
chart
championship
episode
club
film
version
singles
play
points
--------
topic_id: 10
league
club
album
player
band
goals
game
film
football
show
--------
t

In [44]:
model.print_perplexity()

[2.002166420435021, 11525.038954980118, 11488.647874276287, 11432.740235584848, 11354.367263654498, 11254.982155642618, 11133.267272163981, 10982.580921302015, 10788.087364622208, 10529.36571757355]


In [45]:
model = TopicModel(num_topics=20, # число тем в модели
                   cv_model=cv_model,           # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                     # общее число словопозиций в коллекции
                   num_document_passes=5,      # число проходов по документу на E-шаге
                   use_phi_broadcast=False,   # использование бродкастинга матрицы $\Phi$
                   beta=0.0)   

In [46]:
%%time
model.fit(bow, num_collection_passes=10)

Done
Elapsed time : 2143 sec.
CPU times: user 1.18 s, sys: 352 ms, total: 1.53 s
Wall time: 35min 43s


In [47]:
model.print_topics(10)

topic_id: 0
royal
army
regiment
foot
infantry
battalion
bgcolor
men
ship
corps
--------
topic_id: 1
album
song
band
records
songs
chart
king
guitar
love
track
--------
topic_id: 2
show
film
star
big
radio
award
red
man
little
video
--------
topic_id: 3
championship
race
points
round
racing
win
men
event
tournament
champion
--------
topic_id: 4
law
president
political
court
king
act
french
social
minister
rights
--------
topic_id: 5
league
club
game
cup
students
games
education
player
goals
football
--------
topic_id: 6
church
linear
socorro
william
isbn
peak
james
opera
kitt
spacewatch
--------
topic_id: 7
saint
aircraft
air
german
africa
population
france
squadron
language
airport
--------
topic_id: 8
film
father
award
daughter
mother
married
sir
role
children
london
--------
topic_id: 9
league
division
football
town
club
stadium
game
district
baseball
england
--------
topic_id: 10
chinese
system
china
support
version
web
memory
mobile
food
windows
--------
topic_id: 11
river
water
tr

In [48]:
model.print_perplexity()

[1.9810176955803374, 11259.338319260123, 10871.226215878696, 10136.359941594508, 8984.999570865597, 7742.519186072006, 6843.55439305915, 6304.399992353714, 5975.885641456235, 5755.43352870641]


In [25]:
model = TopicModel(num_topics=20, # число тем в модели
                   cv_model=cv_model,           # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                     # общее число словопозиций в коллекции
                   num_document_passes=10,      # число проходов по документу на E-шаге
                   use_phi_broadcast=False,   # использование бродкастинга матрицы $\Phi$
                   beta=0.0)   

In [31]:
%%time
model.fit(bow, num_collection_passes=10)

Done
Elapsed time : 3019 sec.
CPU times: user 1.31 s, sys: 320 ms, total: 1.63 s
Wall time: 50min 19s


In [35]:
model.print_topics(10)

topic_id: 0
million
development
economic
oil
business
power
companies
production
services
health
--------
topic_id: 1
law
president
party
court
election
saint
canada
elected
canadian
council
--------
topic_id: 2
league
championship
football
game
stadium
games
cup
win
coach
player
--------
topic_id: 3
party
church
political
social
movement
christian
god
book
women
religious
--------
topic_id: 4
station
japan
japanese
opera
railway
episode
stations
tokyo
network
channel
--------
topic_id: 5
river
town
building
park
village
jpg
church
lake
site
street
--------
topic_id: 6
album
band
song
records
songs
chart
guitar
video
rock
track
--------
topic_id: 7
system
using
model
type
design
engine
systems
standard
available
car
--------
topic_id: 8
league
cup
club
emperor
division
district
goals
cricket
total
england
--------
topic_id: 9
london
william
book
published
isbn
george
art
married
james
son
--------
topic_id: 10
german
germany
russian
republic
soviet
european
russia
van
french
dutch
----

In [36]:
model.print_perplexity()

[1.9539752532315, 10788.128738534579, 9658.761839116385, 8109.908301100955, 6926.145901190617, 6275.731027385971, 5907.875953679706, 5676.900583304337, 5517.366150706584, 5399.501576395475, 5308.210064133085, 5235.350622086657, 5176.46696157842, 5128.810649762238]


## ===================================

Простейшая регуляризация LDA

beta_list = [0.0, -0.1, -1.0]

Beta: 0.0

In [257]:
model = TopicModel(num_topics=20,          # число тем в модели
                   cv_model=cv_model,          # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                    # общее число словопозиций в коллекции
                   num_document_passes=5,      # число проходов по документу на E-шаге
                   use_phi_broadcast=True,     # использование бродкастинга матрицы $\Phi$
                   beta=0.0)   

In [259]:
model.fit(bow, num_collection_passes=10)

Done
Elapsed time : 1744 sec.


In [260]:
model.print_perplexity()

[6374.922999087668, 5971.94906111582, 5703.743190814501, 5520.280327351867, 5390.4614142650835, 5294.094546009813, 5220.546188067603, 5163.953001849265, 5119.300763980135, 5083.667488426292]


Посчитаем разряженность матрицы

In [262]:
np.count_nonzero(np.round(model.phiwt.value, decimals=4)==0.0) / (model.phiwt.value.shape[0] * model.phiwt.value.shape[1])

0.9252206824545584

Beta: -0.1

In [331]:
model = TopicModel(num_topics=20,          # число тем в модели
                   cv_model=cv_model,          # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                    # общее число словопозиций в коллекции
                   num_document_passes=5,      # число проходов по документу на E-шаге
                   use_phi_broadcast=True,     # использование бродкастинга матрицы $\Phi$
                   beta=-0.1)   

In [332]:
model.fit(bow, num_collection_passes=10)

Done
Elapsed time : 1754 sec.


In [333]:
model.print_perplexity()

[1.980021687603568, nan, nan, nan, nan, nan, nan, nan, nan, nan]


In [334]:
np.count_nonzero(np.round(model.phiwt.value, decimals=4)==0.0) / (model.phiwt.value.shape[0] * model.phiwt.value.shape[1])

0.0

Beta: -1.0

In [None]:
model = TopicModel(num_topics=20,          # число тем в модели
                   cv_model=cv_model,          # векторизатор, объект класса `pyspark.ml.feature.CountVectorizer`
                   nnz=nnz,                    # общее число словопозиций в коллекции
                   num_document_passes=5,      # число проходов по документу на E-шаге
                   use_phi_broadcast=True,     # использование бродкастинга матрицы $\Phi$
                   beta=-1.0)   

In [None]:
model.fit(bow, num_collection_passes=10)

In [None]:
model.print_perplexity()

In [None]:
np.count_nonzero(np.round(model.phiwt.value, decimals=4)==0.0) / (model.phiwt.value.shape[0] * model.phiwt.value.shape[1])

In [178]:
model.print_perplexity()

[9513.324257630837, 7771.021776248306, 6498.5023868671715, 5854.101607647668, 5510.384859919405, 5306.701951431949, 5176.884582509909, 5090.030398120509, 5029.170717168109, 4985.404751733243]


In [179]:
model.print_topics(5)

topic_id: 0
father
son
church
mother
said
--------
topic_id: 1
game
road
construction
airport
building
--------
topic_id: 2
research
education
india
students
social
--------
topic_id: 3
russian
hong
chinese
japanese
jewish
--------
topic_id: 4
party
law
political
election
court
--------
topic_id: 5
spanish
miss
del
men
san
--------
topic_id: 6
art
book
isbn
published
works
--------
topic_id: 7
california
president
news
http
texas
--------
topic_id: 8
station
linear
socorro
railway
town
--------
topic_id: 9
system
systems
data
using
process
--------
topic_id: 10
film
television
award
films
role
--------
topic_id: 11
river
king
species
water
lake
--------
topic_id: 12
league
club
cup
football
round
--------
topic_id: 13
army
battle
military
canadian
canada
--------
topic_id: 14
episode
character
show
story
man
--------
topic_id: 15
theory
form
space
example
case
--------
topic_id: 16
church
park
street
building
road
--------
topic_id: 17
album
band
song
records
songs
--------
topic_id: 1