In [3]:
import datetime
import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy
from fastparquet import ParquetFile
from itertools import chain
import re
from addr_dict import state_dict, substate_dict
from stop_word_list import stop_words_lst

In [5]:
co_name_col = ['MER_NAME','CARD_USE_HIS']

address_cols = ['MER_ADDR'] 
phone_col = 'MER_MSISDN'
source = 'MEUMS_COLL_CARD_MER'

In [6]:
def delete_stopwords(s, stop_words_lst):
    """Clean strings by removing unwanted stopwords.
    
    Parameters
    ----------
    s : string. string to delete stopwords

    stop_words_lst : list. list of stopwords 


    Example
    -------

    stop_words_lst = ["(주)", "(유)", "(사)", "(의)", "(재)",
                    "주식회사", "유한회사", "사단법인", "의료법인", "재단법인",
                    "(사단)", "(유한)", "(의료)", "(법인)", "(재단)",
                    "(주식회사)", "(유한회사)", "(사단법인)", "(의료법인)", "(재단법인)", 
                    "[아이행복]", "온누리상품권", "무이자"]

    Returns
    -------
    string
    """

    for sw in stop_words_lst:
        # delete the stopword when store name starts with the stopword
        s = s.replace(sw, ' ')
#         if s.startswith(sw):
#             s = s.replace(sw, "")

#         # delete the stopword when store name starts with the stopword
#         elif s.endswith(sw):
#             s = s.replace(sw, "")

#         # delete the stopword when store name has the stopword (with space)
#         else: 
#             s = ' '.join([w for w in s.split() if w not in stop_words_lst])           


    return s

In [7]:
def replace_by(s, by_whitespace, by_none):
    """Replace something by whitespace or none.
    
    Parameters
    ----------
    s : series. series of string to delete something

    by_whitespace : regex or string to replace by single whitespace
    
    by_none : regex or string to replace by none.


    Example
    -------
    by_whitespace : r'[\(\)\[\]]' ==> replace brasekets like (), [] to single whitespace.
    by_none : r'[\-\_\]' ==> delete  -, _ .


    Returns
    -------
    series
    """

    if by_whitespace:
        s = s.str.replace(by_whitespace, ' ')
        # example : r'[\(\)\[\]]'

    if by_none:
        s = s.str.replace(by_none, '')
        # example : r'[\-\_\]'

    return s

In [8]:
def remove(s, bw_brackets = True, double_space = True, single_space = False):
    """
    Clean words between something.
    """

    if bw_brackets:
        s = s.replace(r'(\[.*?\]|\(.*?\)|\{.*?\})', '')

    if double_space:
        s = s.replace(r'\s\s+', ' ')

    if single_space:
        s = s.replace(r'\s', '')

    return s

In [9]:
def stop_words_lst_maker(df,col_name):
    
    match=[]
    match_list=[]
    for idx in list(df[col_name]):
        match.append(re.findall(r'^(\[.*?\]|\(.*?\)|\{.*?\})',idx))
            
    for idx in match:
        for i in idx:
            match_list.append(i)
        
    match_list=list(set(match_list))
    
    return match_list

In [10]:
def del_start_end_void(s):
    s=s.strip()
    return s

In [11]:
def preprocess_name(df, co_name_col, stop_words_lst):
    s = datetime.datetime.now()
    
    df[co_name_col + '_new'] = df[co_name_col].apply(delete_stopwords, stop_words_lst = stop_words_lst)
    df[co_name_col + '_new'] = replace_by(df[co_name_col + '_new'], by_whitespace=r'[\(\[]',by_none=r'[\)\]]')
    df[co_name_col + '_new'] = remove(df[co_name_col + '_new'], bw_brackets=False, double_space=True)
    
    df[co_name_col + '_new'] = df[co_name_col + '_new'].apply(del_start_end_void)
    
    df[co_name_col + '_yn'] = df[co_name_col] != df[co_name_col + '_new']
    
    print(datetime.datetime.now()-s, "time spent to preprocess name column")
    
    return df

In [12]:
# df[df['CARD_USE_HIS'].str.contains(r'\.\.\.')]

