In [1]:
import re
import operator
from collections import Counter
from pyspark import SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '24g')
SparkContext.setSystemProperty('spark.executor.extraJavaOptions', '-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps')
sc = SparkContext("local[*]", 'dcard')

In [2]:
from pyspark.sql import SparkSession
import sys
my_spark = SparkSession \
    .builder \
    .appName("dcard") \
    .config("spark.mongodb.input.uri", "mongodb://192.168.2.12:27017/dcard.talk_posts") \
    .config("spark.mongodb.output.uri", "mongodb://192.168.2.12:27017/dcard.talk_posts") \
    .getOrCreate()

In [3]:
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [4]:
from pyspark.accumulators import AccumulatorParam

In [5]:
df.printSchema()

root
 |-- _id: integer (nullable = true)
 |-- anonymousDepartment: boolean (nullable = true)
 |-- anonymousSchool: boolean (nullable = true)
 |-- commentCount: integer (nullable = true)
 |-- content: string (nullable = true)
 |-- createdAt: string (nullable = true)
 |-- department: string (nullable = true)
 |-- excerpt: string (nullable = true)
 |-- forumAlias: string (nullable = true)
 |-- forumId: string (nullable = true)
 |-- forumName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hidden: boolean (nullable = true)
 |-- hiddenByAuthor: boolean (nullable = true)
 |-- likeCount: integer (nullable = true)
 |-- media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: string (nullable = true)
 |-- pinned: boolean (nullable = true)
 |-- replyId: integer (nullable = true)
 |-- replyTitle: string (nullable = true)
 |-- reportReason: string (nullable = true)
 |-- school: string (nullable = true)
 |-- tags: array (nullable = true

In [6]:
df.groupBy("school").count().show()

+-----------+-----+
|     school|count|
+-----------+-----+
|       新堡大學|    9|
| 國立高雄海洋科技大學|   42|
|        屏科大|  446|
|      亞當森大學|    1|
|       格魯斯特|    1|
| 國立臺南護理專科學校|   22|
|     香港浸會大學|    1|
|    love♏的♎|    1|
|         七七|    1|
|   Ladycaca|    1|
|       伊比爾喬|    1|
|      理論型情聖|    1|
|        OwO|    1|
|💸挖壕溝的女子嘟🔨|    1|
|         米米|    1|
|     142小女紙|    2|
|        紅心K|    1|
|       新鮮な肝|    1|
|       米蘭大學|    2|
|  倫敦大學伯貝克學院|    1|
+-----------+-----+
only showing top 20 rows



In [7]:
df.take(5)

[Row(_id=6150, anonymousDepartment=False, anonymousSchool=False, commentCount=19, content='好酷噢可以按讚XD', createdAt='2014-04-10T08:16:24.673Z', department='資訊工程學系', excerpt='好酷噢可以按讚XD', forumAlias='talk', forumId='255fd275-fec2-49d2-8e46-2e1557ffaeb0', forumName='閒聊', gender='M', hidden=False, hiddenByAuthor=False, likeCount=82, media=[], pinned=False, replyId=None, replyTitle='null', reportReason='', school='淡江大學', tags=[], title='新功能耶！', updatedAt='2014-04-10T08:16:24.673Z', withNickname=None),
 Row(_id=6151, anonymousDepartment=False, anonymousSchool=False, commentCount=60, content='大家快來給點建議吧 : )', createdAt='2014-04-10T09:15:11.945Z', department=' ', excerpt='大家快來給點建議吧 : )', forumAlias='talk', forumId='255fd275-fec2-49d2-8e46-2e1557ffaeb0', forumName='閒聊', gender='D', hidden=False, hiddenByAuthor=False, likeCount=18, media=[], pinned=False, replyId=None, replyTitle='null', reportReason='', school='狄卡', tags=[], title='新版建議', updatedAt='2014-04-10T09:15:11.945Z', withNickname=None),
 R

In [8]:
content = df.select('content')
print(content.rdd.getNumPartitions())
content_rdd = content.rdd.repartition(16).cache()


3


In [9]:
content_rdd.getNumPartitions()


16

In [10]:
lineLengths = content_rdd.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
print(totalLength)

81525


In [11]:
def remove_url_and_punctuation(sentence):
    # remove url
    if 'http' in sentence:
        sentence = re.sub(r'[a-zA-Z0-9_/:.]', '', sentence, flags=re.MULTILINE)

    # remove punctuation
    text_list = re.split('\W+', sentence)
    return list(filter(None, text_list))

def to_ngrams(unigrams, length):
    return Counter(zip(*[unigrams[i:] for i in range(length)]))

In [12]:
test = '🤔剛剛要回文時已經找不到了\n總之，內容就是跟姊姊一起刮卡然後中了205萬，於是發上來的照片我怎麼看都沒中😂。\n\n祝大家新年快樂，也希望大家在刮彩卷之類的東西能夠好好看清楚喔🙌，以免遇到以為中獎一家人很high，結果被網友說沒中的哭哭戲碼。\nhttps://i.imgur.com/Svbg4BF.jpg\n沒有截文章 截到圖片而已\n\n大家加油喔👊'

In [13]:
remove_url_and_punctuation(test)

['剛剛要回文時已經找不到了',
 '總之',
 '內容就是跟姊姊一起刮卡然後中了萬',
 '於是發上來的照片我怎麼看都沒中',
 '祝大家新年快樂',
 '也希望大家在刮彩卷之類的東西能夠好好看清楚喔',
 '以免遇到以為中獎一家人很',
 '結果被網友說沒中的哭哭戲碼',
 '沒有截文章',
 '截到圖片而已',
 '大家加油喔']

In [14]:
# result = content_rdd.map(lambda a: remove_url_and_punctuation(a['content'])).collect()

In [15]:
# result[:50]

In [16]:
# compute unigram and bigram count
import time

start = time.time()

unigram_counter = Counter()
bigram_counter = Counter()
trigram_counter = Counter()

result = content_rdd.map(lambda a: remove_url_and_punctuation(a['content'])).collect()

for post in result:
    for line in post:
        unigram_counter.update(line)
        bigram_counter.update(to_ngrams(line, 2))
        trigram_counter.update(to_ngrams(line, 3))

end = time.time()
print(end - start)

85.05258798599243


In [17]:
print(unigram_counter.most_common(10))
print(bigram_counter.most_common(10))
print(trigram_counter.most_common(10))


[('的', 555164), ('我', 355176), ('是', 336489), ('不', 258415), ('一', 244291), ('有', 243244), ('了', 175684), ('人', 147514), ('在', 140303), ('到', 127492)]
[(('沒', '有'), 44669), (('什', '麼'), 42759), (('可', '以'), 40630), (('一', '個'), 38825), (('自', '己'), 38158), (('知', '道'), 37187), (('大', '家'), 36745), (('覺', '得'), 35109), (('因', '為'), 34636), (('真', '的'), 33224)]
[(('不', '知', '道'), 18890), (('的', '時', '候'), 15379), (('有', '沒', '有'), 13914), (('為', '什', '麼'), 9675), (('自', '己', '的'), 8395), (('_', '_', '_'), 7672), (('哈', '哈', '哈'), 7248), (('我', '覺', '得'), 6490), (('是', '不', '是'), 6301), (('真', '的', '很'), 6168)]


In [18]:
def one_to_three_grams(line):
    print(line)
    result = (Counter(line), to_ngrams(line, 2), to_ngrams(line, 3))
    print(len(result[0]), len(result[1]), len(result[2]))
    return result
#     return (Counter(line), to_ngrams(line, 2), to_ngrams(line, 3))

In [19]:
print(content_rdd.top(1))
remove_url_and_punctuation(content_rdd.top(1)[0]['content'])

[Row(content='🤔🤔\n聽說今年在屏東某地的潮X高中\n全國繁星第一 (110人)\n但只有46個人上國立\n難道這就是所謂有學校就讀的概念嗎?\n\n還有據說繁星進大學的 都蠻優秀的\n是這樣嗎？')]


['聽說今年在屏東某地的潮X高中',
 '全國繁星第一',
 '110人',
 '但只有46個人上國立',
 '難道這就是所謂有學校就讀的概念嗎',
 '還有據說繁星進大學的',
 '都蠻優秀的',
 '是這樣嗎']

In [20]:
lineLengths = content_rdd.flatMap(lambda s: remove_url_and_punctuation(s['content'])).map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
print(totalLength)

15678028


In [21]:
# import time
# sec = int(round(time.time()))
# print(sec)
# sample_rdd = content_rdd.sample(False, 0.01, sec)

In [22]:
# result = content_rdd.flatMap(lambda s: remove_url_and_punctuation(s['content'])).map(lambda line: one_to_three_grams(line)).reduce(lambda a, b: tuple(map(operator.add, a, b)))

# result[0].most_common(10)

In [23]:
# result_one_grams = content_rdd.flatMap(lambda s: remove_url_and_punctuation(s['content'])).map(lambda s: Counter(s)).reduce(lambda a,b: a + b)

# result_one_grams.most_common(10)

In [24]:
# # Utilizing spark Accumulator to calculate n-grams.
# list_data = content_rdd.flatMap(lambda a: remove_url(a['content'])).collect()
# result_list = sc.parallelize(list_data)
# result_list.top(10)

# class CounterAccumulatorParam(AccumulatorParam):
#     def zero(self, initialValue):
#         return initialValue

#     def addInPlace(self, v1, v2):
#         v1 += v2
#         return v1

# # Then, create an Accumulator of this type:
# one_gram_accum = sc.accumulator(Counter(), CounterAccumulatorParam())
# two_gram_accum = sc.accumulator(Counter(), CounterAccumulatorParam())
# three_gram_accum = sc.accumulator(Counter(), CounterAccumulatorParam())

# def one_to_three_grams_accum(line):
#     one_gram_accum.add(Counter(line))
#     two_gram_accum.add(to_ngrams(line, 2))
#     three_gram_accum.add(to_ngrams(line, 3))

# result_list.foreach(lambda line: one_to_three_grams_accum(line))

# one_gram_accum.value.most_common(10)

# two_gram_accum.value.most_common(10)

# three_gram_accum.value.most_common(10)

 # Good-Turing Smoothing Language Model

In [25]:
V1 = content_rdd.flatMap(lambda s: remove_url_and_punctuation(s['content'])).map(lambda s: len(s)).reduce(lambda a, b: a + b)
V2 = V1 ** 2
k = 10

In [26]:
from math import log10
# compute N1, N2, N3...
unigram_Nr = Counter(unigram_counter.values())
bigram_Nr = Counter(bigram_counter.values())
# compute N0
unigram_Nr[0] = V1 - len(unigram_counter)
bigram_Nr[0] = V2 - len(bigram_counter)
print(unigram_Nr[0], bigram_Nr[0])

15670476 245800561178671


In [27]:
# compute r
unigram_r = [(i+1) * unigram_Nr[i+1] / unigram_Nr[i] for i in range(k)]
bigram_r = [(i+1) * bigram_Nr[i+1] / bigram_Nr[i] for i in range(k)]
print(unigram_r)
print(bigram_r)

[6.942992669782335e-05, 1.0183823529411764, 1.6570397111913358, 3.281045751633987, 3.545816733067729, 5.52808988764045, 5.378048780487805, 7.428571428571429, 8.461538461538462, 9.272727272727273]
[1.48258408464375e-09, 0.6455353712749026, 1.5699225491613036, 2.507757542280636, 3.497926924073594, 4.4898322035781755, 5.4324324324324325, 6.407449929837989, 7.322847813968305, 8.6522462562396]


In [28]:
# compute normalize factor
# compute N
unigram_N = sum(unigram_counter.values())
bigram_N = sum(bigram_counter.values())
print(unigram_N, bigram_N)

15678028 13901455


In [29]:
# compute new probability sum
unigram_N_ = unigram_N + k * unigram_Nr[k]
bigram_N_ = bigram_N + k * bigram_Nr[k]
print(unigram_N_, bigram_N_)

15679048 13989855


In [30]:
# normalize factor: N/N’
unigram_norm_factor = unigram_N / unigram_N_
bigram_norm_factor = bigram_N / bigram_N_
print(unigram_norm_factor, bigram_norm_factor)

0.9999349450298258 0.9936811353655917


In [31]:
# Estimating P(w) and P(w’|w)
def prob_1word(unigram):
    count = unigram_counter[unigram]
    r = unigram_r[count] if count < k else count
    return log10(r / unigram_N_)
def prob_2words(text_front, text_rear):
    count = bigram_counter[text_front, text_rear]
    r = bigram_r[count] if count < k else count
    return log10(r / bigram_N_)
def prob_word_by_word(text_front, text_rear):
    return prob_2words(text_front, text_rear) - prob_1word(text_front)
def prob_words(words):
    return prob_1word(words[0]) + sum(prob_word_by_word(words[i-1], words[i]) for i in range(1, len(words)))
def prob_text(text):
    return prob_words(text.lower().split())

In [32]:
print(prob_1word('清'))
print(prob_2words('清','華'))
print(prob_word_by_word('我','很'))

-3.352024606932168
-5.231999360819604
-1.8624478784734173


In [33]:
unicount_log = {k: log10(v) for k, v in unigram_counter.items()}
bicount_log = {k: log10(v) for k, v in bigram_counter.items()}
unigram_r_log = [log10(r) for r in unigram_r]
bigram_r_log = [log10(r) for r in bigram_r]
unigram_N_log = log10(unigram_N_)
bigram_N_log = log10(bigram_N_)


def prob_1word(unigram):
    count = unigram_counter[unigram]
    r = unigram_r_log[count] if count < k else unicount_log[unigram]
    return r - unigram_N_log
def prob_2words(text_front, text_rear):
    count = bigram_counter[text_front, text_rear]
    r = bigram_r_log[count] if count < k else bicount_log[text_front, text_rear]
    return r - bigram_N_log

In [34]:
print(prob_1word(u'清'))
print(prob_1word(u'華'))
print(prob_2words(u'清', u'華'))
print(prob_word_by_word(u'清',u'華'))

-3.3520246069321686
-3.7596345517270455
-5.231999360819604
-1.879974753887435


In [35]:
import math
# N_unigram_corpus = math.log2(float(sum(unigram_counter.values())))
# N_bigram_corpus = math.log2(float(sum(bigram_counter.values())))
def pmi(words):
    word1 = words[0]
    word2 = words[1]
    # Good-Turing Estimation 將次數小於k的字做一些調整 目標不要讓沒出現過的字 機率為0
    count_word1 = unigram_counter[word1]
    r_word1 = unigram_r_log[count_word1] if count_word1 < k else unicount_log[word1]
    count_word2 = unigram_counter[word2]
    r_word2 = unigram_r_log[count_word2] if count_word2 < k else unicount_log[word2]
    count_word1_and_word2 = bigram_counter[(word1, word2)]
    r_word1_and_word2 = bigram_r_log[count_word1_and_word2] if count_word1_and_word2 < k else bicount_log[(word1, word2)]

    # mutual information algorithm
    prob_word1 = r_word1 - unigram_N_log
    prob_word2 = r_word2 -  unigram_N_log
    prob_word1_word2 = r_word1_and_word2 -  bigram_N_log
    return prob_word1_word2 - (prob_word1+prob_word2)

In [36]:
print(pmi((u'聰', u'思')))
print(pmi((u'很',u'開')))
print(pmi((u'開', u'心')))
print(pmi((u'吃', u'飯')))
print(pmi((u'我', u'弟')))

-8.282093830811121
0.6955342288888389
1.5351483023990387
2.5625752923560103
0.632807350998914


In [37]:
import operator
threshold = 0.5

def word_segmentation(sentence):
    # input : 忘了當初選的選項  
    
    # Handle empty sentence.
    if len(sentence)==0:
        return sentence
    words_list = to_words(sentence, 2)
    max_probability_dict = find_max_prob(to_prob_dict(words_list))
    sentences = seperate_sentence(sentence, max_probability_dict)
    return sentences.split()

def to_words(unigrams, length):
    return list(zip(*[unigrams[i:] for i in range(length)]))


def to_prob_dict(words_list):
#   input : [('忘', '了'), ('了', '當'), ('當', '初'), ('初', '選'), ('選', '的'), ('的', '選'), ('選', '項')]
    result_sentence = {}
    for word in words_list:
        result_sentence[word] = pmi(word)
    return result_sentence


def find_max_prob(probability_dict):
    sorted_prob = sorted(probability_dict.items(), key=operator.itemgetter(1), reverse=True)
#     print(sorted_prob)  # Can see every candidate words prob.
    result_list = []
    for candidate in sorted_prob:
        prob = candidate[1]
        words_tuple = candidate[0]
        if prob > threshold:
            result_list.append(''.join(map(str, words_tuple)))  # ('選', '項') => 選項
    return result_list


def seperate_sentence(orginal_sentence, max_probability_dict):
    segment_word = orginal_sentence
    for candidate in max_probability_dict:
        insert_word = " "+candidate+" "
        segment_word = segment_word.replace(candidate, insert_word)
    return segment_word

In [38]:
test = word_segmentation(u'我已經向六個女生邀請她們當我舞伴')
print(test)

['我', '已經', '向', '六個', '女生', '邀請', '她們', '當我', '舞伴']


In [39]:
total_lines = content_rdd.flatMap(lambda s: remove_url_and_punctuation(s['content'])).collect()

In [40]:
for sentence in total_lines[:20]:
    print(sentence)
    print(word_segmentation(sentence))
    print()

呃
['呃']

忘了當初選的選項
['忘了', '當初', '選的', '選項']

想要知道的話可以顯示嗎
['想要', '知道', '的話', '可以', '顯示', '嗎']

我猜線在是不行
['我猜', '線在是', '不行']

要改了系統才能吧
['要改', '了', '系統', '才能', '吧']

有時候難得遇到認識的人其實很高興
['有', '時候', '難得', '遇到', '認識', '的人', '其實', '很', '高興']

不過因為太久沒看到
['不過', '因為', '太久', '沒', '看到']

不知道要說什麼
['不', '知道', '要說', '什麼']

所以就快步離開了
['所以', '就', '快步', '離開', '了']

大家都有這樣的經驗嗎
['大家', '都有', '這樣', '的', '經驗', '嗎']

感覺淡江的同學好多阿
['感覺', '淡江', '的', '同學', '好多', '阿']

抽到好多都是淡江的
['抽到', '好多', '都是', '淡江', '的']

好友也都是
['好友', '也', '都是']

學校即將舉行耶誕舞會
['學校', '即將', '舉行', '耶誕', '舞會']

我已經向六個女生邀請她們當我舞伴
['我', '已經', '向', '六個', '女生', '邀請', '她們', '當我', '舞伴']

但是都被拒絕
['但是', '都被', '拒絕']

我很難過
['我很', '難過']

我無法再承受任何打擊
['我', '無法', '再', '承受', '任何', '打擊']

我心理系同學對我說
['我', '心理', '系', '同學', '對', '我說']

不要難過
['不要', '難過']



In [41]:
def partial_match(word, counter):
    new_counter = Counter()
    for key, value in counter.most_common():
        if all(k1 == k2 or k2 is None for k1, k2 in zip(key, word)):
            new_counter[key] = value
    return new_counter


In [42]:
one_match = partial_match((u'沒', None), bigram_counter)
two_match = partial_match((u'沒', u'有', None), trigram_counter)

In [43]:
one_match.most_common(10)

[(('沒', '有'), 44669),
 (('沒', '什'), 2285),
 (('沒', '想'), 2201),
 (('沒', '辦'), 2084),
 (('沒', '人'), 1754),
 (('沒', '看'), 1645),
 (('沒', '關'), 1577),
 (('沒', '事'), 1472),
 (('沒', '錯'), 1344),
 (('沒', '多'), 864)]

In [44]:
two_match.most_common(10)

[(('沒', '有', '人'), 4608),
 (('沒', '有', '什'), 1272),
 (('沒', '有', '推'), 838),
 (('沒', '有', '這'), 832),
 (('沒', '有', '很'), 806),
 (('沒', '有', '要'), 713),
 (('沒', '有', '一'), 705),
 (('沒', '有', '想'), 627),
 (('沒', '有', '任'), 504),
 (('沒', '有', '看'), 495)]

# Latent Dirichlet allocation (LDA)

In [45]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

In [46]:
# Load and parse the data
data = sc.textFile("/usr/local/spark/data/mllib/sample_lda_data.txt")
print(data)
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
print(parsedData)
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
print(corpus)

/usr/local/spark/data/mllib/sample_lda_data.txt MapPartitionsRDD[37] at textFile at NativeMethodAccessorImpl.java:0
PythonRDD[38] at RDD at PythonRDD.scala:48
PythonRDD[40] at RDD at PythonRDD.scala:48


In [48]:
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

Learned topics (as distributions over vocab of 11 words):
Topic 0:
 4.61067775401
 3.25718008285
 3.61098650373
 21.1410339609
 1.33267641635
 3.69186142758
 14.762246776
 0.82137537405
 2.82532974243
 4.10976967321
 27.0634752056
Topic 1:
 15.2050887533
 6.01461422762
 1.98955510049
 16.5388192049
 18.7388467228
 12.455585142
 2.91831235673
 8.02529688477
 0.720473509988
 2.6590998242
 1.30631113799
Topic 2:
 6.1842334927
 19.7282056895
 6.39945839578
 2.32014683424
 4.92847686085
 5.85255343039
 13.3194408673
 1.15332774118
 4.45419674758
 17.2311305026
 4.63021365638


In [49]:
from collections import defaultdict


num_of_stop_words = 50      # Number of most common words to remove, trying to eliminate stop words
num_topics = 3              # Number of topics we are looking for
num_words_per_topic = 15    # Number of words to display for each topic
max_iterations = 60         # Max number of times to iterate before finishing

In [50]:
documents = sc.parallelize(result)
unigram_counter.most_common(50)
bigram_counter.most_common(50)

[(('沒', '有'), 44669),
 (('什', '麼'), 42759),
 (('可', '以'), 40630),
 (('一', '個'), 38825),
 (('自', '己'), 38158),
 (('知', '道'), 37187),
 (('大', '家'), 36745),
 (('覺', '得'), 35109),
 (('因', '為'), 34636),
 (('真', '的'), 33224),
 (('我', '們'), 31995),
 (('不', '是'), 28781),
 (('所', '以'), 28237),
 (('朋', '友'), 27289),
 (('就', '是'), 26864),
 (('時', '候'), 26351),
 (('還', '是'), 26081),
 (('這', '樣'), 24335),
 (('不', '知'), 21375),
 (('然', '後'), 21356),
 (('看', '到'), 21188),
 (('的', '人'), 20874),
 (('我', '的'), 20783),
 (('是', '我'), 20324),
 (('有', '人'), 19600),
 (('怎', '麼'), 19496),
 (('現', '在'), 18760),
 (('但', '是'), 18669),
 (('他', '們'), 18649),
 (('的', '時'), 18577),
 (('如', '果'), 17887),
 (('一', '下'), 17608),
 (('不', '會'), 16947),
 (('這', '個'), 16427),
 (('開', '始'), 16150),
 (('喜', '歡'), 15823),
 (('很', '多'), 15611),
 (('哈', '哈'), 15334),
 (('一', '直'), 15143),
 (('都', '是'), 14362),
 (('一', '樣'), 14308),
 (('個', '人'), 14118),
 (('有', '沒'), 14033),
 (('不', '要'), 13770),
 (('比', '較'), 13766),
 (('有', '一

In [51]:
def document_segmentation(document):
    sentences = remove_url_and_punctuation(document)  # output ['呃', '忘了當初選的選項', '想要知道的話可以顯示嗎', '我猜線在是不行', '要改了系統才能吧']
    
    sentence_list = []
    for sentence in sentences:
        # Handle empty sentence.
        if len(sentence)==0:
            return sentence
        words_list = to_words(sentence, 2)
        max_probability_dict = find_max_prob(to_prob_dict(words_list))
        sentences = seperate_sentence(sentence, max_probability_dict)
        sentence_list = sentence_list + sentences.split()
    return sentence_list


# Convert the given document into a vector of word counts
def document_vector(document):
    id = document[1]
    counts = Counter()
    for token in document[0]:
        if token in vocabulary:
            token_id = vocabulary[token]
            counts[token_id] += 1
    counts = sorted(counts.items())
    if len(counts) > 0 :
        keys = [x[0] for x in counts]
        values = [x[1] for x in counts]
    else:
        keys = [0]
        values = [0]
    return (id, Vectors.sparse(len(vocabulary), keys, values))


In [52]:
content = df.select(df['_id'], df['content'])
content_rdd = content.rdd.repartition(16).cache()
# 把原本dcard的 id與標題 做比對


content_rdd.take(1)[0]
# result_rdd.map(lambda sentence: word_segmentation(sentence)).collect()

Row(_id=10003, content='呃....忘了當初選的選項，想要知道的話可以顯示嗎?\r\n我猜線在是不行...要改了系統才能吧?\r\n')

In [53]:
result_rdd = content_rdd.map(lambda posts: document_segmentation(posts['content'])).filter(lambda documnet: len(documnet)>0)



In [54]:
result_rdd.take(1)

[['呃',
  '忘了',
  '當初',
  '選的',
  '選項',
  '想要',
  '知道',
  '的話',
  '可以',
  '顯示',
  '嗎',
  '我猜',
  '線在是',
  '不行',
  '要改',
  '了',
  '系統',
  '才能',
  '吧']]

In [55]:
sentence_counts = result_rdd.flatMap(lambda document: document) \
    .map(lambda word: (word, 1) if word is not list else ('', 0)) \
    .reduceByKey( lambda x,y: x + y) \
    .map(lambda tuple: (tuple[1], tuple[0])) \
    .sortByKey(False)

In [56]:
# Identify a threshold to remove the top words, in an effort to remove stop words
threshold_value = sentence_counts.take(num_of_stop_words)[num_of_stop_words - 1][0]

# Only keep words with a count less than the threshold identified above, and then index each one and collect them into a map
vocabulary = sentence_counts                    \
    .filter(lambda x : x[0] < threshold_value)  \
    .map(lambda x: x[1])                        \
    .zipWithIndex()                             \
    .collectAsMap()

In [57]:

documents = result_rdd.zipWithIndex().map(document_vector).map(list)


In [58]:
inv_voc = {value: key for (key, value) in vocabulary.items()}


In [59]:
documents.take(10)

[[0,
  SparseVector(370239, {33: 1.0, 92: 1.0, 100: 1.0, 532: 1.0, 559: 1.0, 723: 1.0, 892: 1.0, 1043: 1.0, 1110: 1.0, 1635: 1.0, 2763: 1.0, 4281: 1.0, 8909: 1.0, 12721: 1.0, 196820: 1.0})],
 [1,
  SparseVector(370239, {7: 1.0, 22: 1.0, 56: 1.0, 127: 1.0, 144: 1.0, 192: 1.0, 209: 1.0, 480: 1.0, 2106: 1.0, 2195: 1.0, 2411: 1.0, 2904: 1.0, 18202: 1.0})],
 [2,
  SparseVector(370239, {14: 2.0, 31: 1.0, 35: 1.0, 107: 1.0, 506: 1.0, 687: 2.0, 853: 1.0, 3924: 2.0})],
 [3,
  SparseVector(370239, {27: 1.0, 30: 1.0, 34: 1.0, 35: 1.0, 67: 1.0, 77: 1.0, 127: 1.0, 152: 1.0, 180: 1.0, 189: 1.0, 378: 1.0, 387: 2.0, 503: 2.0, 505: 2.0, 583: 1.0, 663: 2.0, 683: 1.0, 903: 1.0, 953: 1.0, 1052: 1.0, 1065: 1.0, 1241: 1.0, 1400: 1.0, 1745: 1.0, 1851: 2.0, 1877: 1.0, 2142: 1.0, 2498: 1.0, 2565: 1.0, 2896: 1.0, 3404: 1.0, 3929: 1.0, 5423: 1.0, 5460: 1.0, 8729: 1.0, 9386: 1.0, 9487: 1.0, 10895: 2.0, 12669: 1.0, 15978: 1.0, 23234: 1.0, 80409: 1.0, 140181: 1.0})],
 [4,
  SparseVector(370239, {1: 1.0, 12: 1.0, 13

In [60]:
for a in vocabulary.items():
    if a[1]==194994:
        print(a)

('傘伸', 194994)


In [61]:
lda_model = LDA.train(documents, k=num_topics, maxIterations=max_iterations)
topic_indices = lda_model.describeTopics(maxTermsPerTopic=num_words_per_topic)

In [62]:
# with open("/home/phejimlin/Documents/dcard_spark/output.txt", 'w') as f:
#     lda_model = LDA.train(documents, k=num_topics)

#     topic_indices = lda_model.describeTopics(maxTermsPerTopic=num_words_per_topic)
        
#     # Print topics, showing the top-weighted 10 terms for each topic
#     for i in range(len(topic_indices)):
#         f.write("Topic #{0}\n".format(i + 1))
#         for j in range(len(topic_indices[i][0])):
#             f.write("{0}\t{1}\n".format(inv_voc[topic_indices[i][0][j]].encode('utf-8'), topic_indices[i][1][j]))
            

#     f.write("{0} topics distributed over {1} documents and {2} unique words\n".format(topic_val, documents.count(), len(vocabulary)))

In [63]:
for i in range(len(topic_indices)):
    print("Topic #{0}\n".format(i + 1))
    for j in range(len(topic_indices[i][0])):
        print("{0}\t{1}\n".format(inv_voc[topic_indices[i][0][j]], topic_indices[i][1][j]))

Topic #1

想	0.005876551881665978

各位	0.005145913310986505

請問	0.004985720155710598

同學	0.004644286372869073

時間	0.004195192973717878

到	0.004185557387906027

學校	0.0040965667408815265

有人	0.004007460206057752

最近	0.003878683799553402

大學	0.0037732649728026424

去	0.003542083858920946

一	0.003325086030674636

比較	0.0031826984276454996

00	0.003181115845806004

沒	0.0031727424346081963

Topic #2

一直	0.004773470599061366

哈哈	0.0043644766875047284

她	0.004244470627124026

今天	0.004020599861706831

女生	0.003962322751623182

跟我	0.003923133256579232

看	0.003878029570871312

起來	0.0035755270931731673

沒	0.0033865970241657496

開始	0.0031734655579402833

可是	0.0031669840614835343

發現	0.003136650194175333

小	0.003106826557661898

喜歡	0.0030949453573358237

突然	0.0030701189968184205

Topic #3

這個	0.004203443217561273

台灣	0.004136925176251302

喜歡	0.0034704957854613558

為	0.003355249158990656

很多	0.003299620594330903

和	0.002975157292996413

只是	0.0028927269604115584

而	0.002813542039597897

上	0.002684373420390

In [64]:
for i in range(len(topic_indices)):
    print("Topic #{0}\n".format(i + 1))
    for j in range(len(topic_indices[i][0])):
        print("{0}\t{1}\n".format(inv_voc[topic_indices[i][0][j]], topic_indices[i][1][j]))


Topic #1

想	0.005876551881665978

各位	0.005145913310986505

請問	0.004985720155710598

同學	0.004644286372869073

時間	0.004195192973717878

到	0.004185557387906027

學校	0.0040965667408815265

有人	0.004007460206057752

最近	0.003878683799553402

大學	0.0037732649728026424

去	0.003542083858920946

一	0.003325086030674636

比較	0.0031826984276454996

00	0.003181115845806004

沒	0.0031727424346081963

Topic #2

一直	0.004773470599061366

哈哈	0.0043644766875047284

她	0.004244470627124026

今天	0.004020599861706831

女生	0.003962322751623182

跟我	0.003923133256579232

看	0.003878029570871312

起來	0.0035755270931731673

沒	0.0033865970241657496

開始	0.0031734655579402833

可是	0.0031669840614835343

發現	0.003136650194175333

小	0.003106826557661898

喜歡	0.0030949453573358237

突然	0.0030701189968184205

Topic #3

這個	0.004203443217561273

台灣	0.004136925176251302

喜歡	0.0034704957854613558

為	0.003355249158990656

很多	0.003299620594330903

和	0.002975157292996413

只是	0.0028927269604115584

而	0.002813542039597897

上	0.002684373420390

In [65]:
# ldaModel.save(sc, '/home/phejimlin/Documents/dcard_spark')

In [66]:
topics = ldaModel.topicsMatrix()


In [67]:
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

Topic 0:
 4.61067775401
 3.25718008285
 3.61098650373
 21.1410339609
 1.33267641635
 3.69186142758
 14.762246776
 0.82137537405
 2.82532974243
 4.10976967321
 27.0634752056
Topic 1:
 15.2050887533
 6.01461422762
 1.98955510049
 16.5388192049
 18.7388467228
 12.455585142
 2.91831235673
 8.02529688477
 0.720473509988
 2.6590998242
 1.30631113799
Topic 2:
 6.1842334927
 19.7282056895
 6.39945839578
 2.32014683424
 4.92847686085
 5.85255343039
 13.3194408673
 1.15332774118
 4.45419674758
 17.2311305026
 4.63021365638


In [68]:
type(documents)

pyspark.rdd.PipelinedRDD

In [69]:
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import Vectors, SparseVector

# Convert the given document into a vector of word counts
def document_vector(document):
    id = document[1]
    counts = Counter()
    for token in document[0]:
        if token in vocabulary:
            token_id = vocabulary[token]
            counts[token_id] += 1
    counts = sorted(counts.items())
    if len(counts) > 0 :
        keys = [x[0] for x in counts]
        values = [x[1] for x in counts]
    else:
        keys = [0]
        values = [0]
    return (id, SparseVector(len(vocabulary), keys, values))

In [70]:
# df_lda = my_spark.createDataFrame([[1, Vectors.dense([0.0, 1.0])], [2, SparseVector(2, {0: 1.0})]], ["id", "features"])
documents = result_rdd.zipWithIndex().map(document_vector).map(list).collect()
df_lda = my_spark.createDataFrame(documents, ["id", "features"])


In [71]:
lda = LDA(k=5, seed=1, optimizer="em")
model = lda.fit(df_lda)
model.isDistributed()

localModel = model.toLocal()
localModel.isDistributed()

model.vocabSize()

model.describeTopics().show()

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[2, 173, 9, 3, 1,...|[0.00266596499784...|
|    1|[54, 1, 3, 49, 2,...|[0.00267392267619...|
|    2|[0, 1, 6, 8, 4, 5...|[0.00257590089825...|
|    3|[7, 9, 4, 2, 8, 5...|[0.00269081560297...|
|    4|[214, 237, 5, 0, ...|[0.00304493206973...|
+-----+--------------------+--------------------+



In [72]:
model.topicsMatrix()

DenseMatrix(370239, 5, [3084.9592, 3378.9098, 3696.3168, 3386.0323, 3306.3288, 3350.5957, 3282.4246, 3201.592, ..., 0.3116, 0.0827, 0.1136, 0.1187, 0.0629, 0.0222, 0.1752, 0.1648], 0)

In [88]:
lda_result = model.describeTopics().collect()

In [89]:
lda_result[0][2]

[0.0026659649978443977,
 0.0026524872738849176,
 0.002463622275976601,
 0.002442172659615011,
 0.0024370355544656276,
 0.002416614020059773,
 0.0023846865782090627,
 0.0023674456465796376,
 0.002309145195733088,
 0.002225023907389491]

In [90]:
for i in range(len(lda_result)):
    print("Topic #{0}\n".format(i + 1))
    for j in range(len(lda_result[i][1])):
        print("{0}\t{1}\n".format(inv_voc[lda_result[i][1][j]], lda_result[i][2][j]))

Topic #1

開始	0.0026659649978443977

__	0.0026524872738849176

一直	0.002463622275976601

到	0.002442172659615011

這個	0.0024370355544656276

喜歡	0.002416614020059773

不會	0.0023846865782090627

很多	0.0023674456465796376

沒	0.002309145195733088

想	0.002225023907389491

Topic #2

台灣	0.0026739226761905367

這個	0.0025113442187752483

到	0.0024832180719042574

大學	0.0023908255371749235

開始	0.0023432689956699097

上	0.0023200172517017364

很多	0.002288571461157144

不會	0.002224652553767057

問題	0.0021949973319885826

一	0.002180700881704841

Topic #3

想	0.002575900898259511

這個	0.002567104616922189

很多	0.0024258031993770537

看	0.0024057795919906993

不會	0.002343615087396145

喜歡	0.002317761425816745

上	0.0022841946557724917

到	0.002267377234249656

有人	0.002255848636167972

沒	0.0022285380082355663

Topic #4

沒	0.0026908156029775655

一直	0.002525349381928592

不會	0.002520496454430399

開始	0.0025155167971237635

看	0.002459928509714513

喜歡	0.002446171795137844

到	0.0024421857845255664

想	0.00243726668711843

今天	0.00

In [102]:
test_document = my_spark.createDataFrame(documents[:10], ["id", "features"])
transformed = model.transform(test_document)
dist = transformed.take(10)

In [103]:
transformed.show()

+---+--------------------+--------------------+
| id|            features|   topicDistribution|
+---+--------------------+--------------------+
|  0|(370239,[33,92,10...|[0.20108538494045...|
|  1|(370239,[7,22,56,...|[0.20231190914788...|
|  2|(370239,[14,31,35...|[0.19972245203753...|
|  3|(370239,[27,30,34...|[0.23060794160369...|
|  4|(370239,[1,12,13,...|[0.06363641411485...|
|  5|(370239,[3,7,8,14...|[0.18531674868468...|
|  6|(370239,[1,10,14,...|[0.18808658392380...|
|  7|(370239,[14,124,1...|[0.18325187414877...|
|  8|(370239,[7,28,34,...|[0.15195312922567...|
|  9|(370239,[8,45,46,...|[0.19948906753667...|
+---+--------------------+--------------------+



In [116]:
dist

[Row(id=0, features=SparseVector(370239, {33: 1.0, 92: 1.0, 100: 1.0, 532: 1.0, 559: 1.0, 723: 1.0, 892: 1.0, 1043: 1.0, 1110: 1.0, 1635: 1.0, 2763: 1.0, 4281: 1.0, 8909: 1.0, 12721: 1.0, 196820: 1.0}), topicDistribution=DenseVector([0.2011, 0.1956, 0.205, 0.1976, 0.2007])),
 Row(id=1, features=SparseVector(370239, {7: 1.0, 22: 1.0, 56: 1.0, 127: 1.0, 144: 1.0, 192: 1.0, 209: 1.0, 480: 1.0, 2106: 1.0, 2195: 1.0, 2411: 1.0, 2904: 1.0, 18202: 1.0}), topicDistribution=DenseVector([0.2023, 0.1937, 0.1978, 0.2113, 0.1949])),
 Row(id=2, features=SparseVector(370239, {14: 2.0, 31: 1.0, 35: 1.0, 107: 1.0, 506: 1.0, 687: 2.0, 853: 1.0, 3924: 2.0}), topicDistribution=DenseVector([0.1997, 0.1972, 0.2013, 0.1992, 0.2026])),
 Row(id=3, features=SparseVector(370239, {27: 1.0, 30: 1.0, 34: 1.0, 35: 1.0, 67: 1.0, 77: 1.0, 127: 1.0, 152: 1.0, 180: 1.0, 189: 1.0, 378: 1.0, 387: 2.0, 503: 2.0, 505: 2.0, 583: 1.0, 663: 2.0, 683: 1.0, 903: 1.0, 953: 1.0, 1052: 1.0, 1065: 1.0, 1241: 1.0, 1400: 1.0, 1745: 1.