In [None]:
from pyspark.sql.types import FloatType, IntegerType, StringType
import pyspark.sql.functions as F
import datetime
import pandas as pd
from tqdm import tqdm
from dateutil.parser import parse
import numpy as np
import json, os
from pyspark.storagelevel import StorageLevel
import operator

from utils import *
import warnings
warnings.filterwarnings("ignore")

# Job name: handed to set_spark and used as the final path component of base_path.
task = "user_profile_v1"
# Extra options forwarded to utils.set_spark; their meaning is defined there (TODO document).
flags = [False]


In [None]:
# Full list of candidate context columns. It used to be assigned to `contexts`
# and then immediately overwritten, so it is kept here under its own name for
# reference only. Presumably these are the columns the imp/clk SQL can
# return — TODO confirm against imp.sql / clk.sql.
ALL_CONTEXTS = ['display', 'cid', 'bundle', 'pub', 'subcat', 'adid', 'advertiser_id', 'tagid', 'size', 'crid',
                'make', 'state', 'adtype', 'month', 'camera_pixels', 'osv', 'reward', 'appcat', 'os', 'chipset',
                'devicetype', 'city', 'model', 'hour']

# Context columns that profile_cnt actually counts per device.
contexts = ["osv", "advertiser_id", "bundle", "cid", "crid"]
# Gzip codec class name passed to every saveAsTextFile call below.
codec = "org.apache.hadoop.io.compress.GzipCodec"
# HDFS root of this job; the daily/, imp/, clk/ and per-day profile directories live under it.
base_path = "hdfs://nameservice1/user/hive/warehouse/lixiang.db/%s" % (task,)


In [None]:
def profile_cnt(xs, day, keys=None):
    """Summarise one device's rows for a single day.

    Args:
        xs: iterable of rows for one device. Rows that expose ``asDict()``
            (e.g. pyspark ``Row``) are converted with it; any other row is
            passed to ``dict()``.
        day: the day being processed; ``int(day.timestamp())`` is stored
            under ``"time"``.
        keys: column names to count. Defaults to the module-level
            ``contexts`` list.

    Returns:
        dict with ``"cnt"`` (number of rows), ``"time"`` (epoch seconds of
        ``day``) and, for each key, a ``{value: row count}`` mapping. As with
        ``DataFrame.groupby`` (``dropna=True``), missing values are not counted.

    Raises:
        KeyError: if rows are present but a requested key is not a column.
    """
    if keys is None:
        keys = contexts
    records = [x.asDict() if hasattr(x, "asDict") else dict(x) for x in xs]
    ans = {"cnt": len(records), "time": int(day.timestamp())}
    if not records:
        # An empty frame has no columns, so groupby(k) would raise KeyError.
        ans.update((k, {}) for k in keys)
        return ans
    data = pd.DataFrame(records)
    for k in keys:
        ans[k] = data.groupby(k).size().to_dict()
    return ans


def filter_uid(x, day):
    """Decide whether a merged device profile is kept for ``day``.

    Args:
        x: profile dict whose ``"time"`` entry is epoch seconds (presumably
           the time carried through ``utils.map_reduce`` from profile_cnt —
           TODO confirm).
        day: the day being processed (a ``pd.Timestamp`` in this notebook).

    Returns:
        False when ``x["time"]`` is positive and the profile is more than 90
        whole days older than ``day``; True otherwise, including time <= 0.
    """
    t = x["time"]
    if t <= 0:
        return True
    # Measure the age in epoch seconds using day.timestamp(), the same
    # conversion profile_cnt uses to produce "time". The previous
    # datetime.fromtimestamp(t) reinterpreted the epoch in *local* time,
    # whereas a naive pd.Timestamp's timestamp() treats its wall clock as UTC,
    # so the age was skewed by the machine's UTC offset.
    # Floor division mirrors timedelta.days.
    age_days = (day.timestamp() - t) // 86400
    return age_days <= 90


In [None]:
# Daily incremental user-profile build.
#
# For every day in the look-back window that is not finished yet:
#   1. aggregate that day's imp / clk rows per deviceid (profile_cnt) and write
#      them to <base_path>/<evt>/<YYYYMMDD>;
#   2. merge imp + clk into <base_path>/daily/<YYYYMMDD> (utils.map_union);
#   3. combine the decayed previous profile (utils.weight_decay) with today's
#      daily profile (utils.map_reduce), drop devices rejected by filter_uid,
#      and write <base_path>/<YYYYMMDD>;
#   4. delete the per-event intermediates.
#
# A day counts as done only when step 3's _SUCCESS marker exists. Checking the
# daily marker instead would permanently skip a day that crashed between steps
# 2 and 3, and every following day would then fail reading its missing
# previous profile.


