In [11]:
import random
from datetime import datetime, date, timedelta

import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import (
    GBTClassificationModel,
    RandomForestClassificationModel,
    LogisticRegressionModel,
)
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
import subprocess 
from functools import reduce
from pyspark.sql.types import DoubleType 

In [3]:
application_name = "zhy_autopred"

# Stop a session left over from an earlier run (or pre-created by the kernel)
# so that the builder below creates a new one with the settings given here.
# On a fresh kernel that does not pre-define `spark` there is nothing to stop,
# and an unguarded `spark.stop()` would abort "Restart & Run All" with NameError.
try:
    spark.stop()
except NameError:
    pass

spark = (
    SparkSession.builder.appName(application_name)
    .config("spark.dynamicAllocation.maxExecutors", "200")
    .config("spark.default.parallelism", "1500")
    .config("spark.sql.shuffle.partitions", "1500")
    .config("spark.executor.memoryOverhead", "10G")
    .getOrCreate()
)
# Last expression of the cell: display the session summary.
spark

In [19]:
class model_prediction:
    """Score a candidate population with a saved Spark ML model and publish the hits.

    Construction loads the artifacts stored under ``{model_root}/{model_name}``
    (metadata table, column template, feature pipeline and classifier).
    :meth:`generate` then runs the whole flow: build the candidate population,
    assemble the feature table, score it, choose a probability threshold and
    write the selected ids into a Hive table.

    The class relies on a module-level ``spark`` session.
    """

    # Directory that holds one sub-directory of artifacts per model.
    model_root = "/user/zhuyan/model"

    def __init__(self, model_name, model_type):
        """Load all artifacts of one saved model.

        Args:
            model_name: Artifact directory name. The part before the first
                ``_`` is taken as the operator (``ctcc`` / ``cmcc`` / ``cucc``).
            model_type: ``"gbt"`` loads a GBT classifier, ``"rdf"`` a random
                forest; any other value loads a logistic regression model.
        """
        self.model_name = model_name
        self.model_type = model_type
        self.operator = self.model_name.split("_")[0]
        # Width, in days, of the `update_date` window used by the feature queries.
        self.n_dpi_days = 15
        model_dir = f"{self.model_root}/{self.model_name}"

        self.model_basic_message = spark.read.parquet(f"{model_dir}/model_basic_message")
        self.dpi_symbol = "8_dpi_result" if self.operator == "cmcc" else "9_dpi_result"
        # None for an operator prefix that is not in the mapping.
        self.operator_id = {"ctcc": 0, "cmcc": 1, "cucc": 2}.get(self.operator)
        self.df_message = self.model_basic_message.toPandas()

        # Names of the feature tables inside the `bigdata_insurance` database.
        self.n_visited = f"e_n_visited_{self.dpi_symbol}_modelling"
        self.n_sent = f"e_n_sent_{self.operator}_modelling"
        self.n_sent_bt = f"e_n_sent_bt_{self.operator}_modelling"
        self.n_host = f"e_n_host_{self.dpi_symbol}_modelling"

        # Only the column list of `fake_data` is used: it fixes the set and
        # order of columns of the table handed to the pipeline.
        self.fake_data = spark.read.parquet(f"{model_dir}/fake_data")
        self.pipeline_model = PipelineModel.load(f"{model_dir}/pipeline_model")

        loaders = {
            "gbt": GBTClassificationModel,
            "rdf": RandomForestClassificationModel,
        }
        model_cls = loaders.get(self.model_type, LogisticRegressionModel)
        self.model = model_cls.load(f"{model_dir}/{self.model_type}")

        # SQL `IN (...)` lists; "null_value" marks unused slots in the metadata.
        self.hosts = self._sql_in_list(self._message_values("hosts"))
        self.products_click = self._sql_in_list(self._message_values("products_click"))
        self.product_sent = self._sql_in_list(self._message_values("products_send"))

    def _message_values(self, column):
        """Return the entries of a metadata column that are not ``"null_value"``."""
        values = self.df_message[column]
        return values.loc[values != "null_value"].tolist()

    @staticmethod
    def _sql_in_list(values):
        """Render ``values`` as the parenthesised operand of a SQL ``IN``.

        For two or more values the text equals ``str(tuple(values))``. Unlike
        ``str(tuple(...))`` a single value is rendered without the trailing
        comma (``('a')`` instead of ``('a',)``), and an empty list becomes
        ``(NULL)``, which matches no row, instead of the invalid ``()``.
        """
        values = list(values)
        if not values:
            return "(NULL)"
        return "(" + ", ".join(repr(value) for value in values) + ")"

    @staticmethod
    def _with_positive_prob(data_pred):
        """Add a double column ``p_prob`` holding element 1 of ``probability``."""
        vec_to_columns = F.udf(lambda vec: float(vec[1]), DoubleType())
        return data_pred.withColumn("p_prob", vec_to_columns("probability"))

    def _rule_name(self):
        """Build the rule name from the operator and the rest of the model name."""
        operator_id2 = 8 if self.operator == "cmcc" else 9
        # Everything after the operator prefix of *this* model's name.
        rule_name_ = "_".join(self.model_name.split("_")[1:])
        return f"{self.operator}_dpi_{operator_id2}_dym_{rule_name_}_zhy"

    def get_people_for_pred(self):
        """Register the candidate ids as the cached temp view ``people_for_prediction``.

        Candidates are ids that either visited one of the model's hosts within
        the last 40 days or appear in the business access log within the last
        180 days, restricted to this operator and excluding province '11'.
        """
        spark.sql(
            f"""
        select all.rysecret
        from (  select phone_number rysecret, myq.host
                from etl_fetch.{self.dpi_symbol}
                lateral view explode(hosts) q as myq
                where p_biz in ('jrunion')
                    AND p_date >= date_format(date_add(current_date(), -40), 'yyyyMMdd')
                    AND myq.host IN {self.hosts}
              union all
              select distinct rysecret, 1 host
                from etl_swap.rp_biz_access_log
                where p_biz in ('loan', 'credit', 'insurance')
                and dt >= date_sub(current_date(), 180)
            ) all
        join (select uid rysecret
                from dw_resources.mapping_uid_property
                where p_operate = {self.operator_id}
                and p_province not in ('11')
                ) pp on all.rysecret = pp.rysecret
        GROUP BY all.rysecret
        """
        ).cache().createOrReplaceTempView("people_for_prediction")

    def get_dummy_data(self, df, df_message, df_col, df_message_col, label=False):
        """Build placeholder rows that carry every key value listed in the metadata.

        Args:
            df: Object whose ``columns`` give the column layout of the result.
            df_message: pandas DataFrame holding the model metadata.
            df_col: List of column names in ``df`` that receive the key values.
            df_message_col: List of ``df_message`` columns to read the key
                values from, parallel to ``df_col``.
            label: When true, missing ``label`` values are filled with 0.

        Returns:
            pandas DataFrame with the columns of ``df``. Missing ``rysecret``
            is filled with ``"A"``, missing ``sent_date`` with ``"22220202"``
            and every other missing value with ``-999``.
        """
        rename_dic = dict(zip(df_message_col, df_col))
        fillna_dic = {"rysecret": "A", "sent_date": "22220202"}
        if label:
            fillna_dic["label"] = 0
        key_rows = (
            df_message[df_message_col]
            .replace("null_value", np.nan)
            .dropna(how="all")
            # `.ffill()` replaces the deprecated `fillna(method="ffill")`.
            .ffill()
            .rename(columns=rename_dic)
        )
        # Concatenating onto an empty frame gives the rows the layout of `df`.
        dummy_data = (
            pd.concat([pd.DataFrame(columns=df.columns), key_rows])
            .fillna(fillna_dic)
            .fillna(-999)
        )
        return dummy_data

    def _pivot_by_key(self, data, key_col, message_col):
        """Pivot long per-key rows of ``data`` into one wide row per ``rysecret``.

        NOTE(review): the placeholder rows all carry rysecret "A" and are
        removed by the filter before the pivot runs, so they do not add pivot
        columns; columns still missing are added later in `deal_with_data`.
        Confirm whether that is intended.
        """
        dummy_data = self.get_dummy_data(data, self.df_message, [key_col], [message_col])
        value_cols = [name for name in data.columns if name not in ("rysecret", key_col)]
        return (
            spark.createDataFrame(dummy_data)
            .union(data)
            .filter(F.col("rysecret") != "A")
            .groupBy(["rysecret"])
            .pivot(key_col)
            .agg(*(F.first(name).alias(name) for name in value_cols))
            .fillna(0)
        )

    def deal_with_data(self):
        """Assemble the feature table for everyone in ``people_for_prediction``.

        Returns:
            Spark DataFrame with exactly the columns of ``self.fake_data``, at
            most one row per ``rysecret`` and no nulls (remaining nulls are
            filled with -999).
        """
        label_data = spark.sql(
            f"""
        SELECT

            all.rysecret,
            n_ins_host_10, n_ins_host_20, n_ins_host_30, fre_ins_host_10, fre_ins_host_20, fre_ins_host_30,
            n_loan_host_10, n_loan_host_20, n_loan_host_30, fre_loan_host_10, fre_loan_host_20, fre_loan_host_30,
            n_credit_host_10, n_credit_host_20, n_credit_host_30, fre_credit_host_10, fre_credit_host_20, fre_credit_host_30,
            n_other_host_10, n_other_host_20, n_other_host_30,  fre_other_host_10, fre_other_host_20, fre_other_host_30,
            ins_host_rate_10, ins_host_rate_20, ins_host_rate_30, ins_fre_rate_10, ins_fre_rate_20, ins_fre_rate_30,
            loan_host_rate_10, loan_host_rate_20, loan_host_rate_30, loan_fre_rate_10, loan_fre_rate_20, loan_fre_rate_30,
            credit_host_rate_10, credit_host_rate_20, credit_host_rate_30, credit_fre_rate_10, credit_fre_rate_20, credit_fre_rate_30,
            n_ins_host_avg_30, n_ins_host_sd_30, n_ins_host_cv_30,
            n_loan_host_avg_30, n_loan_host_sd_30, n_loan_host_cv_30,
            n_credit_host_avg_30, n_credit_host_sd_30, n_credit_host_cv_30,
            n_other_host_avg_30, n_other_host_sd_30, n_other_host_cv_30,
            ins_host_rate_avg_30, ins_host_rate_sd_30, ins_host_rate_cv_30,
            loan_host_rate_avg_30, loan_host_rate_sd_30, loan_host_rate_cv_30,
            credit_host_rate_avg_30, credit_host_rate_sd_30, credit_host_rate_cv_30,
            fre_ins_host_avg_30, fre_ins_host_sd_30, fre_ins_host_cv_30,
            fre_loan_host_avg_30, fre_loan_host_sd_30, fre_loan_host_cv_30,
            fre_credit_host_avg_30, fre_credit_host_sd_30, fre_credit_host_cv_30,
            fre_other_host_avg_30, fre_other_host_sd_30, fre_other_host_cv_30,
            ins_fre_rate_avg_30, ins_fre_rate_sd_30, ins_fre_rate_cv_30,
            loan_fre_rate_avg_30, loan_fre_rate_sd_30, loan_fre_rate_cv_30,
            credit_fre_rate_avg_30, credit_fre_rate_sd_30, credit_fre_rate_cv_30,
            sent_30, sent_90, sent_180, days_since_sent_date,
            click_30, click_90, click_180, days_since_click_date,
            myl_rank, price, age, gender, maker, brand, new_age,
            rank.city_code, province_code, city_level

        FROM people_for_prediction all
        LEFT JOIN (
                SELECT *
                FROM bigdata_insurance.{self.n_host}
                WHERE update_date >= date_format(date_add(current_date(), -{self.n_dpi_days}), 'yyyyMMdd')
                AND update_date < date_format(date_add(current_date(), -0), 'yyyyMMdd')
                ) n_host on all.rysecret = n_host.rysecret
        LEFT JOIN (
                SELECT
                    rysecret, update_date, sent_30, sent_90, sent_180, datediff(update_dt, sent_date) days_since_sent_date,
                    click_30, click_90, click_180, datediff(update_dt, click_date) days_since_click_date
                FROM (
                    SELECT *, from_unixtime(unix_timestamp(cast(update_date as string), 'yyyyMMdd'), 'yyyy-MM-dd') update_dt
                    FROM bigdata_insurance.{self.n_sent}
                    ) sent
                WHERE update_date >= date_format(date_add(current_date(), -{self.n_dpi_days}), 'yyyyMMdd')
                    AND update_date < date_format(date_add(current_date(), -0), 'yyyyMMdd')
                ) n_sent on all.rysecret = n_sent.rysecret
        LEFT JOIN (
                SELECT rysecret, age, gender, price, maker
                FROM bigdata_insurance.e_static_feature_table
                ) st on all.rysecret = st.rysecret
        LEFT JOIN (
                SELECT rysecret, brand
                FROM bigdata_insurance.e_brand_table
                ) bt on all.rysecret = bt.rysecret
        LEFT JOIN (
                SELECT rysecret, first(cast(rank as int)) myl_rank
                FROM (
                    SELECT uid rysecret, rank, p_date, max(p_date) over (partition by uid) max_date
                    FROM sample.e_mayilian) sample
                WHERE p_date = max_date
                GROUP BY rysecret
                ) myl on all.rysecret = myl.rysecret
        LEFT JOIN (
                SELECT uid rysecret, first(age) new_age
                FROM model_dig.e_user_age_col
                WHERE age is not null
                GROUP BY rysecret
                ) age on all.rysecret = age.rysecret
        JOIN (
                SELECT uid rysecret, citycode city_code
                FROM dw_resources.mapping_uid_property
                ) city on all.rysecret = city.rysecret
        JOIN (
                SELECT city_id city_code, province_id province_code, city_level_id city_level
                FROM bigdata_insurance.e_citycode_rank_dict
                ) rank on city.city_code = rank.city_code
        """
        ).cache()
        dpi_data = spark.sql(
            f"""
        SELECT
            all.rysecret, n_visited.host,
            n_10, n_20, n_30, n_avg_30, n_cv_30,
            fre_10, fre_20, fre_30, fre_avg_30, fre_cv_30
        FROM people_for_prediction all
        JOIN (
            SELECT *
            FROM bigdata_insurance.{self.n_visited}
            WHERE update_date >= date_format(date_add(current_date(), -{self.n_dpi_days}), 'yyyyMMdd')
                AND update_date < date_format(date_add(current_date(), -0), 'yyyyMMdd')
                AND host IN {self.hosts}
            ) n_visited ON all.rysecret = n_visited.rysecret
        """
        ).cache()

        click_product_data = spark.sql(
            f"""
        SELECT
            all.rysecret, n_sent_bt.product,
            click_30, click_90, click_180, click_360, fre_click_30, fre_click_90, fre_click_180, fre_click_360, days_since_click_date,
            e_ip_30, e_ip_90, e_ip_180, e_ip_360, fre_e_ip_30, fre_e_ip_90, fre_e_ip_180, fre_e_ip_360, days_since_e_ip_date
        FROM people_for_prediction all
        JOIN (
            SELECT *
            FROM bigdata_insurance.{self.n_sent_bt}
            WHERE update_date >= date_format(date_add(current_date(), -{self.n_dpi_days}), 'yyyyMMdd')
                AND update_date < date_format(date_add(current_date(), -0), 'yyyyMMdd')
                AND product IN {self.products_click}
            ) n_sent_bt ON all.rysecret = n_sent_bt.rysecret
        """
        ).cache()

        sent_product_data = spark.sql(
            f"""
        SELECT
            all.rysecret,  n_sent_bt.product,
            sent_30, sent_90, sent_180, sent_360, days_since_sent_date,
            call_30, call_90, call_180, call_360, days_since_call_date,
            pick_30, pick_90, pick_180, pick_360, days_since_pick_date
        FROM people_for_prediction all
        JOIN (
            SELECT *
            FROM bigdata_insurance.{self.n_sent_bt}
            WHERE update_date >= date_format(date_add(current_date(), -{self.n_dpi_days}), 'yyyyMMdd')
                AND update_date < date_format(date_add(current_date(), -0), 'yyyyMMdd')
                AND product IN {self.product_sent}
            ) n_sent_bt ON all.rysecret = n_sent_bt.rysecret
        """
        ).cache()

        # Fill order matters: named defaults first, then -999 for the "_cv_"
        # and "days_since_" columns, then 0 for whatever is still null.
        sentinel_cols = [
            name
            for name in label_data.columns
            if ("_cv_" in name) or ("days_since_" in name)
        ]
        label_data = (
            label_data.fillna(
                {
                    "maker": "others",
                    "brand": "others",
                    "price": -999,
                    "age": 0,
                    "gender": 2,
                    "new_age": "null",
                    "myl_rank": -999,
                }
            )
            .fillna(-999, subset=sentinel_cols)
            .fillna(0)
        )

        dpi_data = self._pivot_by_key(dpi_data, "host", "hosts")
        print("dpi_data")

        click_product_data = self._pivot_by_key(
            click_product_data, "product", "products_click"
        )
        print("click_data")

        sent_product_data = self._pivot_by_key(
            sent_product_data, "product", "products_send"
        )
        print("sent_data")

        all_data = (
            reduce(
                lambda df1, df2: df1.join(df2, on=["rysecret"], how="left"),
                [label_data, dpi_data, click_product_data, sent_product_data],
            )
            .withColumn("sent_date", F.lit("20750101"))
            .withColumn("label", F.lit(0))
        )
        print("all_data")
        # Columns listed in the template but absent from the joined data are
        # added as constant 0.
        for col in [i for i in self.fake_data.columns if i not in all_data.columns]:
            all_data = all_data.withColumn(col, F.lit(0))

        all_data = (
            all_data.select(self.fake_data.columns)
            .fillna(-999)
            .dropDuplicates(subset=["rysecret"])
        )
        return all_data

    def data_deal_for_train(self, all_data):
        """Run the feature pipeline and the classifier over ``all_data``.

        Returns:
            Tuple ``(predictions, rule_name)``. ``predictions`` has the columns
            ``rysecret``, ``probability``, ``prediction`` and ``p_prob``
            (element 1 of ``probability``). ``rule_name`` is derived from
            ``self.model_name``; it used to be derived from the literal
            "cmcc_jdjk_click", which was wrong for every other model.
        """
        piped_all_data = self.pipeline_model.transform(all_data)
        data_pred = self.model.transform(piped_all_data).select(
            "rysecret", "probability", "prediction"
        )
        data_pred.cache()
        data_pred1 = self._with_positive_prob(data_pred)

        rule_name = self._rule_name()
        print(rule_name)

        return data_pred1, rule_name

    def get_threshold_count(self, test_pre, prob_col="p_prob", min_count=10000):
        """Pick the highest threshold in 0.0, 0.1, ..., 0.9 keeping enough rows.

        Args:
            test_pre: Spark DataFrame with a numeric probability column.
            prob_col: Name of that column.
            min_count: Minimum number of rows that must satisfy
                ``prob_col >= threshold``.

        Returns:
            The chosen threshold as a float rounded to two decimals. When even
            the lowest threshold keeps fewer than ``min_count`` rows, 0.0 is
            returned (previously this raised IndexError).
        """
        n_max = None
        for threshhold in np.arange(0, 1, 0.1):
            num_p = test_pre.filter(F.col(prob_col) >= threshhold).count()
            if num_p < min_count:
                # Counts cannot grow as the threshold rises, so no later
                # threshold can qualify; skip the remaining Spark jobs.
                break
            n_max = float(threshhold.round(2))
        if n_max is None:
            print(f"fewer than {min_count} rows in total, keeping all of them")
            n_max = 0.0
        print(n_max)
        return n_max

    def save_data(self, data_pred, n_max, rule_name):
        """Write the ids with ``p_prob >= n_max`` into partition ``rule_name``.

        Rows come from joining the selected ids to
        ``dw_resources.mapping_uid_property`` for this operator, leaving out
        ids found in the 'jdjk_waihu' level of the black list. The target
        partition of ``bigdata_insurance.e_zhuyan_bx_rules_pre`` is overwritten.
        """
        # `generate` passes predictions that already carry p_prob; derive it
        # only for callers that pass the raw `probability` column alone.
        if "p_prob" not in data_pred.columns:
            data_pred = self._with_positive_prob(data_pred)
        (
            data_pred.filter(F.col("p_prob") >= n_max)
            .select("rysecret")
            .createOrReplaceTempView("shot")
        )
        spark.sql(
            """
        set hive.exec.dynamic.partition.mode=nonstrict
        """
        )
        spark.sql(
            f"""
        INSERT OVERWRITE table bigdata_insurance.e_zhuyan_bx_rules_pre partition(rule_name)
        SELECT DISTINCT a.rysecret, b.citycode, b.operator,  b.suffix, b.province,
                date_add(current_date(), 1) AS p_date, '{rule_name}' AS rule_name
        FROM shot AS a
        JOIN dw_resources.mapping_uid_property AS b ON a.rysecret = b.uid
        LEFT JOIN (select rysecret
                    from bigdata_insurance.e_bt_black_list_bx
                    where level = 'jdjk_waihu') AS c ON a.rysecret = c.rysecret
        WHERE c.rysecret is NULL AND b.operator = {self.operator_id}
        """
        )

    def generate(self):
        """Run the full flow: population, features, scoring, threshold, save."""
        self.get_people_for_pred()
        all_data = self.deal_with_data()
        data_pred, rule_name = self.data_deal_for_train(all_data)
        n_max = self.get_threshold_count(data_pred)
        self.save_data(data_pred, n_max, rule_name)

In [20]:
# Load the artifacts of the "cmcc_jdjk_click" model, using its GBT classifier.
md1 = model_prediction(model_name="cmcc_jdjk_click", model_type="gbt")

In [21]:
# Run the whole flow: build the candidate population, assemble features,
# score, pick the probability threshold and write the selected ids to Hive.
# The progress markers, the rule name and the threshold are printed below.
md1.generate() 

dpi_data
click_data
sent_data
all_data
cmcc_dpi_8_dym_jdjk_click_zhy
0.2
