# 任务1：将原始数据存入数据仓库的ODS层

### 开始前通过start-all.cmd启动Hadoop集群，通过hive.cmd打开Hive

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [2]:
spark.sql('drop database if exists ODS cascade;')

DataFrame[]

In [3]:
#创建ODS数据层
spark.sql('create database if not exists ODS;')
spark.sql('show databases;').show()

+---------+
|namespace|
+---------+
|  default|
|      dwd|
|      dws|
|      ods|
+---------+



In [4]:
media_index = spark.read.format('csv').option('inferSchema','true').option('header','true').option('sep',';').load('D:\DataMiningProject\Project-01\data\media_index.csv')
mediamatch_userevent = spark.read.format('csv').option('inferSchema','true').option('header','true').option('sep',';').load('D:\DataMiningProject\Project-01\data\mediamatch_userevent.csv')
mediamatch_usermsg = spark.read.format('csv').option('inferSchema','true').option('header','true').option('sep',';').load('D:\DataMiningProject\Project-01\data\mediamatch_usermsg.csv')
mmconsume_billevents = spark.read.format('csv').option('inferSchema','true').option('header','true').option('sep',';').load('D:\DataMiningProject\Project-01\data\mmconsume_billevents.csv')
order_index = spark.read.format('csv').option('inferSchema','true').option('header','true').option('sep',';').load('D:\DataMiningProject\Project-01\data\order_index.csv')
media_index

DataFrame[terminal_no: bigint, phone_no: int, duration: int, station_name: string, origin_time: string, end_time: string, owner_code: string, owner_name: string, vod_cat_tags: string, resolution: string, audio_lang: string, region: string, res_name: string, res_type: int, vod_title: string, category_name: string, program_title: string, sm_name: string]

In [5]:
media_index.write.mode('overwrite').saveAsTable('ODS.media_index')
mediamatch_userevent.write.mode('overwrite').saveAsTable('ODS.mediamatch_userevent')
mediamatch_usermsg.write.mode('overwrite').saveAsTable('ODS.mediamatch_usermsg')
mmconsume_billevents.write.mode('overwrite').saveAsTable('ODS.mmconsume_billevents')
order_index.write.mode('overwrite').saveAsTable('ODS.order_index')

In [6]:
#使用sql脚本在database内创建table
spark.sql('use ODS;')
spark.sql('show tables;').show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|      ods|         media_index|      false|
|      ods|mediamatch_userevent|      false|
|      ods|  mediamatch_usermsg|      false|
|      ods|mmconsume_billevents|      false|
|      ods|         order_index|      false|
+---------+--------------------+-----------+



In [7]:
#查看各table数据总量
spark.sql('select count(*) from media_index;').show()
spark.sql('select count(*) from mediamatch_userevent;').show()
spark.sql('select count(*) from mediamatch_usermsg;').show()
spark.sql('select count(*) from mmconsume_billevents;').show()
spark.sql('select count(*) from order_index;').show()

#读取mediamatch_usermsg前5条数据
spark.sql('select * from media_index').show(5,False)

+--------+
|count(1)|
+--------+
|  119229|
+--------+

+--------+
|count(1)|
+--------+
|     447|
+--------+

+--------+
|count(1)|
+--------+
|   14481|
+--------+

+--------+
|count(1)|
+--------+
|    5368|
+--------+

+--------+
|count(1)|
+--------+
|    9938|
+--------+

+-----------+--------+--------+------------+---------------+---------------+----------+----------+------------+----------+----------+------+--------+--------+---------+-------------+-------------+--------+
|terminal_no|phone_no|duration|station_name|origin_time    |end_time       |owner_code|owner_name|vod_cat_tags|resolution|audio_lang|region|res_name|res_type|vod_title|category_name|program_title|sm_name |
+-----------+--------+--------+------------+---------------+---------------+----------+----------+------------+----------+----------+------+--------+--------+---------+-------------+-------------+--------+
|1900029957 |2020273 |1553000 |中央2台-高清|2018/5/17 7:59 |2018/5/17 8:25 |0         |HC级      |NULL      

# 任务2 对数据进行清洗并将结果储存至DWD层

In [8]:
#读取ODS数据
media_index = spark.read.table('ODS.media_index')
mediamatch_userevent = spark.read.table('ODS.mediamatch_userevent')
mediamatch_usermsg = spark.read.table('ODS.mediamatch_usermsg')
mmconsume_billevents = spark.read.table('ODS.mmconsume_billevents')
order_index = spark.read.table('ODS.order_index')

