# Tokenization ( or Segmentation )


## Build Library

interior boundary scoring에는 앞서 설명한 Cohesion Score 이외에 다양한 공식을 적용시킬 수 있다. 글자와 글자 사이에 관계를 정량화 할 수 있는 방법이 있다면 이를 적용시킬 수 있다. 

여기에서는 이상의 내용을 liabrary로 만들어 보고 예시 데이터 전체를 Segmentation 해 보자. 아울러 앞서 설명한 Cohesion Score 이외에 t-score와 simple-log-likelihood-ratio의 값을 추가로 적용시켜보자. 

* Cohesion Score
* Cohesion Greedy Score
* t-score
* simple-log-likelihood-ratio

In [1]:
import math
from time import time
from collections import Counter
from tqdm import tqdm_notebook
from functools import reduce

In [2]:
def n_gram( text, n=2 ):
    size = len( text )
    grams = [ text[i:i+n] for i in range(size -n+1 ) ]
    return grams

def get_allgrams( line, counter=Counter(), n_min=2, n_max=8):
    """
    n_min=2, n_max=8 : # 1-gram, 2-gram, 3-gram .... 8-gram
    corpus = [doc1, doc2, ... docn]
    """
    ns = range( n_min, n_max+1 ) 
    for n in ns:
        counter.update(  n_gram( line, n ) )
    return counter

def expected_value( term, unigrams, allgrams, corpora_size ):
    size = len( term )
    chrs = list( term )
    o = allgrams.get( term ) if allgrams.get( term ) else 0
    observed_chrs = [ unigrams.get( c ) if unigrams.get( c )  else 0 for c in chrs ]
    mul = reduce(lambda x, y: x*y, observed_chrs )
    e = mul / math.pow( corpora_size, size - 1 )
    return ( o, e )

def t_score( o, e ):
    return ( o - e ) / math.sqrt( o + 1 )

def sim_ll( o, e ):
    if e == o or o == 0 : return 0
    rst = 2 * ( o * math.log( o / e ) - ( o - e ) )
    if o >= e : return rst
    else : return -1 * rst

In [6]:
class CohesionTokenizer():
    
    def __init__( self, alpha=0 ):
        self.train_allgrams = Counter()
        self.train_unigrams = Counter()
        self.mark_in = " _¶{:d}_ "
        self.mark_out = "_¶{:d}_"
        self.alpha = alpha
        self.corpora_size = 0
    
    def fit( self, corpus, min_tf=5 ):
        
        q_ = time()
        
        for line in tqdm_notebook( corpus ): 
            self.train_unigrams.update( list( line ) )
            get_allgrams( line, self.train_allgrams )
            self.corpora_size += len( line )

        self.train_unigrams = Counter( { x : self.train_unigrams[x] for x in self.train_unigrams if self.train_unigrams[x] >= min_tf } )
        self.train_allgrams = Counter( { x : self.train_allgrams[x] for x in self.train_allgrams if self.train_allgrams[x] >= min_tf } )
        
        def _cohesion( term ):
            size = len( term )
            numerator = self.train_allgrams.get( term )
            denominator = self.train_unigrams.get( term[0] )
            if not numerator : numerator = 0
            if not denominator : denominator = 0.2
            return math.pow( ( numerator/denominator), (1/ (size-self.alpha)  ) )

        def _cohesion_greedy( term ):
            c = _cohesion( term )
            w = math.exp( len( term ) )
            return c * w
        
        def _t_score( term ):
            o, e = expected_value( term, self.train_unigrams, self.train_allgrams, self.corpora_size )
            return t_score( o, e )
            
        def _simple_ll( term ):
            o, e = expected_value( term, self.train_unigrams, self.train_allgrams, self.corpora_size )
            return sim_ll( o, e )
        
        self.interior_boundary_calcurator = {
            "cohesion": _cohesion,
            "cohesion_greedy": _cohesion_greedy,
            "t_score": _t_score,
            "simple_ll": _simple_ll,
        }
        
        print( "* Fitting ... Done ({:.03f} sec)".format( time() - q_ ) )
        
        return self
        
    def transform( self, test, method="cohesion" ):
        
        self.test = test
        self.test_allgrams = Counter()
        self.test_cohesions = []
        interior_boundary_calcurator = self.interior_boundary_calcurator.get( method ) if self.interior_boundary_calcurator.get( method ) else self.interior_boundary_calcurator.get( "cohesion" )
        
        ## Get All grams
        q_ = time()
        
        for line in self.test:
            get_allgrams( line, self.test_allgrams )
    
        print( "* Allgram Extraction ... Done ({:.03f} sec)".format( time() - q_ ) )
        
        ## Get Cohesion Score
        q_ = time()
        for t_, f_ in self.test_allgrams.items():
            c_ = interior_boundary_calcurator( t_ )
            if c_ <= 0 : continue
            self.test_cohesions.append( ( t_, f_, c_ ) )
        
        self.test_cohesions = sorted( self.test_cohesions, key=lambda x: -x[2] )

        print( "* Cohesion Score Calcuration ... Done ({:.03f} sec)".format( time() - q_ ) )
        
        # Segment
        q_ = time()
        self.test_segmented = []
        cohesions_iter = list( enumerate( self.test_cohesions ) )
        
        for line in tqdm_notebook( self.test ):
            data_ = line + ""
            token_box_ = []
            
            for i, t_ in cohesions_iter:
                if t_[0] not in data_ : continue
                data_ = data_.replace( t_[0], self.mark_in.format(i) )
                token_box_.append( ( self.mark_out.format(i), t_[0] ) )

            for m, t in token_box_:
                data_ = data_.replace( m, t )

            self.test_segmented.append( data_.strip() )

        print( "* Segmentation ... Done ({:.03f} sec)".format( time() - q_ ) )
        
        return self.test_segmented



