In [5]:
import argparse
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk
import logging
import pandas as pd
from pandasticsearch import DataFrame, Select
from sklearn import datasets
import threading
import time
from tool.sink import ESSink
from tqdm import tqdm

In [6]:
def log_init():
    """Configure the root logger to emit WARNING-and-above records to stderr.

    Records are formatted as ``<date> - <LEVEL> - <message>``. Because
    ``logging.basicConfig`` does nothing when the root logger already has
    handlers, re-running this cell in the same kernel leaves the existing
    configuration untouched.
    """
    # NOTE(review): the date format pairs a 24-hour %H with an AM/PM %p marker;
    # kept as-is to preserve the existing log output.
    stream_handler = logging.StreamHandler()
    logging.basicConfig(
        handlers=[stream_handler],
        level=logging.WARNING,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S %p",
    )

In [7]:
def params_init(argv=None):
    """Parse the Elasticsearch connection options for the feature job.

    Args:
        argv: Sequence of argument strings to parse. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, exactly as before. Pass an
            explicit list when calling from a notebook, whose kernel argv
            (typically ``-f <connection file>``) argparse would reject.

    Returns:
        argparse.Namespace with attributes ``es``, ``name``, ``password`` and
        ``savename``; each is ``None`` when its flag is omitted.

    Raises:
        SystemExit: on unrecognised or malformed arguments (argparse behaviour).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--es', help='ES host', type=str)
    parser.add_argument('--name', help='ES user', type=str)
    parser.add_argument('--password', help='ES password', type=str)
    parser.add_argument('--savename', help='ES features save', type=str)
    return parser.parse_args(argv)

In [8]:
def es_init(args):
    """Build an Elasticsearch client from the parsed connection options.

    Args:
        args: Either the ``argparse.Namespace`` returned by ``params_init`` or
            a mapping with the keys ``'es'``, ``'name'`` and ``'password'``.

    Returns:
        An ``Elasticsearch`` client pointed at the single host ``es``.
        Basic auth ``(name, password)`` is attached only when ``name`` is set.
    """
    # argparse.Namespace does not support item access, so normalise it to a dict.
    params = vars(args) if isinstance(args, argparse.Namespace) else args
    host = params['es']
    username = params['name']
    password = params['password']

    # Without a user, do not hand the client a (None, None) credential pair.
    auth = (username, password) if username is not None else None
    return Elasticsearch([host], http_auth=auth)

In [30]:
class LoadData(threading.Thread):
    """Background thread that pulls one Elasticsearch index into a DataFrame.

    After ``join()``, ``df`` holds the selected columns with every value cast
    to ``str``. If the pull raised, ``df`` stays ``None`` and ``error`` holds
    the exception.

    Args:
        es: Elasticsearch client used for the search.
        tablename: Name of the index to read.
        cols: Columns to keep, either a list of names or a comma-separated
            string such as ``'os, phone, sex'``.
        size: Hit count requested from ``es.search``; defaults to the value
            previously hard-coded in ``get_df``.
    """

    def __init__(self, es, tablename, cols, size=10000000):
        threading.Thread.__init__(self)
        self.es = es
        self.tablename = tablename
        self.cols = self._parse_cols(cols)
        self.size = size
        self.df = None
        self.error = None

    @staticmethod
    def _parse_cols(cols):
        """Normalise ``cols`` to a list of stripped, non-empty column names."""
        if isinstance(cols, str):
            return [c.strip() for c in cols.split(',') if c.strip()]
        return list(cols)

    def get_df(self):
        """Search the index, keep ``self.cols`` as strings, store and return the frame."""
        # NOTE(review): Elasticsearch rejects from+size above the index's
        # `index.max_result_window` (10000 unless reconfigured); a full export
        # likely needs the scroll API. Confirm against the target cluster.
        data = self.es.search(index=self.tablename, body={"size": self.size})
        self.df = Select.from_dict(data).to_pandas()[self.cols].astype(str)
        return self.df

    def run(self):
        """Thread entry point: log the pull, then load the index into ``self.df``."""
        logger = logging.getLogger(__name__)
        logger.info('[pull data] %s', self.tablename)
        try:
            self.get_df()
        except Exception as exc:
            # An exception raised inside a thread never reaches the caller of
            # join(); keep it on the instance and log it instead of losing it.
            self.error = exc
            logger.exception('[pull data] %s failed', self.tablename)

开启线程： Thread-1
开启线程： Thread-2
Thread-1: Fri Dec  6 17:46:28 2019
Thread-2: Fri Dec  6 17:46:28 2019
Thread-2: Fri Dec  6 17:46:28 2019
Thread-1: Fri Dec  6 17:46:28 2019
Thread-1: Fri Dec  6 17:46:28 2019
Thread-2: Fri Dec  6 17:46:28 2019
Thread-2: Fri Dec  6 17:46:28 2019
Thread-1: Fri Dec  6 17:46:28 2019
Thread-1: Fri Dec  6 17:46:28 2019
Thread-2: Fri Dec  6 17:46:28 2019


In [None]:
def feature_engineering(es, args):
    """Load the user, item and context tables concurrently from Elasticsearch.

    Args:
        es: Elasticsearch client handed to each ``LoadData`` thread.
        args: ``argparse.Namespace`` from ``params_init`` or an equivalent
            mapping. ``savename`` (or the legacy ``save-name`` key) is read from it.

    Returns:
        Tuple ``(user_df, item_df, context_df)``: each loader's ``df`` after
        its thread has finished (``None`` for a loader that produced nothing).
    """
    # argparse.Namespace does not support item access, so normalise it to a dict.
    params = vars(args) if isinstance(args, argparse.Namespace) else args
    # TODO: persist the engineered features under this name; not used yet.
    savename = params.get('savename', params.get('save-name'))

    loaders = [
        LoadData(es, 'index_user_info', ['os', 'phone', 'sex']),
        LoadData(es, 'syp', ['bbsid', 'bbstype']),
        LoadData(es, 'index_syp_user_suggest', ['userid', 'bbsid']),
    ]
    # Start every thread before joining any so the three pulls overlap.
    for loader in loaders:
        loader.start()
    for loader in loaders:
        loader.join()
    return tuple(loader.df for loader in loaders)

In [None]:
def main():
    """Entry point: configure logging, parse CLI options, connect, load features."""
    log_init()
    logger = logging.getLogger(__name__)
    # Downstream helpers read options by key (e.g. args['es']), which an
    # argparse.Namespace does not support, so hand them a plain dict.
    params = vars(params_init())
    # Never write the password to the log.
    shown = {k: ('***' if k == 'password' and v else v) for k, v in params.items()}
    logger.info('[params] %s', shown)
    es = es_init(params)
    feature_engineering(es, params)


# NOTE(review): inside a Jupyter kernel __name__ is also '__main__', so this
# cell runs main() and argparse parses the kernel's own argv. Run as a script.
if __name__ == '__main__':
    main()