In [13]:
def check_processed_result(source_column, n_of_ex):
    new_cols = [i for i in df.columns if i.startswith(source_column)]
    print("Preprocessing source columns ", source_column)
    print("# of records changed : ", df[source_column + '_yn'].sum())
    try: 
        print("Examples:")
        display(df.loc[(df[source_column + '_yn'] == True), new_cols].sample(n_of_ex))
    except ValueError:
        print("No example.")

In [14]:
def split_addr_part1(addr):
    """
    Get state, a substate, else, and detail part from an address.

    :param addr: series.
    :return:
    """
    # seperate address by comma(,)
    if len(addr.split(",")) >= 2:
        addr_main = addr.split(",")[0]
        addr_detail = " ".join(addr.split(",")[1:])

    else:
        addr_main = addr
        addr_detail = ''

    return addr_main, addr_detail

In [15]:
def split_addr_part2(addr_main):

    # main 파트의 첫 번째 단어를 가져온다.
    if len(addr_main.split()) >= 2:
        addr_state = addr_main.split()[0]
        addr_substate = addr_main.split()[1]
        if len(addr_main.split()) >= 3:
            addr_else = " ".join(addr_main.split()[2:])
        else:
            addr_else = ''

    else:
        addr_state = addr_main
        addr_substate = ''
        addr_else =  ''

    return addr_state, addr_substate, addr_else

In [16]:
def standardize_state(state_col):
    state_col_new = state_col.copy()

    # Change state name to official name
    for state in state_dict.keys():
        condition = state_col.isin(state_dict[state])
        print(state, condition.sum())
        ids_under_contion = state_col.loc[condition].index
        state_col_new.loc[state_col_new.index.isin(ids_under_contion)] = state
    
#     print(state_col_new.unique())
        
    return state_col_new

# Blocking

In [17]:
def assign_blocks(df,address_cols):
    
    # address_cols is list 
    ad_col=address_cols[0]
    df['block1'] = None #block1 default is 'None'
    
    
    condition = (df[ad_col+'_state'].isin(list(state_dict.keys()))) & (df[ad_col+'_state'] != '세종특별자치시')
    df.loc[condition, 'block1'] = df.loc[condition, ad_col+'_state']

    condition = (df[ad_col+'_state'] == '세종특별자치시')
    df['block1'] = df['block1'].mask(condition, '세종특별자치시')

    df['block1'] = df['block1'].mask(df['block1'].isnull(), 'NoBlock1')
    
    df['block2'] = None #block2 default is 'None'
    
    #only use addr
    #주소 list를 만들때 이 함수를 고려하여 만들어야함.
    
    
    #Blocking step 1 : for POIs that have standardized substate name from ADDR and are not located in Sejong
    condition = (df[ad_col+'_substate'].isin(list(chain.from_iterable(substate_dict.values())))) & (df[ad_col+'_state'] != '세종특별자치시')
    df.loc[condition, 'block2'] = df.loc[condition, ad_col+'_substate']
    
    # Blocking step 2 : for POIs that are located in Sejong
    condition = (df[ad_col+'_state'] == '세종특별자치시')
    df['block2'] = df['block2'].mask(condition, '세종특별자치시')
    
    df['block2'] = df['block2'].mask(df['block2'].isnull(), 'NoBlock2')
    
    print("Finished blocking step 2.")
    
    return df

In [18]:
input_cols = ['SOURCE', 'SOURCE_ID', 'REG_DT', 
                  'MER_NAME_new','CARD_USE_HIS_new', 'block1', 'block2', 'MER_ADDR_new', 'MER_ADDR_else', 
                  'MER_MSISDN_new', 'PROCESSED_DT']

output_cols = ['SOURCE', 'SOURCE_ID', 'SOURCE_UPT_DT', 
                   'CO_NAME_new','HIS_NAME_new','block1', 'block2', 'ADDR_new', 'ADDR_else', 
                   'REP_PHONE_NUM_new','PROCESSED_DT']

output_table = "ci_dev.PROCESSED_CARD_LOGIN_recleaning"

co_name_col = ['MER_NAME','CARD_USE_HIS']
address_cols = ['MER_ADDR'] 
phone_col = 'MER_MSISDN'
source = 'MEUMS_COLL_CARD_MER'