## 2.1 筛选主要研究对象

In [9]:
#查看‘owner_name’和‘owner_code’字段
media_index.groupBy('owner_name').count().show()
media_index.groupBy('owner_code').count().show()
mediamatch_userevent.groupBy('owner_name').count().show()
mediamatch_userevent.groupBy('owner_code').count().show()
mediamatch_usermsg.groupBy('owner_name').count().show()
mediamatch_usermsg.groupBy('owner_code').count().show()
mmconsume_billevents.groupBy('owner_name').count().show()
mmconsume_billevents.groupBy('owner_code').count().show()
order_index.groupBy('owner_name').count().show()
order_index.groupBy('owner_code').count().show()

+----------+------+
|owner_name| count|
+----------+------+
|      HC级|109971|
|      EE级|   455|
|      HE级|  8803|
+----------+------+

+----------+------+
|owner_code| count|
+----------+------+
|         0|111350|
|      NULL|  7879|
+----------+------+

+----------+-----+
|owner_name|count|
+----------+-----+
|      EA级|    1|
|      HC级|  418|
|      EE级|   28|
+----------+-----+

+----------+-----+
|owner_code|count|
+----------+-----+
|         0|  438|
|         5|    1|
|         2|    1|
|      NULL|    7|
+----------+-----+

+----------+-----+
|owner_name|count|
+----------+-----+
|      EA级|   15|
|      HC级|13592|
|      HA级|   11|
|      EE级|  840|
|      HB级|    7|
|      EB级|   16|
+----------+-----+

+----------+-----+
|owner_code|count|
+----------+-----+
|      null|  288|
|         6|    8|
|         5|   48|
|        15|   10|
|         8|    1|
|         7|    3|
|         2|   18|
|         0|14105|
+----------+-----+

+----------+-----+
|owner_name|count|
+----

#### 需要过滤的数据如下：
#### owner_name='EA','EB',"EC".'ED','EE';
#### owner_code=02,09,10
#### 用table_name1表示滤除操作的结果，注意研究对象需要去重

In [10]:
#根据要求对数据进行筛选
media_index1 = media_index.distinct().filter("owner_name != 'EA级' and owner_name != 'EB级' and owner_name != 'EC级' and owner_name != 'ED级' and owner_name != 'EE级' and owner_code != '2' and owner_code != '9' and owner_code != '10' ")
mediamatch_userevent1 = mediamatch_userevent.distinct().filter("owner_name != 'EA级' and owner_name != 'EB级' and owner_name != 'EC级' and owner_name != 'ED级' and owner_name != 'EE级' and owner_code != '2' and owner_code != '9' and owner_code != '10' ")
mediamatch_usermsg1 = mediamatch_usermsg.distinct().filter("owner_name != 'EA级' and owner_name != 'EB级' and owner_name != 'EC级' and owner_name != 'ED级' and owner_name != 'EE级' and owner_code != '2' and owner_code != '9' and owner_code != '10' ")
mmconsume_billevents1 = mmconsume_billevents.distinct().filter("owner_name != 'EA级' and owner_name != 'EB级' and owner_name != 'EC级' and owner_name != 'ED级' and owner_name != 'EE级' and owner_code != '2' and owner_code != '9' and owner_code != '10' ")
order_index1 = order_index.distinct().filter("owner_name != 'EA级' and owner_name != 'EB级' and owner_name != 'EC级' and owner_name != 'ED级' and owner_name != 'EE级' and owner_code != '2' and owner_code != '9' and owner_code != '10' ")

In [11]:
#查看筛选后的结果
media_index1.groupBy('owner_name').count().show()
media_index1.groupBy('owner_code').count().show()
mediamatch_userevent1.groupBy('owner_name').count().show()
mediamatch_userevent1.groupBy('owner_code').count().show()
mediamatch_usermsg1.groupBy('owner_name').count().show()
mediamatch_usermsg1.groupBy('owner_code').count().show()
mmconsume_billevents1.groupBy('owner_name').count().show()
mmconsume_billevents1.groupBy('owner_code').count().show()
order_index1.groupBy('owner_name').count().show()
order_index1.groupBy('owner_code').count().show()

+----------+-----+
|owner_name|count|
+----------+-----+
|      HC级|63411|
|      HE级| 4947|
+----------+-----+

