In [1]:
# Locate the local Spark 2.2.2 installation through findspark before the
# pyspark imports below (presumably this is what makes pyspark importable
# in this kernel -- confirm against the cluster setup).
import findspark
findspark.init('/home/hadoop/spark-2.2.2-bin-hadoop2.7')

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Build the application configuration step by step, then open the contexts
# that the rest of the notebook uses.
conf = SparkConf()
conf.setAppName("miniProject")
conf.setMaster("spark://master:7077")

sc = SparkContext(conf=conf)
sqlcontext = SQLContext(sc)

In [2]:
class MaxScoreTokenizer:
    """Soynlp Maxscore Tokenizer: origin https://github.com/lovit/soynlp

    Every whitespace-separated token of a sentence is split into subtokens by
    repeatedly selecting the best-ranked candidate substring and discarding
    all candidates that overlap it. Parts of the token that no selected
    candidate covers are emitted as additional subtokens.

    A subtoken is a tuple ``(text, begin, end, score, length)`` where
    ``begin``/``end`` are character offsets into the token it came from.

    Args:
        scores: mapping of substring -> score; a falsy value means "no scores".
        max_length: longest candidate substring considered inside a token.
        default_score: score used for substrings missing from ``scores``.
    """

    # ``_find`` stops once its round counter exceeds this value.
    _MAX_FIND_ROUNDS = 100

    def __init__(self, scores=None, max_length=10, default_score=0.0):
        self._scores = scores if scores else {}
        self._max_length = max_length
        self._ds = default_score

    def __call__(self, sentence, flatten=True):
        """Alias for :meth:`tokenize` so an instance can be used as a function."""
        return self.tokenize(sentence, flatten)

    def tokenize(self, sentence, flatten=True):
        """Tokenize a sentence.

        Args:
            sentence: text to tokenize; ``None`` is treated as an empty
                sentence (e.g. a null value handed over by a Spark UDF).
            flatten: if true, return a flat list of subtoken strings;
                otherwise return one list of subtoken tuples per
                whitespace-separated token.

        Returns:
            A list of strings (``flatten=True``) or a list of lists of
            ``(text, begin, end, score, length)`` tuples.
        """
        if sentence is None:
            return []
        tokens = [self._recursive_tokenize(token) for token in sentence.split()]
        if flatten:
            tokens = [subtoken[0] for token in tokens for subtoken in token]
        return tokens

    def _recursive_tokenize(self, token, range_l=0, debug=False):
        """Split a single whitespace-free token into subtoken tuples.

        Args:
            token: the string to split.
            range_l: longest candidate length; ``0`` means
                ``min(max_length, len(token))``.
            debug: if true, pretty-print the ranked candidate list.

        Returns:
            Subtoken tuples sorted by their begin offset.
        """
        length = len(token)
        # Tokens of at most two characters are never split and always carry
        # the default score.
        if length <= 2:
            return [(token, 0, length, self._ds, length)]

        if range_l == 0:
            range_l = min(self._max_length, length)

        scores = self._initialize(token, range_l, length)
        if debug:
            # Imported here because it is only needed on the debug path.
            from pprint import pprint
            pprint(scores)

        result = self._find(scores)
        if not result:
            # No candidate could be generated (range_l < 2): keep the token whole.
            return [(token, 0, length, self._ds, length)]

        # Fill the spans that the selected subtokens do not cover.
        adds = self._add_inter_subtokens(token, result)

        if result[-1][2] != length:
            adds += self._add_last_subtoken(token, result)

        if result[0][1] != 0:
            adds += self._add_first_subtoken(token, result)

        return sorted(result + adds, key=lambda x: x[1])

    def _initialize(self, token, range_l, length):
        """Enumerate all substrings of length 2..range_l as ranked candidates.

        Returns:
            Candidate tuples ordered by score (descending), then length
            (descending), then begin offset (ascending).
        """
        scores = []
        for b in range(0, length - 1):
            for r in range(2, range_l + 1):
                e = b + r
                if e > length:
                    # Longer candidates from this offset would overrun too.
                    break
                subtoken = token[b:e]
                score = self._scores.get(subtoken, self._ds)
                scores.append((subtoken, b, e, score, r))

        return sorted(scores, key=lambda x: (-x[3], -x[4], x[1]))

    def _find(self, scores):
        """Greedily pick non-overlapping candidates in ranking order.

        ``scores`` is consumed in place.

        Returns:
            The selected candidate tuples sorted by begin offset.
        """
        result = []
        num_iter = 0

        while scores:
            word, b, e, score, r = scores.pop(0)
            result.append((word, b, e, score, r))

            if not scores:
                break

            # Keep only candidates whose [begin, end) span does not intersect
            # the span [b, e) that was just selected.
            scores[:] = [cand for cand in scores
                         if not (cand[1] < e and b < cand[2])]

            num_iter += 1
            if num_iter > self._MAX_FIND_ROUNDS:
                break

        return sorted(result, key=lambda x: x[1])

    def _add_inter_subtokens(self, token, result):
        """Return default-scored subtokens for gaps between selected neighbours."""
        adds = []
        for left, right in zip(result, result[1:]):
            b = left[2]
            e = right[1]
            if b == e:
                continue
            adds.append((token[b:e], b, e, self._ds, e - b))

        return adds

    def _add_first_subtoken(self, token, result):
        """Return the uncovered prefix before the first selected subtoken."""
        e = result[0][1]
        subtoken = token[0:e]
        score = self._scores.get(subtoken, self._ds)
        return [(subtoken, 0, e, score, e)]

    def _add_last_subtoken(self, token, result):
        """Return the uncovered suffix after the last selected subtoken."""
        b = result[-1][2]
        subtoken = token[b:]
        score = self._scores.get(subtoken, self._ds)
        return [(subtoken, b, len(token), score, len(subtoken))]