In [19]:
output_db.execute("DROP TABLE {}".format(output_table))
output_db.execute("CREATE TABLE {}".format(output_table)+
                  " (SOURCE VARCHAR(30), SOURCE_ID INT , SOURCE_UPT_DT VARCHAR(50), "+ 
                   " CO_NAME_new VARCHAR(70), " +
                   " HIS_NAME_new VARCHAR (70), "
                   " block1 VARCHAR(35), block2 VARCHAR(35), "+
                   " ADDR_new VARCHAR(1000), ADDR_else VARCHAR(1000), "+
                   " REP_PHONE_NUM_new VARCHAR(50), PROCESSED_DT VARCHAR(50), PRIMARY KEY(SOURCE,SOURCE_ID),"+
                   " KEY first_index (block1,block2))"+
                   " CHARACTER SET utf8 COLLATE utf8_unicode_ci")

<sqlalchemy.engine.result.ResultProxy at 0x7f979ae2dcc0>

In [20]:
def write_to_db_in_chunk(input_data, input_cols, output_db, output_table, output_cols, chunk_size):
    s = datetime.datetime.now()
    t = len(input_data)
    max_chunk = t // chunk_size +1

    for idx, i in enumerate(range(0, t, chunk_size)):
        output_db.dispose()
        records = tuple(tuple(row) for row in input_data.loc[i:i+chunk_size, input_cols].values)
        insert_query = "INSERT INTO {} {} VALUES (%s, %s, %s, %s, %s, %s,%s, %s, %s, %s, %s)".format(output_table, tuple(output_cols))
        insert_query = insert_query.replace("'","")
        try:
            output_db.execute(insert_query, records)
        except sqlalchemy.exc.IntegrityError:
            write_to_db_in_piece(input_data.loc[i:i+chunk_size], input_cols, output_db, output_table, output_cols)
        
        print("Chunk", idx +1, "/", max_chunk, ": ", i, "to ", i+chunk_size, "records are inserted into {} table.".format(output_table))
        
        
    print("Time duration: ", datetime.datetime.now()-s)

In [21]:
def write_to_db_in_piece(input_data, input_cols, output_db, output_table, output_cols):
    s = datetime.datetime.now()
    
    for i,idx in input_data.iterrows():
        output_db.dispose()
        
        records = list(row for row in idx[input_cols])
        insert_query = "INSERT INTO {} {} VALUES (%s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s)".format(output_table, tuple(output_cols))
        insert_query = insert_query.replace("'","")
                
        try:
            output_db.execute(insert_query, tuple(records))
        except sqlalchemy.exc.IntegrityError:
            update_str= " SET " + " , ".join(" {} = %s ".format(n) for n in output_cols )
            source_id = output_cols[1]
            source = output_cols[0]
            update_query = ("UPDATE {} {} where {} = {} and {} = '{}' "
                            .format(output_table,update_str,source_id,idx[source_id],source,idx[source]))
            
            output_db.execute(update_query,idx[input_cols])
            
            
        
        if i%1000==0:
            print("10000 piece n= ", i/1000 , "records are inserted into {} table.".format(output_table))
        
        
    print("Time duration: ", datetime.datetime.now()-s)

In [22]:
on_off_line_df=pd.read_sql('select * from ci_dev.CARD_LOGIN_MER_TYPE_ONOFF',output_db)
on_off_list=on_off_line_df[on_off_line_df['variable']=='Offline']['MER_TYPE'].values.tolist()
n_df=pd.read_sql('select count(1) from eums.MEUMS_COLL_CARD_MER ',input_db)
range_n=n_df['count(1)'].values.tolist()[0]

