# In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import json
import codecs
import sys

# In [4]:
def user_province(spark):
    """Build the ``uCity=...`` feature names.

    One name is produced per distinct province found among positive
    (``label=1``) domestic samples in ``songwt.lr_sample``. Two catch-all
    names (overseas and others) are appended at the end.

    Returns:
        list[str]: the feature names, in query-result order.
    """
    rows = spark.sql("select distinct province from songwt.lr_sample where label=1 and country='中国'").collect()
    names = []
    for r in rows:
        names.append("uCity=%s" % r.province)
    names += ["uCity=国外", "uCity=others"]
    return names

def item_city(spark):
    """Build the ``province=<n>`` feature names for normalized item cities.

    Each distinct ``new_city`` in ``songwt.city_normalize`` is numbered with
    ``row_number()`` (ordered by city name). Only that number is used; the
    city name itself is selected but ignored here. ``province=0`` is appended
    as a fallback.

    Returns:
        [list[str], int]: the feature names and the largest number seen.
        Raises ValueError (from ``max``) if the table is empty.
    """
    rows = spark.sql("SELECT new_city, row_number() OVER(ORDER BY new_city) as num from (SELECT DISTINCT new_city from songwt.city_normalize) t").collect()
    numbers = [int(r.num) for r in rows]
    highest = max(numbers)
    names = ["province=%d" % n for n in numbers] + ["province=0"]
    return [names, highest]

def craftsman(spark):
    """Build the ``hashUid=<uid>`` feature names for every craftsman (seller).

    The uids come from ``features.craftsman_profile``. ``hashUid=0`` is
    appended as a fallback.

    Returns:
        [list[str], int]: the feature names and the largest uid.
        Raises ValueError (from ``max``) if the table is empty.
    """
    uids = [int(r.uid) for r in spark.sql("select uid from features.craftsman_profile").collect()]
    largest = max(uids)
    names = ["hashUid=%d" % u for u in uids]
    names.append("hashUid=0")
    return [names, largest]

def item_cid(spark):
    """Build the ``categoryId=<id>`` feature names from ``kaipao.dj_category``.

    ``categoryId=0`` is appended as a fallback.

    Returns:
        [list[str], int]: the feature names and the largest category id.
        Raises ValueError (from ``max``) if the table is empty.
    """
    rows = spark.sql("SELECT id from kaipao.dj_category").collect()
    ids = list(map(lambda r: int(r.id), rows))
    top = max(ids)
    return [["categoryId=%d" % i for i in ids] + ["categoryId=0"], top]

def item_pid(spark):
    """Build the ``parentCategoryId=<id>`` feature names for distinct parent categories.

    Unlike the sibling builders, no ``=0`` fallback entry is appended.

    Returns:
        [list[str], int]: the feature names and the largest parent id.
        Raises ValueError (from ``max``) if the table is empty.
    """
    parent_ids = [int(r.parent_id) for r in spark.sql("SELECT DISTINCT parent_id from kaipao.dj_category").collect()]
    names = ["parentCategoryId=%d" % p for p in parent_ids]
    return [names, max(parent_ids)]

def item_kind():
    """Return the fixed ``kind=0`` .. ``kind=48`` feature names and the max kind (48)."""
    last_kind = 48
    names = ["kind=%d" % k for k in range(last_kind + 1)]
    return [names, last_kind]



def _field_value_template(field):
    """Return a ``function_score``/``field_value_factor`` template on *field* (missing -> 0)."""
    return {"function_score": {"field_value_factor": {"field": field, "missing": 0}}}


def _build_feature(name, ftype, channel, index, max_value, field, field_type,
                   template, params=None):
    """Assemble one feature-definition dict.

    Keys are inserted in the same order the feature file has always used.
    ``params`` is only included when given (input/binary channels).
    """
    feature = {
        "name": name,
        "type": ftype,
        "channel": channel,
        "index": index,
        "max": max_value,
        "field": field,
        "field_type": field_type,
    }
    if params is not None:
        feature["params"] = params
    feature["template_language"] = "mustache"
    feature["template"] = template
    return feature


