In [1]:
import json
import numpy as np

from pyspark.sql import SQLContext, HiveContext, SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import Word2VecModel
from pyspark.sql import Row

# import traceback
# from src.read_files import ReadFiles
# from src.tools_fuction import Tools
from pyspark.sql import functions as sf

# Spark application (job) name.
appname = "similar14i34yu"

# Master URL. A standalone cluster would instead be addressed as
# "spark://host:port", for example:
# master = "spark://XXX.XXX.XX.XX:XXXX"
master = "yarn"

# The bare string below is a no-op reference note (written in Chinese) that
# describes the standalone, mesos and yarn (cluster / client) master modes.
'''
standalone模式:spark://host:port,Spark会自己负责资源的管理调度
mesos模式:mesos://host:port
yarn模式:由于很多时候我们需要和mapreduce使用同一个集群，所以都采用Yarn来管理资源调度，这也是生产环境大多采用yarn模式的原因。yarn模式又分为yarn cluster模式和yarn client模式：
yarn cluster: 这个就是生产环境常用的模式，所有的资源调度和计算都在集群环境上运行。
yarn client: 这个是说Spark Driver和ApplicationMaster进程均在本机运行，而计算任务在cluster上。
'''

# IP address of the local host; passed to Spark as "spark.driver.host".
spark_driver_host = "10.126.107.75"

# Configuration object later handed to SparkSession.builder.
conf = (
    SparkConf()
    .setAppName(appname)
    .setMaster(master)
    .set("spark.driver.host", spark_driver_host)
)



In [2]:
class SparkDriveThings():
    """Create the Spark entry points and load the three training CSV tables.

    Attributes set by ``__init__``:
        spark_session: Hive-enabled ``SparkSession`` built from ``conf``.
        sc: the ``SparkContext`` of that session.
        sqlContext: ``SQLContext`` wrapping ``sc``; used to read the CSV files.
        userDataDf: DataFrame loaded from ``<data_dir>/user.csv``.
        adDataDf: DataFrame loaded from ``<data_dir>/ad.csv``.
        clkDataDf: DataFrame loaded from ``<data_dir>/click_log.csv``.
    """

    # Directory the CSV files are read from when no ``data_dir`` is supplied.
    DEFAULT_DATA_DIR = "/home/hdp_lbg_ectech/resultdata/strategy/ads/linJQ_test/tencent_ad_compete/train_data"

    def __init__(self, conf, data_dir=DEFAULT_DATA_DIR):
        """Start (or reuse) a Spark session and load the input tables.

        Args:
            conf: ``SparkConf`` passed to ``SparkSession.builder.config``.
            data_dir: directory containing ``user.csv``, ``ad.csv`` and
                ``click_log.csv``. ``None`` falls back to ``DEFAULT_DATA_DIR``.
        """
        if data_dir is None:
            data_dir = self.DEFAULT_DATA_DIR

        self.spark_session = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
        self.sc = self.spark_session.sparkContext
        self.sqlContext = SQLContext(self.sc)

        # NOTE(review): the output printed further down in this notebook shows
        # the literal text "\N" inside field values, so that marker is not being
        # read as null here -- confirm whether a `nullValue` option is wanted.
        self.userDataDf = self._read_csv(data_dir, "user.csv")
        self.adDataDf = self._read_csv(data_dir, "ad.csv")
        self.clkDataDf = self._read_csv(data_dir, "click_log.csv")

    def _read_csv(self, data_dir, filename):
        """Load ``<data_dir>/<filename>`` as a DataFrame with header and schema inference."""
        # Strip a trailing slash so "dir" and "dir/" resolve to the same path.
        path = "{}/{}".format(data_dir.rstrip("/"), filename)
        return (
            self.sqlContext.read.format("com.databricks.spark.csv")
            .options(header="true", inferschema="true")
            .load(path)
        )


In [3]:
# Build the Spark session and load the input tables.
sdt = SparkDriveThings(conf)
# Show the application id through the public SparkContext property
# rather than reaching through the private `_jsc` JVM handle.
sdt.sc.applicationId

'application_1591776234013_2292347'

In [13]:

# Preview the first rows of the click log. DataFrame.show() prints the table
# and returns None, so its result is not bound to a variable.
sdt.clkDataDf.show(5)

+----+-------+-----------+-----------+
|time|user_id|creative_id|click_times|
+----+-------+-----------+-----------+
|   9|  30920|     567330|          1|
|  65|  30920|    3072255|          1|
|  56|  30920|    2361327|          1|
|   6| 309204|     325532|          1|
|  59| 309204|    2746730|          1|
+----+-------+-----------+-----------+
only showing top 5 rows



In [4]:
# Left-join the click log to the user table on `user_id`.
# Rows containing any null are removed from both inputs before the join.
clk_user_infoDf = sdt.clkDataDf.dropna().join(
    sdt.userDataDf.dropna(),
    on='user_id',
    how='left',
)

In [5]:
# Left-join the click/user frame to the ad table on `creative_id`.
# Rows containing any null are removed from the ad table before the join.
clk_user_ad_infoDf = clk_user_infoDf.join(
    sdt.adDataDf.dropna(),
    on='creative_id',
    how='left',
)

In [6]:
# Add `clk_info`: the four listed columns joined into one comma-separated string.
clk_user_ad_infoConcatDf = clk_user_ad_infoDf.withColumn(
    'clk_info',
    sf.concat_ws(
        ',',
        *['product_id', 'product_category', 'industry', 'click_times']
    ),
)

In [None]:
# Preview the first five rows, including the new `clk_info` column.
clk_user_ad_infoConcatDf.show(n=5)

In [11]:
# One row per (user_id, gender, age): all of that group's `clk_info` strings
# joined with '_' into a single column.
# The aggregate is aliased directly instead of being looked up afterwards under
# Spark's auto-generated name 'collect_list(clk_info)', which also removes the
# need for the follow-up withColumn/select pair.
# NOTE: collect_list gives no ordering guarantee, so the order of the entries
# inside each joined string can differ between runs.
res = clk_user_ad_infoConcatDf.groupBy('user_id', 'gender', 'age').agg(
    sf.concat_ws('_', sf.collect_list('clk_info'))
    .alias('productId_productCategory_industry_clickTimes')
)


In [12]:
# Preview the first five aggregated rows.
res.show(n=5)

+-------+------+---+---------------------------------------------+
|user_id|gender|age|productId_productCategory_industry_clickTimes|
+-------+------+---+---------------------------------------------+
|      1|     1|  4|                         \N,5,106,1_\N,5,7...|
|      4|     1|  5|                         39422,17,\N,1_206...|
|     13|     2|  6|                         1305,2,319,1_1862...|
|     20|     1|  4|                         129,2,6,1_1036,2,...|
|     21|     2|  6|                         \N,18,24,1_39519,...|
+-------+------+---+---------------------------------------------+
only showing top 5 rows



In [13]:
# Write the per-user result as tab-separated files with a header row,
# replacing anything already present at the target path.
(
    res.write
    .mode("overwrite")
    .options(header="true")
    .csv(
        "/home/hdp_lbg_ectech/resultdata/strategy/ads/linJQ_test/tencent_ad_compete/clk_user_ad_info",
        sep="\t",
    )
)
