In [0]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window
import sys
import os

In [0]:
# Obtain the SparkSession used by every later cell (getOrCreate returns the
# existing session when one is already active).
spark = SparkSession.builder.getOrCreate()

In [0]:
# Load the subreddit ranking table and pull its subreddit ids to the driver,
# sorted ascending, as a plain Python list.
rank = spark.read.parquet('dbfs:/mnt/lsde/group24/subreddit_rank_2')
sorted_id_rows = rank.select(col('subreddit_id')).orderBy(asc('subreddit_id')).collect()
sub_id_list = [row.subreddit_id for row in sorted_id_rows]
print(len(sub_id_list))

1000


In [0]:
# Names of the entries in the comments directory, listed with os.listdir.
# NOTE(review): '/dbfs/mnt/...' and 'dbfs:/mnt/...' look like two views of the
# same directory (local mount vs. Spark URI) — confirm.
all_com_files_list = os.listdir('/dbfs/mnt/lsde/group24/comments')
# Prefix joined with each entry name when the file is read through Spark.
file_path = 'dbfs:/mnt/lsde/group24/comments/'

In [0]:
# For each (subreddit, author) keep only the 500 highest-scoring comments, then keep the 20 authors with the highest total score per subreddit.
def select_top_expert():
    """Build the table of the 20 top-scoring authors of every selected subreddit.

    Reads every entry of ``all_com_files_list`` under ``file_path``, keeps the
    comments whose ``subreddit_id`` is in ``sub_id_list`` and unions them onto
    an empty frame with the columns subreddit, subreddit_id, author, score,
    body. Then:

    1. per (subreddit_id, author), comments are numbered by descending score
       and only the first 500 are kept;
    2. those comments are grouped per (subreddit, subreddit_id, author) into
       ``expert_score`` (sum of scores) and ``expert_body`` (list of bodies);
    3. per subreddit_id, authors are numbered by descending ``expert_score``
       (``expert_score_rank``) and only the first 20 are kept.

    ``sum`` here is the Spark column function brought in by the star import of
    ``pyspark.sql.functions``. The function only builds the query and prints
    progress markers; it calls no Spark action itself.

    Returns:
        DataFrame with columns subreddit, subreddit_id, author, expert_score,
        expert_body, expert_score_rank.
    """
    comment_schema = 'subreddit string, subreddit_id string, author string, score long, body string'
    all_comments = spark.createDataFrame(spark.sparkContext.emptyRDD(), comment_schema)
    for file_name in all_com_files_list:
        selected = spark.read.parquet(file_path + file_name)
        selected = selected.filter(col('subreddit_id').isin(sub_id_list))
        all_comments = all_comments.union(selected)
    print('union finished')

    # Keep each author's 500 best comments inside a subreddit.
    by_author = Window.partitionBy('subreddit_id', 'author').orderBy(desc('score'))
    best_comments = all_comments.withColumn('rank_per_comment', row_number().over(by_author))
    best_comments = best_comments.filter(col('rank_per_comment') <= 500)

    # One row per author and subreddit: total score plus the comment bodies.
    experts = best_comments.groupBy('subreddit', 'subreddit_id', 'author').agg(
        sum('score').alias('expert_score'),
        collect_list('body').alias('expert_body'),
    )
    print('group finished')

    # Rank the authors of each subreddit by total score and keep the top 20.
    by_subreddit = Window.partitionBy('subreddit_id').orderBy(desc('expert_score'))
    experts = experts.withColumn('expert_score_rank', row_number().over(by_subreddit))
    print('partition fininshed')
    experts = experts.filter(col('expert_score_rank') <= 20)
    print('Done')
    return experts
# Build the top-expert DataFrame; the progress lines below are printed by
# select_top_expert while it assembles the query.
top_expert = select_top_expert()

union finished
group finished
partition fininshed
Done