+----------+-----+
|owner_code|count|
+----------+-----+
|         0|63923|
|      NULL| 4435|
+----------+-----+

+----------+-----+
|owner_name|count|
+----------+-----+
|      HC级|  408|
+----------+-----+

+----------+-----+
|owner_code|count|
+----------+-----+
|         0|  400|
|         5|    1|
|      NULL|    7|
+----------+-----+

+----------+-----+
|owner_name|count|
+----------+-----+
|      HC级|13301|
|      HA级|   11|
|      HB级|    7|
+----------+-----+

+----------+-----+
|owner_code|count|
+----------+-----+
|         6|    8|
|         5|   48|
|        15|   10|
|         8|    1|
|         7|    3|
|         0|13249|
+----------+-----+

+----------+-----+
|owner_name|count|
+----------+-----+
|      HC级| 3912|
|      HE级| 1179|
+----------+-----+

+----------+-----+
|owner_code|count|
+----------+-----+
|         0| 4073|
|         5|   38|
|         6| 

### 2.2 筛选主要业务类型

#### 在2.1筛选结果table_name1的基础上筛选sm_name
#### 只保留sm_name='互动电视','甜果电视','数字电视'.'珠江宽频'

In [12]:
media_index1.groupBy('sm_name').count().show()
mediamatch_userevent1.groupBy('sm_name').count().show()
mediamatch_usermsg1.groupBy('sm_name').count().show()
mmconsume_billevents1.groupBy('sm_name').count().show()
order_index1.groupBy('sm_name').count().show()

+--------+-----+
| sm_name|count|
+--------+-----+
|互动电视|68358|
+--------+-----+

+------------+-----+
|     sm_name|count|
+------------+-----+
|    互动电视|   69|
|模拟有线电视|  177|
|    数字电视|  103|
|    珠江宽频|   40|
|    甜果电视|   19|
+------------+-----+

+------------+-----+
|     sm_name|count|
+------------+-----+
|    互动电视| 1999|
|模拟有线电视| 5745|
|    数字电视| 3221|
|    珠江宽频| 1642|
|    甜果电视|  712|
+------------+-----+

+--------+-----+
| sm_name|count|
+--------+-----+
|互动电视| 2366|
|数字电视| 1375|
|珠江宽频|  405|
|甜果电视|  945|
+--------+-----+

+------------+-----+
|     sm_name|count|
+------------+-----+
|    互动电视| 5037|
|模拟有线电视|  743|
|    数字电视|  830|
|    珠江宽频|  927|
|    甜果电视| 1370|
|        NULL|  412|
+------------+-----+



#### 用table_name2表示筛选结果

In [13]:
#根据要求对数据进行筛选
media_index2 = media_index1.filter("sm_name == '互动电视' or sm_name == '甜果电视' or sm_name == '数字电视' or sm_name == '珠江宽频'")
mediamatch_userevent2 = mediamatch_userevent1.filter("sm_name == '互动电视' or sm_name == '甜果电视' or sm_name == '数字电视' or sm_name == '珠江宽频'")
mediamatch_usermsg2 = mediamatch_usermsg1.filter("sm_name == '互动电视' or sm_name == '甜果电视' or sm_name == '数字电视' or sm_name == '珠江宽频'")
mmconsume_billevents2 = mmconsume_billevents1.filter("sm_name == '互动电视' or sm_name == '甜果电视' or sm_name == '数字电视' or sm_name == '珠江宽频'")
order_index2 = order_index1.filter("sm_name == '互动电视' or sm_name == '甜果电视' or sm_name == '数字电视' or sm_name == '珠江宽频'")

In [14]:
media_index2.groupBy('sm_name').count().show()
mediamatch_userevent2.groupBy('sm_name').count().show()
mediamatch_usermsg2.groupBy('sm_name').count().show()
mmconsume_billevents2.groupBy('sm_name').count().show()
order_index2.groupBy('sm_name').count().show()

+--------+-----+
| sm_name|count|
+--------+-----+
|互动电视|68358|
+--------+-----+

+--------+-----+
| sm_name|count|
+--------+-----+
|互动电视|   69|
|数字电视|  103|
|珠江宽频|   40|
|甜果电视|   19|
+--------+-----+

+--------+-----+
| sm_name|count|
+--------+-----+
|互动电视| 1999|
|数字电视| 3221|
|珠江宽频| 1642|
|甜果电视|  712|
+--------+-----+

