In [1]:
import os
import sys
# 如果当前代码文件运行测试需要加入修改路径，避免出现后导包问题
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))

PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
# 当存在多个版本时，不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from offline import SparkSessionBase

class CtrLogisticRegression(SparkSessionBase):
    """Notebook-level holder for the Spark session used by the CTR feature cells.

    The class attributes below are presumably read by ``SparkSessionBase`` when
    the session is built -- TODO confirm against ``offline.SparkSessionBase``.
    """

    # Application name for this job.
    SPARK_APP_NAME = "ctrLogisticRegression"
    # Hive support flag; later cells issue `use profile` / `use article` via spark.sql.
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        """Build the session with the inherited ``_create_spark_hbase`` and keep it on ``self.spark``."""

        self.spark = self._create_spark_hbase()

ctr = CtrLogisticRegression()

In [2]:
# 2. Read the user click-behaviour table; later cells join it with the user
#    profile and the article profile to build the training samples.
ctr.spark.sql('use profile')
news_article_basic = ctr.spark.sql("select user_id, article_id, channel_id, clicked from user_article_basic")

In [None]:
news_article_basic.show()

In [2]:
ctr.spark.sql('use profile')

# Load the user-profile rows.
# NOTE(review): the query carries `limit 10`, so at most ten users reach the
# later join -- looks like a debugging leftover; confirm before a full run.
user_profile_hbase = ctr.spark.sql(
    "select user_id, information.birthday, information.gender, article_partial, env from user_profile_hbase limit 10")
#user_profile_hbase = user_profile_hbase.drop('env','birthday', 'gender')

In [3]:
ctr.spark.sql('show tables').show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| profile|ctr_feature_artic...|      false|
| profile|ctr_feature_user_...|      false|
| profile|         user_action|      false|
| profile|  user_article_basic|      false|
| profile|  user_profile_hbase|      false|
+--------+--------------------+-----------+



In [4]:
ctr.spark.sql('select * from user_profile_hbase limit 10')

DataFrame[user_id: string, information: map<string,double>, article_partial: map<string,double>, env: map<string,int>]

In [None]:
ctr.spark.sql('select * from user_profile_hbase limit 10').show()

In [3]:
user_profile_hbase.show()

KeyboardInterrupt: 

In [15]:
# Normalise the user ID
def get_user_id(row):
    """Turn a user-profile row into a plain tuple with an integer user ID.

    The integer is parsed from the second ':'-separated piece of
    ``row.user_id``; ``birthday``, ``gender`` and ``article_partial`` are
    passed through untouched.

    Returns:
        tuple: ``(user_id, birthday, gender, article_partial)``.
    """
    id_parts = row.user_id.split(':')
    numeric_id = int(id_parts[1])
    return (numeric_id, row.birthday, row.gender, row.article_partial)

# Rebinds user_profile_hbase from a DataFrame to an RDD of plain tuples, so this
# cell cannot be re-run on its own (the mapped RDD has no `.rdd` attribute).
user_profile_hbase = user_profile_hbase.rdd.map(get_user_id)

In [16]:
# 存入hbase

from pyspark.sql.types import *

# Schema for the per-user weight frame: numeric user ID plus the map of
# weights (keys start with the channel number followed by ':').
_schema = StructType([
    StructField("user_id", LongType()),
    StructField("weights", MapType(StringType(), DoubleType()))
])

# get_user_id() emits (user_id, birthday, gender, article_partial). Keep only the
# first and last element so every record matches the two-field schema above;
# passing the 4-tuples directly fails Spark's schema verification.
user_id_and_weights = user_profile_hbase.map(lambda profile: (profile[0], profile[3]))
user_profile_hbase_schema = ctr.spark.createDataFrame(user_id_and_weights, schema=_schema)

def frature_preprocess(row):
    """Build a fixed-size block of keyword weights per channel for one user.

    For every channel number 1..25, the weights whose key starts with
    ``"<channel>:"`` are sorted ascending and the first 10 are kept. Lists
    shorter than 10 (including the case where nothing matches or the weights
    cannot be read) are right-padded with ``0.0`` so every channel always
    yields exactly 10 values.

    Args:
        row: object exposing ``user_id`` and ``weights`` (a mapping of
            ``"<channel>:..."`` keys to numeric weights, or ``None``).

    Returns:
        tuple: ``(user_id, channel_weights)`` where ``channel_weights`` is a
        list of 25 lists, each holding 10 floats.
    """
    n_channels = 25
    top_k = 10

    channel_weights = []
    for channel in range(1, n_channels + 1):
        prefix = str(channel)
        try:
            picked = sorted(weight for key, weight in row.weights.items()
                            if key.split(':')[0] == prefix)[:top_k]
        except Exception:
            # Missing or malformed weights: fall back to an all-zero block.
            # `Exception` (not a bare except) keeps Ctrl-C / SystemExit working.
            picked = []
        # Pad so the stored vector length does not depend on the data.
        channel_weights.append(picked + [0.0] * (top_k - len(picked)))

    return row.user_id, channel_weights