In [0]:
# Persist the top-expert table as parquet, replacing the output of any
# previous run.
write_path = 'dbfs:/mnt/lsde/group24/'
# write_path already ends with '/', so the directory name is appended without
# a second slash; the result is exactly the path that is read back later
# ('dbfs:/mnt/lsde/group24/top_expert').
top_expert.write.format("parquet").mode("overwrite").save(write_path + "top_expert")

In [0]:
from gensim.summarization import summarize
import emoji
import re
import RAKE
import operator
import jieba.analyse
import collections

In [0]:
###
# Reload the persisted top-expert table, ordered by subreddit id.
# Debug aids: add .filter(col('subreddit') == 'China_irl') to look at a single
# subreddit, or start from the in-memory frame with
# top_expert.orderBy('subreddit_id') instead of re-reading the parquet output.
top_experts = (
    spark.read.parquet('dbfs:/mnt/lsde/group24/top_expert')
    .orderBy('subreddit_id')
)

In [0]:
# Bring every top-expert row to the driver as a Python list of Row objects.
top_experts_list= top_experts.rdd.collect()

In [0]:
def rm_emoji(sentence):
    """Replace emoji in ``sentence`` with a single space.

    The text is first passed through ``emoji.demojize`` (presumably turning
    each emoji into a ``:name:`` shortcode), then every ``:...:`` run of
    non-whitespace characters is replaced by one space.

    NOTE(review): the pattern cannot tell a demojized shortcode from
    colon-delimited text that was already in the input (e.g. ``10:30:45``),
    so such text is blanked as well — confirm this is acceptable.

    Args:
        sentence: text to clean.

    Returns:
        The text with every ``:...:`` span replaced by a space.
    """
    sentence = emoji.demojize(sentence)
    # Raw string: '\S' inside a plain literal is an invalid escape sequence.
    return re.sub(r':\S+?:', ' ', sentence)
  
def rm_url(sentence):
    """Remove URLs and bare ``.com`` / ``.cn`` domains from ``sentence``.

    The sentence is split on single spaces and each token is cleaned on its
    own, so the spacing of the input is preserved (a removed token leaves an
    empty string between its neighbouring spaces).

    Per token:
      * anything matching ``://`` (optionally preceded by http/https) plus the
        following URL characters is deleted;
      * everything up to and including a ``.com`` / ``.cn`` suffix is deleted,
        but only when the suffix ends at a word boundary, so ordinary text
        such as ``it.come`` (a missing space after a period) is left intact.

    Args:
        sentence: text to clean.

    Returns:
        The cleaned text, tokens re-joined with single spaces.
    """
    # Matches scheme-prefixed URLs (http://..., https://..., or a bare ://...).
    url_pattern = re.compile(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%|\-)*\b', re.S)
    # Matches scheme-less domains; the trailing \b stops it from biting into
    # longer words that merely start with "com" or "cn".
    domain_pattern = re.compile(r'.*?\.(?:com|cn)\b')

    cleaned_tokens = []
    # str.split(' ') always yields at least one token, so no empty-list branch
    # is needed.
    for token in sentence.split(' '):
        token = url_pattern.sub('', token)
        token = domain_pattern.sub('', token)
        cleaned_tokens.append(token)
    return ' '.join(cleaned_tokens)
    
def rm_html_tag(sentence):
    """Delete ``<a ...>...</a>`` / ``<b ...>...</b>`` style spans from ``sentence``.

    A span starts at the literal ``<a`` or ``<b`` and runs (shortest match,
    newlines included) to the next ``</a>`` or ``</b>``; the whole span,
    content included, is removed. Text without a closing tag is left as is.
    """
    return re.sub('(<a|<b)(.*?)(</a>|</b>)', '', sentence, flags=re.S)