+--------+-----+
| sm_name|count|
+--------+-----+
|互动电视| 2366|
|数字电视| 1375|
|珠江宽频|  405|
|甜果电视|  945|
+--------+-----+

+--------+-----+
| sm_name|count|
+--------+-----+
|互动电视| 5037|
|数字电视|  830|
|珠江宽频|  927|
|甜果电视| 1370|
+--------+-----+



### 2.3 筛选主要状态用户

#### 查看run_name字段
#### 保留run_name=“正常”、“主动暂停”、“主动销号”、“欠费暂停”
#### 在table_name2基础上用table_name3表示
#### 只有部分table包含run_name信息

In [15]:
#media_index2.groupBy('run_name').count().show()
mediamatch_userevent2.groupBy('run_name').count().show()
mediamatch_usermsg2.groupBy('run_name').count().show()
#mmconsume_billevents2.groupBy('run_name').count().show()
order_index2.groupBy('run_name').count().show()

+--------+-----+
|run_name|count|
+--------+-----+
|欠费暂停|   32|
|    冲正|    2|
|主动暂停|   47|
|主动销户|   10|
|    正常|  140|
+--------+-----+

+--------+-----+
|run_name|count|
+--------+-----+
|欠费暂停| 1035|
|    冲正|   48|
|    创建|    1|
|主动暂停| 1567|
|主动销户|  385|
|被动销户|    1|
|    正常| 4537|
+--------+-----+

+--------+-----+
|run_name|count|
+--------+-----+
|欠费暂停|  336|
|    冲正|    4|
|主动暂停|  389|
|主动销户|   68|
|    正常| 7367|
+--------+-----+



In [16]:
mediamatch_userevent3 = mediamatch_userevent2.filter("run_name == '正常' or run_name == '主动暂停' or run_name == '主动销户' or run_name == '欠费暂停'")
mediamatch_usermsg3 = mediamatch_usermsg2.filter("run_name == '正常' or run_name == '主动暂停' or run_name == '主动销户' or run_name == '欠费暂停'")
order_index3 = order_index2.filter("run_name == '正常' or run_name == '主动暂停' or run_name == '主动销户' or run_name == '欠费暂停'")

In [17]:
mediamatch_userevent3.groupBy('run_name').count().show()
mediamatch_usermsg3.groupBy('run_name').count().show()
order_index3.groupBy('run_name').count().show()

+--------+-----+
|run_name|count|
+--------+-----+
|欠费暂停|   32|
|主动暂停|   47|
|主动销户|   10|
|    正常|  140|
+--------+-----+

+--------+-----+
|run_name|count|
+--------+-----+
|欠费暂停| 1035|
|主动暂停| 1567|
|主动销户|  385|
|    正常| 4537|
+--------+-----+

+--------+-----+
|run_name|count|
+--------+-----+
|欠费暂停|  336|
|主动暂停|  389|
|主动销户|   68|
|    正常| 7367|
+--------+-----+



### 2.4 过滤异常收视数据，查看duration字段
#### 只有media_index含有duratio字段
#### 过滤异常收视数据：观看时长大于6小时(360000)或小于10秒(10000)
#### 筛选结果media_index3表示

In [18]:
media_index2.groupBy('duration').count().show()
media_index2.select("duration").describe().show()
media_index2.select("duration").distinct().orderBy('duration',ascending=True).show()
media_index2.select("duration").distinct().orderBy('duration',ascending=False).show()
#distinct()用来去重

+--------+-----+
|duration|count|
+--------+-----+
|  113000|   89|
| 4059000|    2|
| 2717000|   12|
| 2523000|    5|
|  702000|   17|
| 2019000|    7|
|  135000|   62|
|  499000|   14|
| 1078000|   13|
|  419000|   14|
|  271000|   33|
| 1888000|    6|
| 5005000|    1|
|  213000|   37|
|   85000|  113|
|  450000|   28|
| 1169000|   15|
| 1022000|   12|
|  441000|   22|
|  552000|   23|
+--------+-----+
only showing top 20 rows

+-------+------------------+
|summary|          duration|
+-------+------------------+
|  count|             68358|
|   mean| 984202.8145937565|
| stddev|1316541.9310468643|
|    min|                 0|
|    max|          17863000|
+-------+------------------+

+--------+
|duration|
+--------+
|       0|
|    1000|
|    2000|
|    3000|
|    4000|
|    5000|
|    6000|
|    7000|
|    8000|
|    9000|
|   10000|
|   11000|
|   12000|
|   13000|
|   14000|
|   15000|
|   16000|
|   17000|
|   18000|
|   19000|
+--------+
only showing top 20 rows