for idx in range(0,(int)(range_n/10000)+1): #
    df=pd.read_sql('select * from eums.MEUMS_COLL_CARD_MER limit %d,10000'%(idx*10000),input_db)
   
    df = df.mask(df.isna(), 'None')
    df = df[~ (df['MER_NAME'].str.contains("개인택시") & ~df['MER_NAME'].str.contains("충전소"))]
    
    df=df[df['MER_TYPE'].isin(on_off_list)]
    df=df[~(df['CARD_COMP']=='LOTTE_CARD')]
    
    df['MER_ADDR']=df['MER_ADDR'].str.replace(r'\(.*\)$','')
    
    #stop_word_add=stop_words_lst_maker(df,co_name_col)
    
    #for idx in stop_word_add:
    #    stop_words_lst.append(idx)
    #stop_words_lst=list(set(stop_words_lst))
    
    #print(stop_words_lst)
    
    for a in address_cols:
        s = datetime.datetime.now()
        
        #preprocessing
    
    # ready
        new = [a + s for s in ['_main', '_detail', '_state', '_substate', '_else', '_quality']]
    
     # default quality is bad
        df[new[5]] = "bad"
    
    # mark records don't have value
        df[new[5]] = df[new[5]].mask(((df[a] == '') | (df[a] == ' ') | (df[a] == 'None')), 'NoAddress')
    
    # split by ','
        df[new[0]], df[new[1]] = zip(*df[a].apply(split_addr_part1))
        print(a, "column is splitted into ", new[:2], " Time duration:", datetime.datetime.now()-s)
    
    # split by ''
        df[new[2]], df[new[3]], df[new[4]] = zip(*df[new[0]].apply(split_addr_part2))
        print(a + "_main", "column is splitted into", new[2:-1], "Time duration:", datetime.datetime.now()-s)
    
    # standardize state column
        df[new[2]] = standardize_state(df[new[2]])
        df[new[5]] = df[new[5]].mask(df[new[2]].isin(list(state_dict.keys())), 'standardized')
        print((df[new[5]] == 'standardized').sum(), "are standardized in ", new[2], "column. Time duration:", datetime.datetime.now()-s)
    
    # assemble to make new addr column
        df[a + '_new'] = df[new[2]] + ' '+ df[new[3]]+ ' ' + df[new[4]]
    
    #final preprocessing
        df[a+'_new'] = remove(df[a+'_new'], bw_brackets= True, double_space= True, single_space= False)
    
    
        df[a + '_yn'] = df[a] != df[a + '_new']
    ##
    
    for kda in co_name_col:    
        df = preprocess_name(df, kda, stop_words_lst)
    
    
    s = datetime.datetime.now()
    df[phone_col+'_new'] = df[phone_col]
    df[phone_col + '_new'] =df[phone_col + '_new'].str.replace(r'[^0-9|^~]','')

    df[phone_col + '_yn'] = df[phone_col] != df[phone_col + '_new']
    print(datetime.datetime.now()-s, "time spent.")
    print(df[phone_col+'_new'])
    df=assign_blocks(df,address_cols)
    
    df['SOURCE'] = source
    df.columns.values[0] = 'SOURCE_ID'
    df['PROCESSED_DT'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    df['REG_DT'] =  df['REG_DT'].astype(str)
    
    write_to_db_in_chunk(df, input_cols, output_db, output_table, output_cols,10000)

MER_ADDR column is splitted into  ['MER_ADDR_main', 'MER_ADDR_detail']  Time duration: 0:00:00.025257
MER_ADDR_main column is splitted into ['MER_ADDR_state', 'MER_ADDR_substate', 'MER_ADDR_else'] Time duration: 0:00:00.071184
서울특별시 2253
경기도 2133
강원도 426
충청북도 288
충청남도 191
경상북도 404
경상남도 516
전라북도 147
전라남도 106
부산광역시 419
대구광역시 309
인천광역시 748
광주광역시 158
대전광역시 199
울산광역시 169
세종특별자치시 19
제주특별자치도 201
8686 are standardized in  MER_ADDR_state column. Time duration: 0:00:00.108998
0:00:00.105462 time spent to preprocess name column
0:00:00.113862 time spent to preprocess name column
0:00:00.012076 time spent.
0        0318508000
1        0216617335
2        0318520920
5         023247942
6        0318469977
7        0318525558
9         029777717
10       0318492527
11      03100000000
12       0318552555
13      07075361769
15       0318755189
16       0318411655
17       0318756664
18        025221866
19        025819595
20       0318778985
21        025889817
22       0318720079
23        02533587