def rm_tag(sentence):
    """Remove bracketed or hash-delimited tags from ``sentence``.

    A tag starts at ``[``, ``#`` or ``【`` and runs (shortest match, newlines
    included) to the next ``#``, ``]`` or ``】``; the delimiters and everything
    between them are deleted. The result is stripped of surrounding
    whitespace.

    Args:
        sentence: text to clean.

    Returns:
        The stripped text without tag spans.
    """
    # Raw string with only the escapes the regex needs: the previous plain
    # literal relied on invalid escape sequences such as '\[' and '\#'.
    tag_pattern = re.compile(r'(\[|#|【)(.*?)(#|\]|】)', re.S)
    return tag_pattern.sub('', sentence).strip()

def rm_at(sentence):
    """Remove ``@mentions`` from ``sentence``.

    Every ``@`` together with the non-whitespace characters that follow it is
    deleted, then surrounding whitespace is stripped.

    Args:
        sentence: text to clean.

    Returns:
        The stripped text without ``@...`` runs.
    """
    # Raw string: '\S' inside a plain literal is an invalid escape sequence.
    return re.sub(r'@\S*', '', sentence, flags=re.S).strip()

def rm_other(sentence):
    """Remove line breaks and stray ``#`` / ``*`` characters from ``sentence``.

    ``str.replace`` returns a new string, so each result has to be assigned
    back; the earlier version discarded them and therefore only stripped.

    Line breaks (``\\n``, ``\\r``) become a single space rather than nothing so
    that the last word of one line is not glued to the first word of the next.
    ``#`` and ``*`` are deleted outright. The result is stripped of
    surrounding whitespace.

    Args:
        sentence: text to clean.

    Returns:
        The cleaned, stripped text.
    """
    for line_break in ('\n', '\r'):
        sentence = sentence.replace(line_break, ' ')
    for marker in ('#', '*'):
        sentence = sentence.replace(marker, '')
    return sentence.strip()

def clean_sentence(s):
    """Clean one comment and make sure it ends with sentence punctuation.

    Applies, in order, rm_emoji, rm_url, rm_html_tag, rm_tag, rm_at and
    rm_other, rewrites the full-width marks ``。``, ``？`` and ``！`` as ``'. '``
    and appends ``'.'`` when the result does not already end in ``.``, ``?``
    or ``!``. An empty result is returned as an empty string.

    Args:
        s: raw comment text.

    Returns:
        The cleaned text, without trailing whitespace.
    """
    for cleaner in (rm_emoji, rm_url, rm_html_tag, rm_tag, rm_at, rm_other):
        s = cleaner(s)
    # The substitution leaves a trailing space when the text ends with a
    # full-width mark; strip it so the check below sees the '.' and does not
    # append a second, empty sentence ("你好。" -> "你好." rather than "你好. .").
    s = re.sub(r'。|？|！', '. ', s).rstrip()
    if s and s[-1] not in ('.', '?', '!'):
        s += '.'
    return s

def clean_text(text):
    """Clean every comment in ``text`` and concatenate the results.

    Each element of ``text`` is passed through ``clean_sentence`` and prefixed
    with one space, so a non-empty result starts with a space and an empty
    iterable yields ``''``.
    """
    return ''.join(' ' + clean_sentence(sentence) for sentence in text)

In [0]:
def summarize_text(text):
    """Return a summary of ``text``, or ``text`` itself when it is short.

    The text is split on ``.``, ``!`` and ``?``; with 10 or fewer resulting
    pieces it is returned unchanged. Longer text is handed to gensim's
    ``summarize``.

    The piece count is only a rough proxy for the number of sentences the
    summarizer will detect: text such as ``"wait..........."`` yields many
    pieces but may still be seen as a single sentence, for which gensim's
    ``summarize`` raises ``ValueError`` (per the gensim 3.x documentation).
    In that case the unsummarized text is returned instead of letting one
    comment abort the whole job.

    Args:
        text: cleaned text to summarize.

    Returns:
        The summary string, or ``text`` when it is short or cannot be
        summarized.
    """
    pieces = re.split('[.!?]', text)
    if len(pieces) <= 10:
        return text
    try:
        return summarize(text)
    except ValueError:
        # Too few sentences for the summarizer: fall back to the full text.
        return text