def main(spark):
    """Build the LR feature-set definition and write it to ``lr_features.json``.

    Features are emitted in this order:
      1. categorical "input" features (user province, user type);
      2. categorical "query" id features (category, parent category, kind,
         craftsman, city) from the Spark tables;
      3. category grade;
      4. range buckets for collect/addcart/sell counts;
      5. ctr buckets;
      6. binary preference features.
    """
    ctr = [("reItemCtr=0", 0, 0.075), ("reItemCtr=1", 0.075, 0.08730159),
           ("reItemCtr=2", 0.08730159, 0.09428571), ("reItemCtr=3", 0.09428571, 0.09803922),
           ("reItemCtr=4", 0.09803922, 0.1015625), ("reItemCtr=5", 0.1015625, 0.11111111),
           ("reItemCtr=6", 0.11111111, 0.11956522), ("reItemCtr=7", 0.11956522, 0.13461538),
           ("reItemCtr=8", 0.13461538, 0.15957447), ("reItemCtr=9", 0.15957447, 1)]
    collect = [("collectInterval=0", 0, 0), ("collectInterval=1", 0, 1), ("collectInterval=2", 1, 3),
               ("collectInterval=3", 3, 16), ("collectInterval=4", 16, 10000)]
    addcart = [("addcartInterval=0", 0, 0), ("addcartInterval=1", 0, 1), ("addcartInterval=2", 1, 2),
               ("addcartInterval=3", 2, 3), ("addcartInterval=4", 3, 10000)]
    grade = ["categoryGrade=0", "categoryGrade=1", "categoryGrade=2",
             "categoryGrade=3", "categoryGrade=4", "categoryGrade=5"]
    # The last bucket previously started at 4, which left (3, 4] uncovered:
    # a sell count of exactly 4 matched no bucket. It now starts at 3,
    # contiguous with bucket 3, matching the addcart buckets.
    sale = [("sellcntInterval=0", 0, 0), ("sellcntInterval=1", 0, 1), ("sellcntInterval=2", 1, 2),
            ("sellcntInterval=3", 2, 3), ("sellcntInterval=4", 3, 10000)]
    _type = ["uType=0", "uType=1"]
    preferencekind = ["preferenceKind=0", "preferenceKind=1"]
    preferencecategory = ["preferenceCategory=0", "preferenceCategory=1"]

    features = []

    # User-side categorical features; the value is passed in as a template param.
    # NOTE(review): "max" is fixed at 2 for every input feature, regardless of
    # how many values each has. Confirm this is intended.
    for names in [user_province(spark), _type]:
        for idx, name in enumerate(names):
            fe = name.split("=")[0]
            features.append(_build_feature(name, "categorical", "input", idx, 2, fe, "string",
                                           _field_value_template(fe), params=[fe]))

    # Item-side id features; each builder returns [names, max_id].
    for names, max_id in [item_cid(spark), item_pid(spark), item_kind(), craftsman(spark), item_city(spark)]:
        for idx, name in enumerate(names):
            fe = name.split("=")[0]
            features.append(_build_feature(name, "categorical", "query", idx, max_id, fe, "int",
                                           _field_value_template(fe)))

    for idx, name in enumerate(grade):
        fe = name.split("=")[0]
        features.append(_build_feature(name, "categorical", "query", idx, 5, fe, "int",
                                       _field_value_template(fe)))

    # Count buckets: bucket 0 is the exact-zero bucket [0, 0].
    # Every other bucket is (lower, upper].
    for buckets in [collect, addcart, sale]:
        last = len(buckets) - 1
        for idx, (name, lower, upper) in enumerate(buckets):
            fe = name.split("=")[0]
            if idx == 0:
                template = {"range": {fe: {"gte": 0, "lte": 0}}}
            else:
                template = {"range": {fe: {"gt": lower, "lte": upper}}}
            features.append(_build_feature(name, "range", "query", idx, last, fe, "float", template))

    # CTR buckets are all (lower, upper].
    # NOTE(review): bucket 0 uses "gt": 0, so a ctr of exactly 0 matches no
    # bucket. Confirm whether that is intended.
    last = len(ctr) - 1
    for idx, (name, lower, upper) in enumerate(ctr):
        fe = name.split("=")[0]
        features.append(_build_feature(name, "range", "query", idx, last, fe, "float",
                                       {"range": {fe: {"gt": lower, "lte": upper}}}))

    # Preference flags score against the item's own kind / categoryId field.
    for names in [preferencekind, preferencecategory]:
        last = len(names) - 1
        for idx, name in enumerate(names):
            fe = name.split("=")[0]
            target = "kind" if fe == "preferenceKind" else "categoryId"
            features.append(_build_feature(name, "categorical", "binary", idx, last, fe, "int",
                                           _field_value_template(target), params=[fe]))

    _finally = {"validation": {"params": {"uCity": "浙江省", "uType": 0, "preferenceCategory": [781], "preferenceKind": [1, 2]}, "index": "item"},
                "featureset": {"features": features}}
    # Use a context manager so the file is flushed and closed deterministically.
    with open("lr_features.json", "w") as f:
        json.dump(_finally, f)
    print("finished")
    
    

def test(spark):
    """Load ``uid`` entries from ``1.txt`` and save them to ``features.lr_hashuid``.

    Each line of ``1.txt`` is split on tabs. The first field is the name and
    the second the value. Only lines named ``uid`` are kept, and the value is
    written to both the ``sellerid`` and ``hashuid`` columns. The table is
    overwritten as parquet.

    Blank lines and lines without a tab are skipped. Previously they raised
    IndexError, e.g. on a trailing empty line.
    """
    rows = []
    # ``with`` closes the file, which the original codecs.open call never did.
    with open("1.txt", encoding="utf8") as fh:
        for line in fh:
            parts = line.strip().split("\t")
            if len(parts) < 2:
                continue
            name, value = parts[0], parts[1]
            if name == "uid":
                rows.append((value, value))
    print(rows)
    df = spark.createDataFrame(rows, ['sellerid', 'hashuid'])
    df.write.format("parquet").mode("overwrite").saveAsTable("features.lr_hashuid")
    print("finished")
    
            

    
    
            
    
            
    

if __name__ == "__main__":
    # Create (or reuse) the Spark session, then write lr_features.json.
    # To load features.lr_hashuid from 1.txt instead, call test(spark);
    # item_cid(spark) can also be called on its own for a quick check.
    spark = (
        SparkSession.builder
        .appName("lr_sample_train")
        .getOrCreate()
    )
    main(spark)
    
    


    
    
    
    
    

# Output: finished