+--------+
|du

In [19]:
media_index3 = media_index2.filter("duration > 10000 and duration < 21600000")

In [20]:
media_index3.groupBy('duration').count().show()
media_index3.select("duration").describe().show()
media_index3.select("duration").distinct().orderBy('duration',ascending=True).show()
media_index3.select("duration").distinct().orderBy('duration',ascending=False).show()

+--------+-----+
|duration|count|
+--------+-----+
|  113000|   89|
| 4059000|    2|
| 2717000|   12|
| 2523000|    5|
|  702000|   17|
| 2019000|    7|
|  135000|   62|
|  499000|   14|
| 1078000|   13|
|  419000|   14|
|  271000|   33|
| 1888000|    6|
| 5005000|    1|
|  213000|   37|
|   85000|  113|
|  450000|   28|
| 1169000|   15|
| 1022000|   12|
|  441000|   22|
|  552000|   23|
+--------+-----+
only showing top 20 rows

+-------+------------------+
|summary|          duration|
+-------+------------------+
|  count|             67771|
|   mean| 992694.9432648183|
| stddev|1319051.6843821446|
|    min|             11000|
|    max|          17863000|
+-------+------------------+

+--------+
|duration|
+--------+
|   11000|
|   12000|
|   13000|
|   14000|
|   15000|
|   16000|
|   17000|
|   18000|
|   19000|
|   20000|
|   21000|
|   22000|
|   23000|
|   24000|
|   25000|
|   26000|
|   27000|
|   28000|
|   29000|
|   30000|
+--------+
only showing top 20 rows

+--------+
|du

### 2.5 保存数据到DWD层

In [21]:
#创建DWD数据库
spark.sql('create database if not exists DWD;')
#保存数据
media_index3.write.mode('overwrite').saveAsTable("DWD.media_index")
mediamatch_userevent3.write.mode('overwrite').saveAsTable("DWD.mediamatch_userevent")
mediamatch_usermsg3.write.mode('overwrite').saveAsTable("DWD.mediamatch_usermsg")
mmconsume_billevents2.write.mode('overwrite').saveAsTable("DWD.mmconsume_billevents")
order_index3.write.mode('overwrite').saveAsTable("DWD.order_index")
#查看数据table
spark.sql('show databases;').show()
spark.sql('use DWD;')
spark.sql('show tables;').show()

+---------+
|namespace|
+---------+
|  default|
|      dwd|
|      dws|
|      ods|
+---------+

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|      dwd|         media_index|      false|
|      dwd|mediamatch_userevent|      false|
|      dwd|  mediamatch_usermsg|      false|
|      dwd|mmconsume_billevents|      false|
|      dwd|         order_index|      false|
+---------+--------------------+-----------+



# 任务3：对用户数据贴上标签，将结果整合成用户画像并储存至DWS层

In [22]:
#读取DWD数据
media_index = spark.read.table('DWD.media_index')
mediamatch_userevent = spark.read.table('DWD.mediamatch_userevent')
mediamatch_usermsg = spark.read.table('DWD.mediamatch_usermsg')
mmconsume_billevents = spark.read.table('DWD.mmconsume_billevents')
order_index = spark.read.table('DWD.order_index')

### 3.1 用户依赖度（用户日均观看时长）
### 输出NeedLabel

In [23]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
#查看用户观看时间
media_index.select('end_time').orderBy('origin_time').show(5)
media_index.select('end_time').orderBy('end_time',ascending=False).show(5)
#统计用户5-7日平均观看时长
TVNeed = media_index.select('phone_no','duration').groupBy('phone_no').agg((sum('duration')/1000/60/60/90).alias('DayAvgTVLookTime'))
TVNeed.orderBy('DayAvgTVLookTime').show(5)
TVNeed.orderBy('DayAvgTVLookTime',ascending=False).show(5)
#定义依赖度函数
def needLabel(DayAvgTVLookTime):
    if(DayAvgTVLookTime>=5):
        return '1级依赖'
    elif(DayAvgTVLookTime>=4):
        return '2级依赖'
    elif(DayAvgTVLookTime>=3):
        return '3级依赖'
    elif(DayAvgTVLookTime>=2):
        return '4级依赖'
    elif(DayAvgTVLookTime>=1):
        return '5级依赖'
    else:
        return '6级依赖'