In [0]:
# Stop-word list paths passed to RAKE and to jieba respectively.
# NOTE(review): file names suggest an English and a Chinese list — confirm.
stop_dir = '/dbfs/FileStore/Group24/SmartStoplist.txt'
stop_dir_cn = '/dbfs/FileStore/Group24/cn_stopwords.txt'
# Shared RAKE extractor used by run_rake, built from the first stop-word list.
rake_object = RAKE.Rake(stop_dir)
# Register the second stop-word list with jieba.analyse (used by run_rake_cn).
jieba.analyse.set_stop_words(stop_dir_cn)
def sort_tuple(tup):
    """Sort the list ``tup`` in place, ascending by each item's second element.

    Returns the same list object that was passed in.
    """
    tup.sort(key=operator.itemgetter(1))
    return tup
def run_rake(text):
    """Return the (at most) ten highest-scoring RAKE results for ``text``.

    The results of ``rake_object.run`` are sorted ascending by their second
    element and the last ten are returned, so the best entry comes last.
    Assumes ``rake_object.run`` returns a list of (keyword, score) pairs —
    TODO confirm.
    """
    scored = rake_object.run(text)
    scored.sort(key=lambda pair: pair[1])
    return scored[-10:]
def run_rake_cn(text):
    """Return the (at most) ten highest-weighted jieba TextRank words for ``text``.

    Requests the top 20 weighted words from ``jieba.analyse.textrank``, sorts
    them ascending by their second element and returns the last ten, so the
    heaviest word comes last.
    """
    weighted = jieba.analyse.textrank(text, topK=20, withWeight=True)
    weighted.sort(key=lambda pair: pair[1])
    return weighted[-10:]

In [0]:
def get_expertise_cn(text):
    """Extract keywords from a list of comments using the jieba-based path.

    The comments are cleaned and joined, summarized, and passed to
    ``run_rake_cn``. When no keyword is found, the first raw comment is
    returned as the only "keyword" with a fixed weight of 10.0.

    Args:
        text: list of comment bodies; ``text[0]`` is used for the fallback.

    Returns:
        List of (keyword, weight) tuples.
    """
    keywords = run_rake_cn(summarize_text(clean_text(text)))
    if len(keywords) > 0:
        return keywords
    return [(text[0], 10.0)]

In [0]:
def get_expertise(text):
    """Extract keywords from a list of comments using the RAKE-based path.

    Normally the comments are cleaned and joined, summarized, and passed to
    ``run_rake``; when that yields nothing, the first raw comment is returned
    as the only "keyword" with a fixed weight of 10.0.

    Special case: when the first comment is exactly ``'9/10'`` the summary
    step is skipped and RAKE runs on the cleaned text directly, without the
    fallback.
    NOTE(review): this looks like a workaround for one specific input —
    confirm whether it is still needed.

    Args:
        text: list of comment bodies; ``text[0]`` must exist.

    Returns:
        List of (keyword, score) tuples.
    """
    if text[0] == '9/10':
        return run_rake(clean_text(text))

    keywords = run_rake(summarize_text(clean_text(text)))
    if len(keywords) > 0:
        return keywords
    return [(text[0], 10.0)]

In [0]:
def extract_expertise(expert):
    """Replace the comment list of one expert row with its extracted keywords.

    ``expert`` is indexed positionally: element 1 selects the extractor
    (``'t5_x72uq'`` goes through the Chinese path, everything else through the
    default one), element 4 is the input handed to the extractor, and
    elements 0-3 and 5 are copied through unchanged.
    Assumes the row layout is (subreddit, subreddit_id, author, expert_score,
    expert_body, expert_score_rank) — TODO confirm against the parquet schema.

    Returns:
        A 6-tuple with the extracted keywords in position 4.
    """
    # Chinese-language subreddit is handled by the jieba-based extractor.
    extractor = get_expertise_cn if expert[1] == 't5_x72uq' else get_expertise
    expertise = extractor(expert[4])
    return (expert[0], expert[1], expert[2], expert[3], expertise, expert[5])