In [7]:
corpus_path = ["../data/DYBG_tn.txt", "../data/GAZS_tn.txt", "../data/YHYM_tn.txt"]
corpus1 = open( corpus_path[0], 'r', encoding="utf-8").readlines()
corpus2 = open( corpus_path[1], 'r', encoding="utf-8").readlines()
corpus3 = open( corpus_path[1], 'r', encoding="utf-8").readlines()

ct = CohesionTokenizer( alpha=0 )
ct.fit( corpus1 )
ct.fit( corpus2 )
ct.fit( corpus3 )

HBox(children=(IntProgress(value=0, max=34070), HTML(value='')))


* Fitting ... Done (3.802 sec)


HBox(children=(IntProgress(value=0, max=16555), HTML(value='')))


* Fitting ... Done (3.866 sec)


HBox(children=(IntProgress(value=0, max=16555), HTML(value='')))


* Fitting ... Done (3.574 sec)


<__main__.CohesionTokenizer at 0x21059e7b0f0>

In [8]:
data1 = "治勞役太甚或飮食失節身熱而煩自汗倦怠黃芪 一錢半人參白朮甘草各一錢當歸身陳皮各五分升麻柴胡各三分右剉作一貼水煎服"
data2 = "止代脈見宜服灸甘草湯人參黃芪湯脈虛軟宜服茯神湯補氣湯"
data3 = "煩主氣躁主血肺主皮毛氣熱則煩腎主津液血熱則躁故用梔子以治肺豆豉以潤腎宜黃連鷄子湯甘草乾薑湯芍藥甘草湯入門"
seg1 = ct.transform( [data1, data2, data3], method=["cohesion", "cohesion_greedy", "t_score", "simple_ll"][0] )
seg2 = ct.transform( [data1, data2, data3], method=["cohesion", "cohesion_greedy", "t_score", "simple_ll"][1] )
seg3 = ct.transform( [data1, data2, data3], method=["cohesion", "cohesion_greedy", "t_score", "simple_ll"][2] )
seg4 = ct.transform( [data1, data2, data3], method=["cohesion", "cohesion_greedy", "t_score", "simple_ll"][3] )
rst = list( zip( seg1, seg2, seg3, seg4 ) )
for l in rst:
    print( "\n".join(l) )
    print()

* Allgram Extraction ... Done (0.000 sec)
* Cohesion Score Calcuration ... Done (0.001 sec)


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


* Segmentation ... Done (0.015 sec)
* Allgram Extraction ... Done (0.001 sec)
* Cohesion Score Calcuration ... Done (0.001 sec)


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


* Segmentation ... Done (0.018 sec)
* Allgram Extraction ... Done (0.000 sec)
* Cohesion Score Calcuration ... Done (0.003 sec)


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