#定义数据类型为String
UDPNeedLabel = udf(needLabel,StringType())
NeedLabel = TVNeed.withColumn('NeedLabel',UDPNeedLabel('DayAvgTVLookTime')).select('phone_no','DayAvgTVLookTime','NeedLabel')
NeedLabel.orderBy('DayAvgTVLookTime').show(5)
NeedLabel.orderBy('DayAvgTVLookTime',ascending=False).show(5)

+--------------+
|      end_time|
+--------------+
|2018/5/10 0:55|
|2018/5/10 0:43|
|2018/5/10 1:05|
|2018/5/10 0:40|
|2018/5/10 0:30|
+--------------+
only showing top 5 rows

+-------------+
|     end_time|
+-------------+
|2018/7/9 9:56|
|2018/7/9 9:56|
|2018/7/9 9:55|
|2018/7/9 9:55|
|2018/7/9 9:55|
+-------------+
only showing top 5 rows

+--------+--------------------+
|phone_no|    DayAvgTVLookTime|
+--------+--------------------+
| 2048353|3.271604938271605E-4|
| 2004595|0.001546296296296...|
| 2039526|0.004996913580246913|
| 2047858|0.005901234567901235|
| 2049976|0.008043209876543209|
+--------+--------------------+
only showing top 5 rows

+--------+-----------------+
|phone_no| DayAvgTVLookTime|
+--------+-----------------+
| 2045584| 5.44308024691358|
| 2020364|5.387768518518519|
| 2038740|5.279188271604938|
| 2003507|5.152728395061728|
| 2028383|4.907515432098766|
+--------+-----------------+
only showing top 5 rows

+--------+--------------------+---------+
|phone_no|  

### 3.2 消费水平（用户日均消费金额）
### 输出ConsumeLabel

In [24]:
#查看用户账单时间，确定时间范围
mmconsume_billevents.select('year_month').orderBy('year_month').show(5)
mmconsume_billevents.select('year_month').orderBy('year_month',ascending=False).show(5)
#统计5-7月日均消费金额
ConsumeLevel = mmconsume_billevents.filter("year_month >= '2018-05-01'").groupBy('phone_no').agg((mean(col('should_pay')-col('favour_fee'))).alias('AvgPay'))
ConsumeLevel.orderBy('AvgPay').show(5)
ConsumeLevel.orderBy('AvgPay',ascending=False).show(5)
##定义消费水平函数
def consumeLabel(AvgPay):
    if(AvgPay >= 50):
        return '1级用户'
    elif(AvgPay >= 40):
        return '2级用户'
    elif(AvgPay >= 30):
        return '3级用户'
    elif(AvgPay >= 20):
        return '4级用户'
    elif(AvgPay >= 10):
        return '5级用户'
    else:
        return '6级用户'
#定义标签为String数据类型
UDFconsumeLAbel = udf(consumeLabel,StringType())
ConsumeLabel = ConsumeLevel.withColumn('ConsumeLabel',UDFconsumeLAbel('AvgPay')).select('phone_no','AvgPay','ConsumeLabel')
ConsumeLabel.orderBy('AvgPay').show(5)
ConsumeLabel.orderBy('AvgPay',ascending=False).show(5)

+-------------+
|   year_month|
+-------------+
|2018/1/1 0:00|
|2018/1/1 0:00|
|2018/1/1 0:00|
|2018/1/1 0:00|
|2018/1/1 0:00|
+-------------+
only showing top 5 rows

+-------------+
|   year_month|
+-------------+
|2018/7/1 0:00|
|2018/7/1 0:00|
|2018/7/1 0:00|
|2018/7/1 0:00|
|2018/7/1 0:00|
+-------------+
only showing top 5 rows

+--------+-------------------+
|phone_no|             AvgPay|
+--------+-------------------+
| 2040996|               -0.4|
| 2027940|-0.2857142857142857|
| 2045538|  4.431034482758621|
| 2017316|  4.636363636363637|
| 2017496|                5.0|
+--------+-------------------+
only showing top 5 rows

+--------+------------------+
|phone_no|            AvgPay|
+--------+------------------+
| 2001520|              60.0|
| 2009411|              60.0|
| 2041482|56.714285714285715|
| 2009066|              55.0|
| 2004503|              55.0|
+--------+------------------+
only showing top 5 rows

+--------+-------------------+------------+
|phone_no|         

### 3.3 入网程度（用户入网时间）
### 输出YearLabel