# Index of the first expert row to process. To work through the rows in
# slices, also choose an end index (e.g. end = 20000) and slice with
# top_experts_list[start:end].
start = 0
part_experts_list = top_experts_list[start:]

# Distribute the rows and run the keyword extraction on the cluster, then
# collect the resulting tuples back to the driver.
# The SparkContext is taken from the `spark` session created at the top of the
# notebook rather than from a bare `sc`, which no earlier cell defines.
rdd_experts = spark.sparkContext.parallelize(part_experts_list)
expertises_list = rdd_experts.map(extract_expertise).collect()

# Preview only the first few results; printing the full list floods the output.
print(expertises_list[:5])

[('China_irl', 't5_x72uq', 'ErwinRRR', 21265, [('共匪', 0.2919269235279303), ('问题', 0.30085721651710706), ('觉得', 0.3348083232951814), ('东西', 0.33883345897310707), ('结果', 0.34826630776203793), ('时候', 0.38251735338798337), ('可能', 0.39758783402955705), ('国家', 0.4132706429507359), ('没有', 0.5096784844022569), ('中国', 1.0)], 1), ('China_irl', 't5_x72uq', 'Adventure_Alone', 21034, [('轰炸', 0.3297834212538825), ('文革', 0.35097347055301276), ('政治', 0.36308266858879634), ('中國', 0.37479726316605194), ('政策', 0.4239069780908441), ('没有', 0.4731043541154743), ('时候', 0.5342771533699927), ('知道', 0.5444098056480373), ('民主', 0.5775653690618686), ('中国', 1.0)], 2), ('China_irl', 't5_x72uq', 'KailBroflovsky', 20772, [('思想', 0.553656316856696), ('还有', 0.5562829093682939), ('知道', 0.5569364348172173), ('觉得', 0.559430321379635), ('舆论', 0.5926594429397128), ('文革', 0.635289093164218), ('中国', 0.7655327314333069), ('平子', 0.7948924472567184), ('资本家', 0.8198218706030398), ('没有', 1.0)], 3), ('China_irl', 't5_x72uq', 'Formu

In [0]:
# Number of expert rows for which keywords were extracted.
print(len(expertises_list))

20


In [0]:
# Optional: restrict to a subset while experimenting, e.g.
# expertises_list = expertises_list[0:300]

# `SparkContext` is never imported in this notebook, so take the context from
# the existing `spark` session instead of calling SparkContext.getOrCreate().
# `sc` stays bound for any cell that refers to it.
sc = spark.sparkContext
# Turn the result tuples into a DataFrame; toDF() without names produces the
# positional columns _1 ... _6.
expertises_df = sc.parallelize(expertises_list).toDF()

In [0]:
# Give the positional columns their real names and move the rank first.
# (positional name, final name), listed in output order.
column_aliases = [
    ('_6', 'rank'),
    ('_1', 'subreddit'),
    ('_2', 'subreddit_id'),
    ('_3', 'author'),
    ('_4', 'expert_score'),
    ('_5', 'expertise'),
]
expertises_df = expertises_df.select([col(src).alias(dst) for src, dst in column_aliases])

In [0]:
# Append the results as JSON, one output directory per subreddit.
# coalesce(1) reduces the frame to a single partition before writing, and the
# committer option turns off the job-success marker file.
(
    expertises_df.coalesce(1)
    .write.partitionBy('subreddit')
    .mode('append')
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .json('dbfs:/FileStore/Group24/expertise_json')
)

In [0]:
# Sanity check: count the entries that now exist in the JSON output directory.
json_file_list = os.listdir('/dbfs/FileStore/Group24/expertise_json')
print(len(json_file_list))

1001