import json

from hdfs import InsecureClient

# WebHDFS client pointed at the cluster master; used below to read the
# score table stored on HDFS.
client_hdfs = InsecureClient('http://master:50070')

In [3]:
# Fetch the score table from HDFS, parse it as JSON, and build the tokenizer
# from the resulting mapping.
with client_hdfs.read('/user/hadoop/scores.json', encoding='utf-8') as score_json:
    scores = json.loads(score_json.read())

tokenizer = MaxScoreTokenizer(scores)

In [4]:
# Display the score table loaded above for a quick visual check.
scores

{'마피아': 0.20434891103829422,
 '별점테러하지': 0.4235297767094588,
 '싸가지없': 0.34919048178905,
 '이작품이': 0.17811735527464292,
 '작가님이': 2.4564077346254667,
 '고무고무': 0.27195361489510117,
 '딩동이': 1.3670551036019707,
 '킹메': 0.1266130832232532,
 '있다니': 0.31776686243787294,
 '막컷의': 0.19984585807347613,
 '무섭지': 0.18178197883655792,
 '믿는다': 0.6722550930486331,
 '우디르급': 0.1,
 '왼손잡이': 0.1,
 '보여주고싶': 0.1,
 '없나요': 0.3855053128390623,
 '주인한테': 0.3144527019383392,
 '출근하는': 0.4110589446144157,
 '아테나의': 0.23937545484932043,
 '살아': 0.8583299461162326,
 '댕댕': 0.1,
 '어이없어서': 0.4590750865054275,
 '위주로': 0.31877602398948546,
 '죽였는': 0.1,
 '맴도': 0.1,
 '걸렸습': 0.1,
 '마조히스': 0.1,
 '에너지를': 0.7756596902764186,
 '싫어하시는': 0.18409901877645407,
 '동의하지': 0.15724950374759297,
 '사랑해주세': 0.1,
 '모르지': 0.23763103341663508,
 '어쩌구저쩌구': 0.3305037967005815,
 '버튼을': 0.22489245256502194,
 '솔직하고': 0.36685630302743955,
 '따라했': 0.1,
 '익숙해졌': 0.1,
 '칼리': 0.3465714457958713,
 '살아왔': 0.21618443937034837,
 '마음의소리에': 0.1,
 '다니는': 0.565365143128

In [5]:
# Load the comment data set from HDFS and show its schema as the cell output.
dataset = (
    sqlcontext
    .read
    .parquet('hdfs://master:9000/user/hadoop/all_hangul_spark.parquet')
)
dataset.schema

StructType(List(StructField(title_id,IntegerType,true),StructField(episode_num,IntegerType,true),StructField(comment_num,IntegerType,true),StructField(sentence,StringType,true),StructField(registered_time,TimestampType,true)))

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# The tokenizer returns a list of strings. udf() defaults to a StringType
# return value, which would store each token list as a single string instead
# of an array column, so the return type is declared explicitly.
tokenizer_udf = udf(tokenizer, ArrayType(StringType()))

# Replace the raw sentence with its tokens and keep the identifying columns.
token_df = (
    dataset
    .withColumn('tokens', tokenizer_udf(dataset.sentence))
    .select('title_id', 'episode_num', 'comment_num', 'tokens', 'registered_time')
)

In [11]:
# Persist the tokenized comments to HDFS in Parquet format.
(
    token_df
    .write
    .parquet('hdfs://master:9000/user/hadoop/all_hangul_tokens')
)


+--------+-----------+-----------+--------------------+-------------------+
|title_id|episode_num|comment_num|              tokens|    registered_time|
+--------+-----------+-----------+--------------------+-------------------+
|  112931|          0|          0|[정주행, 하시는, 분들, 추천...|2013-05-22 01:07:04|
|  112931|          0|          1|[만, 렙이, 레이, 드, 못,...|2013-04-15 01:47:32|
|  112931|          0|          2|[아랑소드, 용사, 시절, 에피...|2013-05-02 08:37:28|
|  112931|          0|          3|[본격, 드래곤, 이, 자기, ...|2013-04-19 06:56:25|
|  112931|          0|          4|[본격, 세계, 를구한, 드래곤...|2013-04-26 07:52:28|
+--------+-----------+-----------+--------------------+-------------------+
only showing top 5 rows

