## Analyze and Preprocess the raw_sample Dataset

From the analysis, we conclude that the prediction target is whether an advertisement is clicked (**clk** vs. **nonclk**).  
Among the available fields, only **ad placement (pid)** serves as a useful feature for predicting ad clicks.  

We then divide the dataset into **training set (first 7 days)** and **test set (the 8th day)** based on the timestamp.

In [1]:
import os
# Configure the Python interpreter used by PySpark and the Spark driver at runtime
JAVA_HOME = '/root/bigdata/jdk'
PYSPARK_PYTHON = '/miniconda2/envs/py365/bin/python'
# When several Python versions are installed, leaving these unset is likely to cause errors
os.environ['PYSPARK_PYTHON'] = PYSPARK_PYTHON
os.environ['PYSPARK_DRIVER_PYTHON'] = PYSPARK_PYTHON
os.environ['JAVA_HOME'] = JAVA_HOME
# Configure Spark
from pyspark import SparkConf
from pyspark.sql import SparkSession

SPARK_APP_NAME = 'preprocessingRawSample'
SPARK_URL = 'spark://192.168.58.100:7077'

conf = SparkConf()    # create the Spark config object
config = (
    ("spark.app.name", SPARK_APP_NAME),    # application name; Spark generates a random one if none is given
    ("spark.executor.memory", "2g"),    # memory per executor for this app, default 1g
    ("spark.master", SPARK_URL),    # address of the Spark master
    ("spark.executor.cores", "2"),    # number of CPU cores each executor uses
    # The next three settings control the number of executors
    # ("spark.dynamicAllocation.enabled", True),
    # ("spark.dynamicAllocation.initialExecutors", 1),    # one executor
    # ("spark.shuffle.service.enabled", True)
    # ('spark.sql.pivotMaxValues', '99999'),  # raise this when pivoting a DataFrame with many distinct values; default is 10000
)
# Full list of settings and their descriptions: https://spark.apache.org/docs/latest/configuration.html

conf.setAll(config)

# Create the Spark session from the config object
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
df = spark.read.csv("/data/raw_sample.csv", header = True)
df.show()
df.printSchema()

+------+----------+----------+-----------+------+---+
|  user|time_stamp|adgroup_id|        pid|nonclk|clk|
+------+----------+----------+-----------+------+---+
|581738|1494137644|         1|430548_1007|     1|  0|
|449818|1494638778|         3|430548_1007|     1|  0|
|914836|1494650879|         4|430548_1007|     1|  0|
|914836|1494651029|         5|430548_1007|     1|  0|
|399907|1494302958|         8|430548_1007|     1|  0|
|628137|1494524935|         9|430548_1007|     1|  0|
|298139|1494462593|         9|430539_1007|     1|  0|
|775475|1494561036|         9|430548_1007|     1|  0|
|555266|1494307136|        11|430539_1007|     1|  0|
|117840|1494036743|        11|430548_1007|     1|  0|
|739815|1494115387|        11|430539_1007|     1|  0|
|623911|1494625301|        11|430548_1007|     1|  0|
|623911|1494451608|        11|430548_1007|     1|  0|
|421590|1494034144|        11|430548_1007|     1|  0|
|976358|1494156949|        13|430548_1007|     1|  0|
|286630|1494218579|        1

#### Analyze Field Types and Formats in the Dataset  
1. Check for missing values  
2. Check the data type of each column  
3. Check the category distribution of each column  

In [6]:
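print("Total rows in the sample dataset:", df.count())
# about 26 million
print("Number of distinct users:", df.groupBy("user").count().count())
# about 1.14 million, slightly more than the number of users in the log data
print("Number of distinct adgroup_id values:", df.groupBy("adgroup_id").count().count())
# about 850,000
print("Ad placement (pid) distribution:", df.groupBy("pid").count().collect())
# only two ad placements, split roughly 6:4
print("Click (clk) distribution:", df.groupBy("clk").count().collect())
# clicks to non-clicks: roughly 1:18

Total rows in the sample dataset: 26557961
Number of distinct users: 1141729
Number of distinct adgroup_id values: 846811
Ad placement (pid) distribution: [Row(pid='430548_1007', count=16472898), Row(pid='430539_1007', count=10085063)]
Click (clk) distribution: [Row(clk='0', count=25191905), Row(clk='1', count=1366056)]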
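
The class imbalance and the placement split can be read directly off the counts printed above. The sketch below only redoes that arithmetic in plain Python; the variable names are illustrative and do not appear in the notebook.

```python
# Counts copied from the output above.
total = 26557961
clicked = 1366056
not_clicked = 25191905
pid_counts = {"430548_1007": 16472898, "430539_1007": 10085063}

# Sanity check: both groupings should cover every row exactly once.
assert clicked + not_clicked == total
assert sum(pid_counts.values()) == total

ctr = clicked / total                  # overall click-through rate
neg_per_pos = not_clicked / clicked    # non-clicks per click
pid_share = {pid: n / total for pid, n in pid_counts.items()}

print(f"CTR: {ctr:.2%}")
print(f"non-clicks per click: {neg_per_pos:.1f}")
for pid, share in pid_share.items():
    print(f"{pid}: {share:.1%}")
```

A click-through rate of roughly 5% means that accuracy alone would be a misleading metric for any model trained on this data, since always predicting "no click" is already right about 95% of the time.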


#### Use `dataframe.withColumn` to Modify Column Data Types; Use `dataframe.withColumnRenamed` to Rename Columns

In [4]:
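# Change the table schema: cast each column to the appropriate data type
from pyspark.sql.types import IntegerType, LongType, StringType

# Print the schema of df (every column is read from the CSV as a string)
df.printSchema()
# Change the schema of df: cast column types and rename columns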
raw_sample_df = df.\
    withColumn("user", df.user.cast(IntegerType())).withColumnRenamed("user", "userId").\
    withColumn("time_stamp", df.time_stamp.cast(LongType())).withColumnRenamed("time_stamp", "timestamp").\
    withColumn("adgroup_id", df.adgroup_id.cast(IntegerType())).withColumnRenamed("adgroup_id", "adgroupId").\
    withColumn("pid", df.pid.cast(StringType())).\
    withColumn("nonclk", df.nonclk.cast(IntegerType())).\
    withColumn("clk", df.clk.cast(IntegerType()))
raw_sample_df.printSchema()
raw_sample_df.show()

root
 |-- user: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- adgroup_id: string (nullable = true)
 |-- pid: string (nullable = true)
 |-- nonclk: string (nullable = true)
 |-- clk: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- adgroupId: integer (nullable = true)
 |-- pid: string (nullable = true)
 |-- nonclk: integer (nullable = true)
 |-- clk: integer (nullable = true)

+------+----------+---------+-----------+------+---+
|userId| timestamp|adgroupId|        pid|nonclk|clk|
+------+----------+---------+-----------+------+---+
|581738|1494137644|        1|430548_1007|     1|  0|
|449818|1494638778|        3|430548_1007|     1|  0|
|914836|1494650879|        4|430548_1007|     1|  0|
|914836|1494651029|        5|430548_1007|     1|  0|
|399907|1494302958|        8|430548_1007|     1|  0|
|628137|1494524935|        9|430548_1007|     1|  0|
|298139|1494462593|        9|430539_1007|     1|  0|


### Feature Selection  

Feature selection means choosing reliable features and removing redundant ones.  
- For **search ads**, the matching degree between the query keywords and the ad is very important.  
- For **display ads**, the historical performance of the ad itself is usually the most important feature.  

Based on experience, in this dataset only the **ad placement (pid)** is relatively important, with a distribution ratio of about 6:4 across different values. Therefore, `pid` can be used as a key feature.  

`nonclk` and `clk` serve as the target labels here and are **not** used as features.

### One-Hot Encoding  

One-hot encoding is a classic encoding method that uses N binary bits (0/1) to represent N possible states. Each state has its own dedicated bit, and at any time only one bit is active.  

Suppose we have three feature groups: gender, city, and device:  

- ["Male", "Female"] → [0,1]  
- ["Beijing", "Shanghai", "Guangzhou"] → [0,1,2]  
- ["Apple", "Xiaomi", "Huawei", "Microsoft"] → [0,1,2,3]  

**Traditional mapping (enumeration):**  
Each feature group is assigned values starting from 0.  
- ["Male", "Shanghai", "Xiaomi"] → [0,1,1]  
- ["Female", "Beijing", "Apple"] → [1,0,0]  

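These integer codes are arbitrary: they suggest an order and a distance between categories (for example, "Guangzhou" = 2 being "twice" "Shanghai" = 1) that does not exist, which makes them poorly suited to most classifiers.  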

**After one-hot encoding:**  
The data becomes sparse but more suitable for classifiers:  
- ["Male", "Shanghai", "Xiaomi"] → [1,0,0,1,0,0,1,0,0]  
- ["Female", "Beijing", "Apple"] → [0,1,1,0,0,1,0,0,0]  

**This approach preserves the diversity of features, but note:** if the data becomes too sparse (few samples and very high dimensions), performance may actually degrade.  
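
The mapping above can be reproduced with a few lines of plain Python. This is a minimal sketch for illustration only, not the Spark implementation used later; `VOCABS`, `one_hot`, and `encode` are names made up for this example.

```python
# Vocabulary of each feature group, in the order used in the text above.
VOCABS = [
    ["Male", "Female"],
    ["Beijing", "Shanghai", "Guangzhou"],
    ["Apple", "Xiaomi", "Huawei", "Microsoft"],
]


def one_hot(value, vocab):
    """Return a list with a single 1 at the position of `value` in `vocab`."""
    index = vocab.index(value)  # raises ValueError for values outside the vocabulary
    return [1 if i == index else 0 for i in range(len(vocab))]


def encode(sample):
    """Concatenate the one-hot vectors of all feature groups."""
    encoded = []
    for value, vocab in zip(sample, VOCABS):
        encoded.extend(one_hot(value, vocab))
    return encoded


print(encode(["Male", "Shanghai", "Xiaomi"]))
print(encode(["Female", "Beijing", "Apple"]))
```

The encoded length is the sum of the vocabulary sizes (2 + 3 + 4 = 9), which is why high-cardinality features quickly produce very wide, sparse vectors.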

#### One-Hot Encoding in Spark  

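Note: Spark's `OneHotEncoder` operates on numeric category-index columns, not on strings. A string column such as `pid` must therefore be converted to indices with `StringIndexer` first.  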

- [StringIndexer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=stringindexer#pyspark.ml.feature.StringIndexer): Maps the values of a string column to numeric indices, e.g., converting gender values "Male" and "Female" into 0 and 1. By default the most frequent value receives index 0.  
- [OneHotEncoder](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=onehotencoder#pyspark.ml.feature.OneHotEncoder): Performs one-hot encoding on feature columns; usually used together with `StringIndexer`.  
- [Pipeline](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=pipeline#pyspark.ml.Pipeline): Ensures sequential processing of data, passing the result of one step as the input to the next.  
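
To make the two stages concrete, here is a pure-Python imitation of what they compute. It is a sketch under stated assumptions, not Spark code: the function names are hypothetical, the output is a dense list rather than a sparse vector, and the tie-breaking rule (alphabetical) is a choice made for this example. The frequency-based ordering mirrors the default behavior of `StringIndexer`.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken by label (a choice made for this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_from_index(index, size, drop_last=False):
    """Dense counterpart of the encoder output for one category index."""
    length = size - 1 if drop_last else size
    vec = [0.0] * length
    if int(index) < length:
        vec[int(index)] = 1.0
    return vec


# Toy column that mimics the roughly 6:4 split of the two pid values.
pids = ["430548_1007"] * 6 + ["430539_1007"] * 4
mapping = fit_string_index(pids)
print(mapping)
print(one_hot_from_index(mapping["430539_1007"], len(mapping)))
print(one_hot_from_index(mapping["430539_1007"], len(mapping), drop_last=True))
```

With `drop_last=True` the last category is represented by an all-zero vector, which removes one redundant dimension at the cost of making that category implicit.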

In [5]:
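'''Feature processing'''
'''
pid: ad placement. This is a categorical feature with only two values, so one-hot encoding is sufficient.
It yields two features: "shown in placement 1" and "shown in placement 2".
'''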
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# StringIndexer maps the values of a string column to numeric indices, e.g. "Male"/"Female" to 0 and 1
stringindexer = StringIndexer(inputCol='pid', outputCol='pid_feature')
# OneHotEncoder one-hot encodes an index column; it is usually combined with StringIndexer
# dropLast=False: the two pid values are encoded as (2,[0],[1.0]) and (2,[1],[1.0]), i.e. [1,0] and [0,1]
# dropLast=True: the two pid values are encoded as (1,[0],[1.0]) and (1,[],[]), i.e. [1] and [0]
encoder = OneHotEncoder(dropLast=False, inputCol='pid_feature', outputCol='pid_value')
pipeline = Pipeline(stages=[stringindexer, encoder])
pipeline_model = pipeline.fit(raw_sample_df)
new_df = pipeline_model.transform(raw_sample_df)  # the resulting pid_value column holds sparse vectors
new_df

DataFrame[userId: int, timestamp: bigint, adgroupId: int, pid: string, nonclk: int, clk: int, pid_feature: double, pid_value: vector]

In [6]:
new_df.show()

+------+----------+---------+-----------+------+---+-----------+-------------+
|userId| timestamp|adgroupId|        pid|nonclk|clk|pid_feature|    pid_value|
+------+----------+---------+-----------+------+---+-----------+-------------+
|581738|1494137644|        1|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|449818|1494638778|        3|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|914836|1494650879|        4|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|914836|1494651029|        5|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|399907|1494302958|        8|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|628137|1494524935|        9|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|298139|1494462593|        9|430539_1007|     1|  0|        1.0|(2,[1],[1.0])|
|775475|1494561036|        9|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|555266|1494307136|       11|430539_1007|     1|  0|        1.0|(2,[1],[1.0])|
|117840|1494036743|       11|430548_1007|     1|  0|

#### The Returned Field `pid_value` Is a Sparse Vector Type

#### [pyspark.ml.linalg.SparseVector](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=sparse#pyspark.ml.linalg.SparseVector)

In [None]:
'''=====Further study: sparse vectors in Spark=====
from pyspark.ml.linalg import SparseVector
print(SparseVector(4,[1,3],[3.0,4.0]))
print(SparseVector(4,[2,3],[3.0,4.0]).toArray())
print('*********')
print(new_df.select('pid_value').first())
print(new_df.select('pid_value').first().pid_value.toArray())
'''
'''
(4,[1,3],[3.0,4.0])
[0. 0. 3. 4.]
*********
Row(pid_value=SparseVector(2, {0: 1.0}))
[1. 0.]
'''
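
A sparse vector stores only three things: its total size, the indices of the nonzero entries, and the values at those indices. The plain-Python sketch below illustrates that representation without requiring Spark; `sparse_to_dense` and `dense_to_sparse` are hypothetical helper names, not part of `pyspark.ml.linalg`.

```python
def sparse_to_dense(size, indices, values):
    """Expand (size, indices, values) into a plain list of floats."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense


def dense_to_sparse(dense):
    """Inverse direction: keep only the positions holding nonzero values."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


print(sparse_to_dense(4, [2, 3], [3.0, 4.0]))
print(sparse_to_dense(2, [0], [1.0]))
print(dense_to_sparse([0.0, 1.0]))
```

Read this way, the `pid_value` entry `(2,[0],[1.0])` is the dense vector `[1.0, 0.0]` and `(2,[1],[1.0])` is `[0.0, 1.0]`.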

#### Split the Dataset into Training and Test Sets

In [8]:
new_df.sort("timestamp", ascending=False).show()

+------+----------+---------+-----------+------+---+-----------+-------------+
|userId| timestamp|adgroupId|        pid|nonclk|clk|pid_feature|    pid_value|
+------+----------+---------+-----------+------+---+-----------+-------------+
|243671|1494691186|   600195|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|177002|1494691186|   593001|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|488527|1494691184|   687854|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
| 17054|1494691184|   742741|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|488527|1494691184|   431082|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
| 17054|1494691184|   756665|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|488527|1494691184|   494312|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|839493|1494691183|   582235|430548_1007|     1|  0|        0.0|(2,[0],[1.0])|
|704223|1494691183|   624504|430539_1007|     1|  0|        1.0|(2,[1],[1.0])|
|839493|1494691183|   561681|430548_1007|     1|  0|

In [9]:
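# The sample dataset covers 8 days in total:
# the first seven days are training data and the last day is test data

from datetime import datetime
# 1494691186 is the latest timestamp in the dataset (see the sorted output above)
datetime.fromtimestamp(1494691186)
# Note: fromtimestamp() converts to the local time zone of the machine running the notebook
print("Records up to this time are training samples; later records are test samples:", datetime.fromtimestamp(1494691186-24*60*60))

Records up to this time are training samples; later records are test samples: 2017-05-12 23:59:46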
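
Because `datetime.fromtimestamp` without a `tz` argument uses the local time zone of the machine, the printed cutoff depends on where the notebook runs. The output above is consistent with UTC+8. The following sketch makes that time zone explicit; treating the logs as China Standard Time is an assumption inferred from the output, and the constant names are illustrative.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the log timestamps are meant to be read in China Standard Time (UTC+8).
CST = timezone(timedelta(hours=8))

LAST_TIMESTAMP = 1494691186             # largest timestamp in the sorted output
CUTOFF = LAST_TIMESTAMP - 24 * 60 * 60  # exactly 24 hours before the last record

last_seen = datetime.fromtimestamp(LAST_TIMESTAMP, tz=CST)
cutoff_time = datetime.fromtimestamp(CUTOFF, tz=CST)

print("last record:", last_seen.isoformat())
print("cutoff:     ", cutoff_time.isoformat())
```

Passing `tz` explicitly gives the same result on any machine, which matters when the notebook is rerun on a cluster configured for UTC.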


In [10]:
# `where` is an alias for :func:`filter`.
# Training samples:
train_sample = raw_sample_df.where(raw_sample_df.timestamp<=(1494691186-24*60*60))
print("Number of training samples:")
print(train_sample.count())
# Test samples:
test_sample = raw_sample_df.filter(raw_sample_df.timestamp>(1494691186-24*60*60))
print("Number of test samples:")
print(test_sample.count())

# Note: ad features and user features still have to be added to form a complete sample dataset

Number of training samples:
23249291
Number of test samples:
3308670
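
The split rule itself is a single threshold comparison. As a quick check that does not need a cluster, the sketch below applies the same rule to a handful of rows taken from the tables shown earlier and verifies that the reported counts add up to the full dataset. `split_by_time` is a hypothetical helper written for this example.

```python
CUTOFF = 1494691186 - 24 * 60 * 60


def split_by_time(rows, cutoff=CUTOFF):
    """Rows at or before `cutoff` go to train, later rows go to test."""
    train = [r for r in rows if r["timestamp"] <= cutoff]
    test = [r for r in rows if r["timestamp"] > cutoff]
    return train, test


# A few (userId, timestamp) pairs taken from the tables shown earlier.
rows = [
    {"userId": 581738, "timestamp": 1494137644},
    {"userId": 449818, "timestamp": 1494638778},
    {"userId": 914836, "timestamp": 1494650879},
    {"userId": 243671, "timestamp": 1494691186},
]
train, test = split_by_time(rows)
print(len(train), len(test))

# The reported counts should add up to the full dataset.
train_count, test_count, total = 23249291, 3308670, 26557961
assert train_count + test_count == total
print(f"test share: {test_count / total:.1%}")
```

The test share comes out at about one eighth of the data, which matches holding out one day of eight.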


In [None]:
'''
1. user: anonymized user ID
2. adgroup_id: anonymized ad unit ID
3. time_stamp: timestamp
4. pid: ad placement ID
5. nonclk: 1 = not clicked; 0 = clicked
6. clk: 0 = not clicked; 1 = clicked
'''
# Only the ad placement (pid) is useful for predicting whether an ad is clicked.
# Note: To build a complete sample dataset, we still need to include 
#       ad basic features and user basic features.