# 运行配置

## 参数配置

In [1]:
abs_path = 'D:\\Documents\\vivo\\0621\\'
user_file='0621_douyin_user_info.xlsx'
video_file = '0621_douyin_video_info.xlsx'
brand_file = "VIVO品牌码表.xlsx"
comment_file = '0621_douyin_comment_info.xlsx'

In [2]:
es_hosts = [{'host':'proj1', 'port':9200}]
index = "vivo_douyin_0621"
doc_type = "douyin"

fields = [
    "item_id|long",
    "user_item_id|long",
    "video_item_id|long",
    "keyword|string|words",
    "module|string|not_analyzed",
    "site|string|not_analyzed",
    "author|string|words",
    "title|string|words",
    "content|string|words",
    "like_count|long",
    "url|string|not_analyzed",
    "replies|long",
    "data_type|string|not_analyzed",
    "publish_date|long",
    "update_date|long",
    "sourceCrawlerId|long",
    "video_url|string|not_analyzed",
    "video_brand|string|not_analyzed",
    "video_content|string|words",
    "video_publish_date|long",
    "video_user_fans|long",
    "video_user_name|string|words",
    "video_user_register_id|string|not_analyzed"
]


In [3]:
target_comment_sheet = "sheet0"

is_create_index = True

# 运行程序

In [4]:
import sys

sys.path.append("..")

In [5]:
import pandas as pd

from utils.timer import timer
from utils.excels import each_row, save_excel

In [6]:
import os

Bytes_Per_Sec = 201953523 / 333.0

@timer
def read_excel(file, name=None, rows=None):
    """
    读取Excel
    
    默认读取所有sheet, 可以指定
    默认读取所有行, 可以指定: 指定小批量行数并不会节约时间
    读取200MB约350s
    """
    file_bytes = os.path.getsize(file)
    print("predict cost: %.5fs" % (file_bytes / Bytes_Per_Sec))
    return pd.read_excel(file, nrows=rows, sheet_name=name)


## 用户处理

In [7]:
user_sheet = read_excel(abs_path + user_file, "sheet0")
print(user_sheet.shape)

# start at 1561299626.30677
predict cost: 1.09715s
# end at 1561299627.15051
total cost: 0.84374
(2373, 29)


In [8]:
user_tags = {}

def user_parser(idx, key, row):
    user_tag = {}
    
    uid = row["uid"]
    name = row["user_name"]
    nick = row["register_id"]
    fans = row["fans_num"]
    
    user_tag["uid"] = uid
    user_tag["name"] = name
    user_tag["fans"] = fans
    user_tag["register_id"] = nick
    
    user_tags[uid] = user_tag
    
    return True
    
each_row(user_sheet, user_parser, 1000)

# start at 1561299630.29815
current time: 1561299630.413838
current idx: 1000
current time: 1561299630.514544
current idx: 2000
# end at 1561299630.55942
total cost: 0.26127


In [9]:
print(len(user_tags))
for k, v in user_tags.items():
    print(k, v)
    break

2373
100003454600 {'uid': 100003454600, 'name': '极好金服', 'fans': 318443, 'register_id': 'jihaojinfu'}


## 视频处理

In [10]:
video_sheet = read_excel(abs_path + video_file, "sheet0")
print(video_sheet.shape)

# start at 1561299637.24730
predict cost: 2.71257s
# end at 1561299638.52090
total cost: 1.27360
(3289, 30)


In [11]:
brand_sheet = read_excel(abs_path + brand_file, "Sheet1")
print(brand_sheet.shape)

# start at 1561299656.03923
predict cost: 0.02142s
# end at 1561299656.09009
total cost: 0.05086
(56, 5)


In [12]:
brandKB = {}

def brand_parser(idx, key, row):
    brand_name = row['品牌'].strip()
    if brand_name not in brandKB:
        brandKB[brand_name] = set()
    
    if type(row['品牌产品']) is str:
        brand_type_name = row['品牌产品'].strip()
        if brand_type_name:
            brandKB[brand_name].add(brand_type_name)
    
    brand_keyword_str = row['关键词'].strip()
    if brand_keyword_str.startswith('#'):
        brandKB[brand_name].add(brand_keyword_str)
    else:
        for kw in brand_keyword_str.split(','):
            if type(kw) is str:
                brandKB[brand_name].add(kw)
    
    return True
                
each_row(brand_sheet, brand_parser)
print(len(brandKB))


# start at 1561299663.96275
# end at 1561299663.96873
total cost: 0.00598
6