#### months_between() ——函数内日期格式中年月日应用 ‘-’ 分隔，不能用 ‘/’ 分隔，否则函数只会输出 Null
#### from_unixtime(unix_timestamp()) ——输出当前电脑时间，精确到秒
#### months_between(date1,date1) ——输出月份间隔

In [25]:
#查看用户入网时间
mediamatch_usermsg.select('open_time').orderBy('open_time').show(5)
#统计用户入网时间
Year = mediamatch_usermsg.select("phone_no",((months_between(from_unixtime(unix_timestamp()),'open_time')/12)).alias("Year"))
Year.orderBy("Year").show(5)
Year.orderBy("Year",ascending=False).show(5)
#定义入网程度函数
def yearLabel(Year):
    if(Year <= 10):
        return '骨灰等级6'
    elif(Year <=13):
        return '骨灰等级5'
    elif(Year <=16):
        return '骨灰等级4'
    elif(Year <=19):
        return '骨灰等级3'
    elif(Year <=22):
        return '骨灰等级2'
    else:
        return '骨灰等级1'
#定义为String数据类型
UDFYearLabel = udf(yearLabel,StringType())
YearLabel = Year.withColumn('YearLAbel',UDFYearLabel('Year')).select('phone_no','Year','YearLAbel')
YearLabel.orderBy("Year").show(5)
YearLabel.orderBy("Year",ascending=False).show(5)


+-------------------+
|          open_time|
+-------------------+
|2001-04-30 00:00:00|
|2001-10-18 00:00:00|
|2002-07-31 00:00:00|
|2002-12-27 00:00:00|
|2003-04-22 00:00:00|
+-------------------+
only showing top 5 rows

+--------+-----------------+
|phone_no|             Year|
+--------+-----------------+
| 2012220|9.950293925833334|
| 2012045|9.961321030833334|
| 2012277|9.977212981666666|
| 2012192|9.977278319166667|
| 2012280|9.982311174166666|
+--------+-----------------+
only showing top 5 rows

+--------+------------------+
|phone_no|              Year|
+--------+------------------+
| 2023730|22.948867700833333|
| 2016569|      22.481125765|
| 2016602|21.696179529166667|
| 2016579|       21.29026555|
| 2023924|     20.9703730775|
+--------+------------------+
only showing top 5 rows

+--------+-----------------+---------+
|phone_no|             Year|YearLAbel|
+--------+-----------------+---------+
| 2012220|9.950293925833334|骨灰等级6|
| 2012045|9.961321030833334|骨灰等级6|
| 2012277

### 3.4消费内容
### 输出BillLabel

In [26]:
#查看用户消费内容fee_code
mmconsume_billevents.groupBy('fee_code').count().show()
#自定义UDF函数
def billLabel(fee_code):
    if(fee_code == '0J' or fee_code == '0B' or fee_code == '0Y'):
        return '直播'
    elif(fee_code == '0X'):
        return '应用'  
    elif(fee_code == '0T'):
        return '付费频道'  
    elif(fee_code == '0D'):
        return '点播'  
    elif(fee_code == '0H'):
        return '回看'
    else:
        return '宽带'
#统计用户消费内容
UDFbillLabel = udf(billLabel,StringType())
BillLabel = mmconsume_billevents.withColumn('BillLabel',UDFbillLabel('fee_code')).select('phone_no','fee_code','BillLabel')
BillLabel.show(5)

+--------+-----+
|fee_code|count|
+--------+-----+
|      0H|  682|
|      0K|  381|
|      0D|  666|
|      0X|  277|
|      0B| 1688|
|      0W|    2|
|      0T|  516|
|      0L|   22|
|      0Y|  857|
+--------+-----+

+--------+--------+---------+
|phone_no|fee_code|BillLabel|
+--------+--------+---------+
| 2030386|      0Y|     直播|
| 2017488|      0H|     回看|
| 2022200|      0D|     点播|
| 2010025|      0B|     直播|
| 2020450|      0B|     直播|
+--------+--------+---------+
only showing top 5 rows



### 将所有Label合并，形成用户画像

In [27]:
Persona = NeedLabel.join(ConsumeLabel,ConsumeLabel.phone_no == NeedLabel.phone_no).drop(ConsumeLabel.phone_no)
Persona = Persona.join(YearLabel,YearLabel.phone_no == Persona.phone_no).drop(YearLabel.phone_no)
Persona = Persona.join(BillLabel,BillLabel.phone_no == Persona.phone_no).drop(BillLabel.phone_no)
Persona = Persona.select(NeedLabel.phone_no,'NeedLabel','ConsumeLabel','YearLabel','BillLAbel').distinct().orderBy('phone_no')
Persona.show()