res = user_profile_hbase_schema.rdd.map(frature_preprocess).collect()

KeyboardInterrupt: 

In [None]:
import happybase

# Bulk-insert the per-user channel weights into HBase.
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
# The pool's context manager hands the connection back on exit, so it is not
# closed by hand here; closing it would only force a reconnect on next checkout.
with pool.connection() as conn:
    ctr_feature = conn.table('ctr_feature_user')
    with ctr_feature.batch(transaction=True) as b:
        for user_id, channel_weights in res:
            row_key = "{}".format(user_id).encode()
            # One column per channel: "channel:1" .. "channel:25"; the value is
            # the string form of that channel's weight list.
            for channel_no, weights in enumerate(channel_weights[:25], start=1):
                b.put(row_key, {"channel:{}".format(channel_no).encode(): str(weights).encode()})

In [None]:
# toDF cannot determine the type of some columns, so the DataFrame column types
# are specified by hand.
# NOTE(review): `gender` is selected from `information`, which an earlier cell's
# output shows as map<string,double>; BooleanType here may reject non-null
# values during schema verification -- confirm against the data.
_schema = StructType([
    StructField('user_id', LongType()),
    StructField('birthday', DoubleType()),
    StructField('gender', BooleanType()),
    StructField('article_partial', MapType(StringType(), DoubleType()))
])

# Rebinds user_profile_hbase once more: from the RDD of tuples to a typed DataFrame.
user_profile_hbase = ctr.spark.createDataFrame(user_profile_hbase, schema=_schema)

In [None]:
user_profile_hbase.show()

In [None]:
# Left-join the click log with the user profile on user_id, then drop the
# columns that are not used as features here (birthday, channel_id, gender).
train = news_article_basic.join(user_profile_hbase, on=['user_id'], how='left').drop('birthday').drop('channel_id').drop('gender')



In [None]:
train.show()

In [None]:
# Bring in the article vectors from the `article` database; per the original
# note this table also carries the article's actual channel ID (TODO confirm
# the column list, the query is `select *`).
ctr.spark.sql('use article')
article_vector = ctr.spark.sql("select * from article_vector")

In [None]:
train_user_article = train.join(article_vector, on=['article_id'], how='left')

In [None]:
train_user_article.show()

In [None]:
# 读取文章画像
article_profile = ctr.spark.sql("select article_id, keywords from article_profile")

def get_article_weights(row):
    """Return ``(article_id, weights)`` with exactly 10 keyword weights.

    The values of ``row.keywords`` are sorted ascending and the first 10 are
    kept. When fewer than 10 are available (or ``keywords`` cannot be read at
    all) the list is right-padded with ``0.0`` so that every article yields a
    vector of the same length.

    Args:
        row: object exposing ``article_id`` and ``keywords`` (a mapping of
            keyword to numeric weight, or ``None``).

    Returns:
        tuple: ``(article_id, list_of_10_floats)``.
    """
    top_k = 10

    try:
        weights = sorted(row.keywords.values())[:top_k]
    except Exception:
        # Missing or unsortable keywords: treated as "no weights".
        weights = []

    # Pad so downstream dense vectors all share one size.
    weights = weights + [0.0] * (top_k - len(weights))

    return row.article_id, weights

article_profile = article_profile.rdd.map(get_article_weights).toDF(['article_id', 'article_weights'])

In [None]:
# 合并文章权重与样本
train_user_article = train_user_article.join(article_profile, on=['article_id'], how='left')

In [None]:
train_user_article.show()

In [None]:
# 保留了用户的每个频道的关键词权重，找到用户对应操作文章的所属频道的关键词权重
train_user_article = train_user_article.dropna()

In [None]:
train_user_article.show()

In [None]:
train_user_article

In [None]:
columns = ['article_id', 'user_id', 'channel_id', 'articlevector', 'user_weights', 'article_weights', 'clicked']
def get_user_weights(row):
    """Convert one joined sample row into the tuple used to build ``train_vector``.

    The user's weights for the sample's channel are the values of
    ``row.article_partial`` whose key starts with ``"<channel_id>:"``, sorted
    ascending and cut to 10. The list is right-padded with ``0.0`` to exactly
    10 entries (also when nothing matches or the map cannot be read), so the
    resulting dense vector has the same size for every row.

    Args:
        row: object exposing ``article_id``, ``user_id``, ``channel_id``,
            ``articlevector``, ``article_partial``, ``article_weights`` and
            ``clicked``.

    Returns:
        tuple: ``(article_id, user_id, channel_id, articlevector, user_weights,
        article_weights, clicked)`` with the three vector fields wrapped by
        ``Vectors.dense`` and ``clicked`` cast to ``int``.
    """
    # Function-local import, as in the original cell.
    from pyspark.ml.linalg import Vectors

    top_k = 10
    try:
        channel = str(row.channel_id)
        user_weights = sorted(weight for key, weight in row.article_partial.items()
                              if key.split(':')[0] == channel)[:top_k]
    except Exception:
        # Missing or malformed profile: treated as "no weights".
        user_weights = []
    # Pad so every sample contributes a 10-element user-weight vector.
    user_weights = user_weights + [0.0] * (top_k - len(user_weights))

    return (row.article_id, row.user_id, row.channel_id,
            Vectors.dense(row.articlevector),
            Vectors.dense(user_weights),
            Vectors.dense(row.article_weights),
            int(row.clicked))