In [13]:
def kw_match(content, keywords, case_sensitive=True):
    """
    关键词匹配
    """
    for kw in keywords:
        # 大小写敏感
        if case_sensitive and kw in content:
            return True
        # 大小写不敏感
        if not case_sensitive and kw.lower() in content.lower():
            return True
    return False

def brand_match(content, kb):
    """
    品牌匹配
    """
    for key, value in kb.items():
        if kw_match(content, value, False):
            return key
    
    return None

# TEST CASE

print(brand_match("vivo", brandKB))
print(brand_match("#我才是实力自拍王", brandKB))
print(brand_match("vivo", brandKB))
print(brand_match("#华为", brandKB))
print(brand_match("小米Play", brandKB))
print(brand_match("小米play", brandKB))

VIVO
华为
VIVO
None
小米
小米


In [14]:
import time

def common_time_parse(origin):
    """
    时间处理
    """
    real_time = time.localtime() # 默认当前时间
    if type(origin) is str and len(origin) == 19:
        real_time = time.strptime(origin, "%Y-%m-%d %H:%M:%S")
    elif len(str(origin)) == 14:
        real_time = time.strptime(str(origin), "%Y%m%d%H%M%S")

    return real_time

# time.strftime(real_time, "%Y-%m-%d %H:%M:%S")

In [15]:
video_tags = {}

def video_parser(idx, key, row):
    video_tag = {}
    
    url = row['url']
    vid = url.split('/')[5]
    uid = row["user_item_id"]
    
    publish_date = time.strftime("%Y-%m-%d %H:%M:%S", common_time_parse(row["publish_date"]))
    title = row["title"]
    content = row['content']
    brand = ""
    if type(content) is str:
        brand = brand_match(content, brandKB)
        
    video_tag["vid"] = vid
    video_tag["url"] = url
    video_tag["publish_date"] = publish_date
    video_tag["title"] = title
    video_tag["content"] = content
    video_tag["brand"] = brand
       
    user_tag = {}
    if uid in user_tags:
        user_tag = user_tags[uid]
    else:
        print("user not exist: %s" % uid)
    for k, v in user_tag.items():
        video_tag["user_" + k] = v

    video_tags[vid] = video_tag
    return True
    
each_row(video_sheet, video_parser, 1000)
print(len(video_tags))

# start at 1561299697.14777
current time: 1561299697.433248
current idx: 1000
user not exist: 1552128132788478
current time: 1561299697.790090
current idx: 2000
user not exist: 3117834411713803
user not exist: 1358614413710435
user not exist: 3698379986043395
user not exist: 4208551057174253
current time: 1561299698.096976
current idx: 3000
user not exist: 2642853481685412
# end at 1561299698.17697
total cost: 1.02920
3289


In [16]:
print(len(video_tags))
for k, v in video_tags.items():
    print(k)
    print(v)
    break

3289
6641358299646135560
{'vid': '6641358299646135560', 'url': 'https://www.iesdouyin.com/share/video/6641358299646135560/?region=CN&mid=6641358312489257732&u_code=jdad2faj&titleType=title', 'publish_date': '2019-01-01 11:02:08', 'title': '@开箱迷创作的原声', 'content': '#小米 #小米play 元旦快乐！上次给大家带来小米play的开箱，应粉丝请求，这次测试下它的游戏性能！', 'brand': '小米', 'user_uid': 97290882272, 'user_name': '开箱迷', 'user_fans': 167307, 'user_register_id': 'kaixiangmi'}


## 评论处理



In [17]:
comment_all_sheet = read_excel(abs_path + comment_file)
print(len(comment_all_sheet))
print(comment_all_sheet.keys())

# start at 1561299710.46383
predict cost: 227.94954s
# end at 1561299978.86604
total cost: 268.40221
2
odict_keys(['sheet0', 'sheet1'])


> 安全隔离带

In [18]:
from elasticsearch import Elasticsearch

from utils.elasticsearchs import pretty_print, create_index, insert_bulk

In [19]:
proj_es_17 = Elasticsearch(es_hosts)

1. 准备索引

In [24]:
if is_create_index:
    create_res = create_index(proj_es_17, index, doc_type, fields)
    print(create_res)
else:
    print("do not create index")