+--------+---------+------------+---------+---------+
|phone_no|NeedLabel|ConsumeLabel|YearLabel|BillLAbel|
+--------+---------+------------+---------+---------+
| 2000121|  3级依赖|     4级用户|骨灰等级5|     直播|
| 2000126|  5级依赖|     5级用户|骨灰等级5|     回看|
| 2000126|  5级依赖|     5级用户|骨灰等级5|     直播|
| 2000126|  5级依赖|     5级用户|骨灰等级5|     点播|
| 2000638|  3级依赖|     4级用户|骨灰等级5|     直播|
| 2000812|  6级依赖|     4级用户|骨灰等级5|     直播|
| 2001102|  6级依赖|     4级用户|骨灰等级5|     直播|
| 2001498|  3级依赖|     4级用户|骨灰等级5|     直播|
| 2001789|  6级依赖|     5级用户|骨灰等级5|     回看|
| 2001789|  6级依赖|     5级用户|骨灰等级5|     点播|
| 2001789|  6级依赖|     5级用户|骨灰等级5|     直播|
| 2002504|  5级依赖|     5级用户|骨灰等级5|     回看|
| 2002504|  5级依赖|     5级用户|骨灰等级5|     点播|
| 2002504|  5级依赖|     5级用户|骨灰等级5|     直播|
| 2002651|  4级依赖|     4级用户|骨灰等级5|     直播|
| 2002838|  6级依赖|     6级用户|骨灰等级5|     回看|
| 2002838|  6级依赖|     6级用户|骨灰等级5| 付费频道|
| 2002838|  6级依赖|     6级用户|骨灰等级5|     直播|
| 2002992|  5级依赖|     4级用户|骨灰等级5|     点播|
| 2002992|  5级依赖|     4级用户|骨灰等级5|     回看|


### 3.5 将用户画像存入DWS层 

In [28]:
#创建DWD数据库
spark.sql('create database if not exists DWS;')
#保存数据
Persona.write.mode('overwrite').saveAsTable("DWs.Persona")
#查看数据table
spark.sql('show databases;').show()
spark.sql('use DWs;')
spark.sql('show tables;').show()

+---------+
|namespace|
+---------+
|  default|
|      dwd|
|      dws|
|      ods|
+---------+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|      dws|  persona|      false|
+---------+---------+-----------+



In [29]:
persona = spark.read.table('DWS.persona')
persona.show()

+--------+---------+------------+---------+---------+
|phone_no|NeedLabel|ConsumeLabel|YearLabel|BillLAbel|
+--------+---------+------------+---------+---------+
| 2000121|  3级依赖|     4级用户|骨灰等级5|     直播|
| 2000126|  5级依赖|     5级用户|骨灰等级5|     回看|
| 2000126|  5级依赖|     5级用户|骨灰等级5|     直播|
| 2000126|  5级依赖|     5级用户|骨灰等级5|     点播|
| 2000638|  3级依赖|     4级用户|骨灰等级5|     直播|
| 2000812|  6级依赖|     4级用户|骨灰等级5|     直播|
| 2001102|  6级依赖|     4级用户|骨灰等级5|     直播|
| 2001498|  3级依赖|     4级用户|骨灰等级5|     直播|
| 2001789|  6级依赖|     5级用户|骨灰等级5|     回看|
| 2001789|  6级依赖|     5级用户|骨灰等级5|     点播|
| 2001789|  6级依赖|     5级用户|骨灰等级5|     直播|
| 2002504|  5级依赖|     5级用户|骨灰等级5|     回看|
| 2002504|  5级依赖|     5级用户|骨灰等级5|     点播|
| 2002504|  5级依赖|     5级用户|骨灰等级5|     直播|
| 2002651|  4级依赖|     4级用户|骨灰等级5|     直播|
| 2002838|  6级依赖|     6级用户|骨灰等级5|     回看|
| 2002838|  6级依赖|     6级用户|骨灰等级5| 付费频道|
| 2002838|  6级依赖|     6级用户|骨灰等级5|     直播|
| 2002992|  5级依赖|     4级用户|骨灰等级5|     点播|
| 2002992|  5级依赖|     4级用户|骨灰等级5|     回看|