def _hdfs_rm(path):
    """Recursively delete ``path`` on HDFS; a missing path is not an error (``-f``)."""
    exec_cmd("hadoop fs -rm -r -f %s" % path)


def _load_kv(sc, path, parse=json.loads):
    """Read ``key<TAB><TAB>value`` text lines from ``path`` as (key, parse(value)) pairs."""
    return sc.textFile(path) \
        .map(lambda line: line.strip("\n").split("\t\t")) \
        .mapValues(parse)


def _save_kv(rdd, path, n_partitions=50):
    """Write (key, str) pairs to ``path`` as gzip text lines ``key<TAB><TAB>value``.

    Leftover output at ``path`` from an earlier failed run is removed first,
    because ``saveAsTextFile`` refuses to write into an existing directory.
    """
    _hdfs_rm(path)
    rdd.repartition(n_partitions) \
        .map(lambda kv: "%s\t\t%s" % (kv[0], kv[1])) \
        .saveAsTextFile(path, codec)


start_date = datetime.datetime.now() - datetime.timedelta(days=30)
now = datetime.datetime.now() - datetime.timedelta(days=1)
dates = pd.date_range(start_date, now)

# The Spark session is created lazily, once, and only if some day needs work.
spark = None
sc = None

# NOTE(review): processing starts at dates[2]. For that first day the profile
# of dates[1] is read as `last_profile`, so it must already exist on HDFS.
for i in range(2, len(dates)):
    last_day = dates[i - 1]
    day = dates[i]
    ymd = day.strftime("%Y%m%d")
    y, m, d = day.strftime("%Y"), day.strftime("%m"), day.strftime("%d")

    daily_path = "%s/daily/%s" % (base_path, ymd)
    profile_path = "%s/%s" % (base_path, ymd)
    last_profile = "%s/%s" % (base_path, last_day.strftime("%Y%m%d"))

    # Skip the day only when its final profile was written completely.
    cmd = "hadoop fs -test -e %s/_SUCCESS;echo $?" % profile_path
    out = exec_cmd(cmd)
    print("cmd>>>", cmd, out, ymd)
    if out is None or int(out) == 0:
        continue

    if spark is None:
        spark = set_spark(task, flags)
        sc = spark.sparkContext
    fill_na_udf = F.udf(fill_na)

    # 1. Per-event daily aggregates.
    paths = []
    for evt in ["imp", "clk"]:
        with open("./%s.sql" % evt) as f:
            sql = f.read()
        sql = sql.format(y=y, m=m, d=d)

        df = spark.sql(sql)
        for col_name, dtype in dict(df.dtypes).items():
            if dtype == "string":
                df = df.withColumn(col_name, fill_na_udf(col_name))
        # groupByKey collects each device's rows in linear time. The former
        # reduceByKey(operator.add) over one-element lists rebuilt the growing
        # list on every merge, which is quadratic for heavy devices.
        # evt/day are bound as defaults so the shipped closure never depends on
        # the loop variables' later values.
        evt_rdd = df.rdd.map(lambda row: (row["deviceid"], row)) \
            .groupByKey() \
            .mapValues(lambda rows, evt=evt, day=day: json.dumps({evt: profile_cnt(list(rows), day)}))
        saved_path = "%s/%s/%s" % (base_path, evt, ymd)
        paths.append(saved_path)
        _save_kv(evt_rdd, saved_path)

    # 2. Merge imp / clk into one daily profile per device.
    daily_rdd = _load_kv(sc, ",".join(paths)) \
        .groupByKey() \
        .mapValues(lambda values: json.dumps(map_union(list(values))))
    _save_kv(daily_rdd, daily_path)

    # 3. Combine the decayed previous profile with today's daily profile.
    last_rdd = _load_kv(sc, last_profile, lambda s: weight_decay(json.loads(s)))
    today_rdd = _load_kv(sc, daily_path)
    yd_profile = last_rdd.union(today_rdd) \
        .groupByKey() \
        .mapValues(lambda values: map_reduce(list(values))) \
        .filter(lambda kv, day=day: filter_uid(kv[1], day)) \
        .mapValues(json.dumps)
    _save_kv(yd_profile, profile_path)

    # 4. Drop the per-event intermediates.
    for evt in ["imp", "clk"]:
        _hdfs_rm("%s/%s/%s" % (base_path, evt, ymd))



In [None]:
# Release the Spark session if the build loop created one. A bare `except:`
# would also have swallowed KeyboardInterrupt and hidden real failures, so
# failures are now reported while cleanup stays best-effort.
_session = globals().get("spark")
if _session is not None:
    try:
        _session.stop()
    except Exception as exc:
        print("spark.stop() failed: %r" % (exc,))