index body: {
  "mappings": {
    "douyin": {
      "dynamic": "strict",
      "properties": {
        "author": {
          "analyzer": "words",
          "type": "string"
        },
        "content": {
          "analyzer": "words",
          "type": "string"
        },
        "data_type": {
          "doc_values": true,
          "index": "not_analyzed",
          "type": "string"
        },
        "item_id": {
          "type": "long"
        },
        "keyword": {
          "analyzer": "words",
          "type": "string"
        },
        "like_count": {
          "type": "long"
        },
        "module": {
          "doc_values": true,
          "index": "not_analyzed",
          "type": "string"
        },
        "publish_date": {
          "type": "long"
        },
        "replies": {
          "type": "long"
        },
        "site": {
          "doc_values": true,
          "index": "not_analyzed",
          "type": "string"
        },
        "sourceCrawlerId": {
 

RequestError: RequestError(400, 'IndexAlreadyExistsException[[vivo_douyin_0621] already exists]', 'IndexAlreadyExistsException[[vivo_douyin_0621] already exists]')

2. 写入数据

In [25]:
def flush(es, index, doc_type, data, size=3000, force=False):
    if not force:
        if len(data) < size:
            return False
        
    origin_size = len(data)
    success, info = insert_bulk(es, index, doc_type, data)
    data.clear()
    
    if success < origin_size:
        print(info)
        
    return True

In [26]:
import math

comment_datas = {}
def comment_parser(idx, key, row):
    comment_data = {}

    # 1. origin info
    cid = row["item_id"]
    url = row["url"]
    vid = url.split('/')[5]

    comment_data["item_id"] = cid
    comment_data["url"] = url
    comment_data["video_item_id"] = vid
    comment_data["publish_date"] = int(time.mktime(common_time_parse(row["publish_date"])))
    comment_data["update_date"] = int(time.mktime(common_time_parse(row["update_date"])))
 
    same_keys = set(["user_item_id", 
                      "keyword", "module", "site",
                      "author", "title", "content",
                      "like_count", "replies", "data_type",
                      "sourceCrawlerId"])
    for k in same_keys:
        if type(row[k]) is float and math.isnan(row[k]):
            continue
        comment_data[k] = row[k]

    # 2. video info
    video_tag = video_tags[vid]
    comment_data["video_publish_date"] = int(time.mktime(common_time_parse(video_tag["publish_date"])))

    video_keys = set(["url", "brand", "content",
                     "user_fans", "user_name", "user_register_id"])
    for k in video_keys:
        if k in video_tag: # 有一个用户信息没抓到
            comment_data["video_" + k] = video_tag[k]

    # 3. store
    comment_datas[cid] = comment_data
    flush(proj_es_17, index, doc_type, comment_datas)
#     print("after insert: %d" % len(comment_datas))

    return True


In [27]:
# 选择需要处理的表单数据
comment_test_sheet = comment_all_sheet[target_comment_sheet]
print(comment_test_sheet.shape)

# TODO: 带上flush机制的each_row函数
# 逐行处理
each_row(comment_test_sheet, comment_parser, 5000)

# 最后的回收
# print("before %d" % len(comment_datas))
flush(proj_es_17, index, doc_type, comment_datas, force=True)
comment_datas.clear()
# print("after %d" % len(comment_datas))

# chunksize=4, csv才有

(500000, 23)
# start at 1561300630.87483
action size: 3000
current time: 1561300649.761650
current idx: 5000
action size: 3000
action size: 3000
current time: 1561300680.920154
current idx: 10000
action size: 3000
action size: 3000
current time: 1561300716.877865
current idx: 15000
action size: 3000
current time: 1561300735.822429
current idx: 20000
action size: 3000
action size: 3000
current time: 1561300770.067188
current idx: 25000
action size: 3000
action size: 3000
current time: 1561300805.154453
current idx: 30000
action size: 3000
current time: 1561300824.291895
current idx: 35000
action size: 3000
action size: 3000
current time: 1561300850.882299
current idx: 40000
action size: 3000
action size: 3000
current time: 1561300882.487523
current idx: 45000
action size: 3000
current time: 1561300896.377103
current idx: 50000
action size: 3000
action size: 3000
current time: 1561300926.186522
current idx: 55000
action size: 3000
action size: 3000
current time: 1561300954.867304
current

In [80]:
# print(len(comment_tags))
# for k, v in comment_tags.items():
#     print(k)
#     print(v)
#     break
# print(len(comment_infos))
# for k, v in comment_infos.itmes():
#     print(k)
#     print(v)
#     break

In [81]:
# comment_infos_df = pd.DataFrame(comment_infos)
# comment_tags_df = pd.DataFrame(comment_tags)

# comment_df = pd.concat([comment_infos_df, comment_tags_df], axis=0)

# print(comment_infos_df.shape)
# print(comment_tags_df.shape)
# print(comment_df.shape)

In [82]:
# save_excel(comment_df.T, abs_path + "comment-" + current_sheet + ".xlsx")