train_vector = train_user_article.rdd.map(get_user_weights).toDF(columns)


In [None]:
# Gather the feature columns into a single `features` vector column.
# columns[2:6] is channel_id, articlevector, user_weights, article_weights.
train_res = VectorAssembler().setInputCols(columns[2:6]).setOutputCol('features').transform(train_vector)

In [None]:
train_res.show()

In [None]:
# Keep only the columns that make up a training sample to be written out.
# Note: this rebinds `train`, which an earlier cell used for the joined click log.
train = train_res.select(['article_id', 'user_id', 'clicked', 'features'])

In [None]:
arr = train.collect()

In [None]:
arr

In [None]:
#lr = LogisticRegression()
#model = lr.setLabelCol("clicked").setFeaturesCol("features").fit(train)
#model.save("hdfs://hadoop-master:9000/headlines/models/logistic_ctr_model.obj")

In [None]:
#model.save("/headlines/models/logistic_ctr_model.obj")

In [None]:
#model

In [None]:
#online_model = LogisticRegressionModel.load("hdfs://hadoop-master:9000/headlines/models/logistic_ctr_model.obj")



In [None]:
#res_transfrom = online_model.transform(test)

#res_transfrom.select(["clicked", "probability", "prediction"]).show()

In [None]:
def vector_to_double(row):
    """Return ``(label, score)`` as plain floats for one prediction row.

    ``label`` is ``row.clicked`` and ``score`` is the element at index 1 of
    ``row.probability``.
    """
    label = float(row.clicked)
    positive_score = float(row.probability[1])
    return (label, positive_score)

# NOTE(review): in this notebook `res_transfrom` is only assigned inside a
# commented-out cell, so this line raises NameError on a fresh kernel unless
# that cell is restored.
score_label = res_transfrom.select(["clicked", "probability"]).rdd.map(vector_to_double)

In [None]:
score_label

In [None]:
model.summary.roc.show()

In [None]:
import matplotlib.pyplot as plt

# NOTE(review): the only visible assignment to `model` is in a commented-out
# cell; it has to be fitted in this session for `model.summary` to be used here.
# Pull the ROC points to the driver once (one Spark job instead of one per axis)
# and hand matplotlib plain floats rather than lists of Row objects.
roc_points = model.summary.roc.select('FPR', 'TPR').collect()
fpr = [point['FPR'] for point in roc_points]
tpr = [point['TPR'] for point in roc_points]

plt.figure(figsize=(5,5))
# Diagonal reference line.
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(fpr, tpr)
plt.xlabel('FPR')
plt.ylabel('TPR')


In [None]:
plt.show()

In [None]:
# 处理DataFrame pandas
import pandas as pd
df = pd.DataFrame(arr)

In [None]:
df

In [None]:
import tensorflow as tf

def write_to_tfrecords(click_batch, feature_batch, path="./train_ctr_20200329.tfrecords"):
    """Write click samples built from the user/article click log to a TFRecords file.

    Each sample becomes one ``tf.train.Example`` with two features:
    ``"label"`` (int64) and ``"features"`` (the raw bytes of the feature
    vector).

    Args:
        click_batch: sized iterable of labels; each item is cast with ``int``.
        feature_batch: sized iterable of feature vectors exposing
            ``tobytes()`` (e.g. NumPy arrays), paired with ``click_batch`` by
            position.
        path: output file; defaults to the file name previously hard-coded
            in this function.

    Raises:
        ValueError: if the two batches differ in length.
    """
    if len(click_batch) != len(feature_batch):
        raise ValueError("click_batch and feature_batch must have the same length")

    # 1. Open the TFRecords writer.
    writer = tf.python_io.TFRecordWriter(path)
    try:
        # 2. Wrap every sample in an Example and append it to the file.
        #    zip() pairs items by position, so a pandas Series works whatever
        #    its index labels are.
        for click, feature in zip(click_batch, feature_batch):
            example = tf.train.Example(features=tf.train.Features(feature={
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(click)])),
                # tobytes() is the supported spelling of the deprecated tostring().
                "features": tf.train.Feature(bytes_list=tf.train.BytesList(value=[feature.tobytes()]))
            }))

            # Serialise the Example and write it out.
            writer.write(example.SerializeToString())
    finally:
        # Always release the file handle, even if a sample fails to convert.
        writer.close()

# Writing TFRecords is plain Python file I/O: no graph is executed and this
# notebook creates no queue runners, so the Session / Coordinator scaffolding
# is not needed around the call.
# Columns 2 and 3 of df hold `clicked` and `features`, following the column
# order of the select that produced `arr`.
write_to_tfrecords(df.iloc[:, 2], df.iloc[:, 3])