* Segmentation ... Done (0.014 sec)
* Allgram Extraction ... Done (0.000 sec)
* Cohesion Score Calcuration ... Done (0.003 sec)


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


* Segmentation ... Done (0.015 sec)
治 勞役  太甚 或 飮食  失節  身熱  而煩  自汗  倦怠  黃芪   一錢 半 人參  白朮  甘草  各一 錢 當歸 身 陳皮  各五分  升麻  柴胡  各三  分右  剉作一貼 水 煎服
治 勞役  太甚 或 飮食失節  身熱而 煩 自汗  倦怠  黃芪   一錢半人參  白朮甘草各一錢  當歸 身 陳皮各五分  升麻柴胡各三 分 右剉作一貼水煎服
治 勞役  太甚 或 飮食  失節  身熱  而煩  自汗  倦怠  黃芪   一錢 半 人參  白朮  甘草 各 一錢  當歸 身 陳皮 各 五分  升麻  柴胡  各三 分 右剉  作一貼 水 煎服
治 勞役  太甚 或 飮食  失節  身熱  而煩  自汗  倦怠  黃芪   一錢 半 人參  白朮  甘草  各一錢  當歸 身 陳皮  各五分  升麻  柴胡  各三 分 右剉作一貼  水煎服

止代 脈見  宜服 灸 甘草 湯 人參 黃 芪湯  脈虛 軟 宜服  茯神  湯補  氣湯
止代 脈見  宜服  灸甘草湯  人參黃芪湯  脈虛 軟 宜服  茯神湯  補氣湯
止代 脈見  宜服 灸 甘草 湯 人參  黃芪 湯 脈虛 軟 宜服  茯神  湯補  氣湯
止代 脈見  宜服 灸 甘草 湯 人參  黃芪 湯 脈虛 軟 宜服  茯神  湯補  氣湯

煩 主氣 躁 主血  肺主皮毛 氣 熱則 煩 腎主  津液  血熱 則躁 故用  梔子  以治 肺 豆豉  以潤 腎宜 黃連  鷄子 湯 甘草  乾薑 湯 芍藥  甘草 湯 入門
煩 主氣 躁 主血  肺主皮毛  氣熱則 煩 腎主  津液  血熱 則躁 故用  梔子  以治 肺 豆豉  以潤 腎 宜黃連  鷄子湯  甘草乾薑湯  芍藥甘草湯  入門
煩 主氣 躁 主血  肺主  皮毛 氣 熱則 煩 腎主  津液 血 熱則 躁 故用  梔子 以 治肺  豆豉  以潤 腎宜 黃連 鷄 子湯  甘草  乾薑 湯 芍藥  甘草 湯 入門
煩 主氣 躁 主血  肺主  皮毛 氣 熱則 煩 腎主  津液 血 熱則 躁 故用  梔子 以 治肺  豆豉  以潤 腎宜 黃連 鷄 子湯  甘草  乾薑 湯 芍藥  甘草 湯 入門



In [9]:
sample = ["炙甘草湯", "灸甘草湯", "甘草", "甘草湯", "灸甘草", "灸甘"]

[ s for s in  ct.test_cohesions if s[0] in sample ]

[('甘草', 4, 34365.97750930184),
 ('甘草湯', 2, 1945.9564822225157),
 ('灸甘草', 1, 504.48312157806305),
 ('灸甘草湯', 1, 115.34145653853527),
 ('灸甘', 1, 76.87907078418132)]

In [10]:
seg = ct.transform( corpus1, method="cohesion" )

corpus_seg_path = "../data/DYBG_seg.txt"
with open( corpus_seg_path, 'w', encoding="utf-8") as fl:
    fl.write( "\n".join( seg ) )

print("# Corpus Segmentation Done! ")



* Allgram Extraction ... Done (2.873 sec)
* Cohesion Score Calcuration ... Done (3.599 sec)


HBox(children=(IntProgress(value=0, max=34070), HTML(value='')))


* Segmentation ... Done (698.497 sec)
# Corpus Segmentation Done! 


## REF

* [Cohesion score + L-Tokenizer. 띄어쓰기가 잘 되어있는 한국어 문서를 위한 unsupervised tokenizer](https://lovit.github.io/nlp/2018/04/09/cohesion_ltokenizer/)
* [ratsgo's blog > Cohesion Probability](https://ratsgo.github.io/from%20frequency%20to%20semantics/2017/05/05/cohesion/)