In [None]:
** 教程 **
https://www.cnblogs.com/muchen/p/6882276.html
使用Spark对MovieLens的特征进行提取

本章属于数据预处理环节

In [3]:
import os
import sys
spark_home = os.environ.get('SPARK_HOME',None)
print(spark_home)
sys.path.insert(0,os.path.join(spark_home,'python'))
sys.path.insert(0,os.path.join(spark_home,'python/lib/py4j-0.10.4-src.zip'))

exec(open(os.path.join(spark_home,'python/pyspark/shell.py')).read())

/home/yang0/installer/spark-2.4.3-bin-hadoop2.7
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Python version 3.6.5 (default, Jul 11 2019 14:32:02)
SparkSession available as 'spark'.


**类别特征提取**
在很多场景下，数据集的很多特征是类型变量，比如MovieLens里面的职业类型。这样的变量无法作为很多算法的输入，因为这类变量无法作用于样本间距离的计算。

可参考的方法是 1 of k 编码，就是将某种类型的特征打平，将其转化为具有n列的向量。具体的做法是先为特征列创建字典，然后将各具体特征值映射到 1 of k 编码。

下面以MoveiLens中的职业类型特征为例，演示特征值为programmer的特征提取：

In [4]:
%pylab

Using matplotlib backend: agg
Populating the interactive namespace from numpy and matplotlib


In [5]:
# 载入数据集
user_data = sc.textFile("./ml-100k/u.user")
# 以' | '切分每列，返回新的用户RDD
user_fields = user_data.map(lambda line: line.split("|"))
# 获取职业RDD并落地
all_occupations = user_fields.map(lambda fields: fields[3]).distinct().collect()
# 对各职业进行排序
all_occupations.sort()
 
# 构建字典
idx = 0
all_occupations_dict = {}
for o in all_occupations:
    all_occupations_dict[o] = idx
    idx +=1
 
# 生成并打印职业为程序员(programmer)的1 of k编码
K = len(all_occupations_dict)
binary_x = np.zeros(K)
k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print("程序员的1 of k编码为: %s" % binary_x)

程序员的1 of k编码为: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]


**派生特征提取**
 并非所有的特征均可直接拿来学习。比如电影发行日期特征，它显然无法拿来进行学习。但正如上一节所做的一个工作，将它转化为电影年龄，这就可以在很多场景下进行学习了。

再比如时间戳属性，可参考将他们转为为：早/中/晚这样的分类变量：

In [6]:
# 载入数据集
rating_data_raw = sc.textFile("./ml-100k/u.data")
print("用户id | 影片id | 评分值 | 时间戳(timestamp格式)")
print(rating_data_raw.first())

# 获取评分RDD
rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))
 
# 函数: 将时间戳格式转换为datetime格式
def extract_datetime(ts):
    import datetime
    return datetime.datetime.fromtimestamp(ts)
 
# 获取小时RDD
timestamps = rating_data.map(lambda fields: int(fields[3]))
hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour)
 
# 函数: 将小时映射为分类变量并展示
def assign_tod(hr):
    times_of_day = {
                'morning' : range(7, 12),
                'lunch' : range(12, 14),
                'afternoon' : range(14, 18),
                'evening' : range(18, 23),
                'night' : concatenate((range(23, 25),range(1,7)), 0) # 俩个数组合并，0是数组下标
                }
    for k, v in times_of_day.items():
        if hr in v: 
            return k
 
# 获取新的分类变量RDD
a = concatenate((range(1, 7),range(7,10)), 0)
print(a)
print(hour_of_day.take(5))
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)

用户id | 影片id | 评分值 | 时间戳(timestamp格式)
196	242	3	881250949
[1 2 3 4 5 6 7 8 9]
[23, 3, 15, 13, 13]


['night', 'night', 'afternoon', 'lunch', 'lunch']

**文本特征提取**
 关于文本特征提取方法有很多，本文仅介绍一个简单而又经典的提取方法 - 词袋法。
其基本步骤如下：

1. 分词 - 将文本分割为由词组成的集合。可根据空格符，标点进行分割；
2. 删除停用词 - the and 这类词无学习的价值意义，删除之；
3. 提取词干 - 将各个词转化为其基本形式，如men -> man；
4. 向量化 - 从根本上来说和1 of k相同。不过由于词往往很多，所以稀疏矩阵技术很重要；

下面将MovieLens数据集中的影片标题进行特征提取：

In [16]:
# 载入数据集
movie_data = sc.textFile("./ml-100k/u.item")

# 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
# 以' | '切分每列，返回影片RDD
movie_fields = movie_data.map(lambda lines: lines.split("|"))
 
# 函数: 剔除掉标题中的(年份)部分
def extract_title(raw):
    import re
    grps = re.search("\((\w+)\)", raw)
    if grps:
        return raw[:grps.start()].strip()
    else:
        return raw
 
# 获取影片名RDD
raw_titles = movie_fields.map(lambda fields: fields[1])
 
# 剔除影片名中的(年份)
movie_titles = raw_titles.map(lambda m: extract_title(m))
 
# 由于仅仅是个展示的例子，简简单单用空格分割
title_terms = movie_titles.map(lambda t: t.split(" "))
 
# 搜集所有的词
all_terms = title_terms.flatMap(lambda x: x).distinct().collect()

# 创建字典
idx = 0
all_terms_dict = {}
for term in all_terms:
    all_terms_dict[term] = idx
    idx +=1
num_terms = len(all_terms_dict)

from scipy import sparse as sp
a = sp.csc_matrix((1, 3))
a[0,2] = 1
print(a)
print("====")
 
# 函数: 采用稀疏向量格式保存编码后的特征并返回
def create_vector(terms, term_dict):
    x = sp.csc_matrix((1, num_terms))
    for t in terms:
        if t in term_dict:
            idx = term_dict[t]
            x[0, idx] = 1
    return x
 
# 将字典保存为广播数据格式类型。因为各个worker都要用
all_terms_bcast = sc.broadcast(all_terms_dict)
# 采用稀疏矩阵格式保存影片名特征

print(title_terms.first())

term_vectors = title_terms.map(lambda terms: create_vector(terms, all_terms_bcast.value))
# 展示提取结果
term_vectors.take(5)

  self._set_intXint(row, col, x.flat[0])


  (0, 2)	1.0
====
['Toy', 'Story']


[<1x2645 sparse matrix of type '<class 'numpy.float64'>'
 	with 2 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<class 'numpy.float64'>'
 	with 1 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<class 'numpy.float64'>'
 	with 2 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<class 'numpy.float64'>'
 	with 2 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<class 'numpy.float64'>'
 	with 1 stored elements in Compressed Sparse Column format>]

**归一化特征**

 归一化最经典的做法就是所有特征值-最小值/特征区间。但对于一般特征的归一化网上很多介绍，请读者自行学习。本文仅对特征向量的归一化做介绍。

一般来说，我们是先计算向量的二阶范数，然后让向量的所有元素去除以这个范数。

下面演示对某随机向量进行归一化：

In [19]:
# 设置随机数种子
np.random.seed(42)
# 生成随机向量
x = np.random.randn(10)
# 产生二阶范数
norm_x_2 = np.linalg.norm(x)
# 归一化
normalized_x = x / norm_x_2
 
# 结果展示
print("向量x:\n%s" % x)
print("向量x的2阶范数: %2.4f" % norm_x_2)
print("归一化后的向量x:\n%s" % normalized_x)
print("归一化后向量x的2阶范数:\n%2.4f" % np.linalg.norm(normalized_x))

向量x:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
向量x的2阶范数: 2.5908
归一化后的向量x:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
归一化后向量x的2阶范数:
1.0000


什么是范数？  https://blog.csdn.net/lz867422770/article/details/80013600

我们知道距离的定义是一个宽泛的概念，只要满足非负、自反、三角不等式就可以称之为距离。范数是一种强化了的距离概念，它在定义上比距离多了一条数乘的运算法则。有时候为了便于理解，我们可以把范数当作距离来理解。

在数学上，范数包括向量范数和矩阵范数，向量范数表征向量空间中向量的大小，矩阵范数表征矩阵引起变化的大小。一种非严密的解释就是，对应向量范数，向量空间中的向量都是有大小的，这个大小如何度量，就是用范数来度量的，不同的范数都可以来度量这个大小，就好比米和尺都可以来度量远近一样；对于矩阵范数，学过线性代数，我们知道，通过运算，可以将向量X变化为B，矩阵范数就是来度量这个变化大小的。


二范数指矩阵A的2范数，就是A的转置共轭矩阵与矩阵A的积的最大特征根的平方根值，是指空间上两个向量矩阵的直线距离。类似于求棋盘上两点间的直线距离。

不懂，需要补线性代数了

https://www.jianshu.com/p/95a8f035c86c
归一化：１）把数据变成(０，１)或者（1,1）之间的小数。主要是为了数据处理方便提出来的，把数据映射到0～1范围之内处理，更加便捷快速。２）把有量纲表达式变成无量纲表达式，便于不同单位或量级的指标能够进行比较和加权。归一化是一种简化计算的方式，即将有量纲的表达式，经过变换，化为无量纲的表达式，成为纯量。



In [21]:
# 导入Spark库中的正则化类
from pyspark.mllib.feature import Normalizer
# 初始化正则化对象
normalizer = Normalizer()
# 创建测试向量(RDD)
vector = sc.parallelize([x])
# 对向量进行归一化并返回结果
normalized_x_mllib = normalizer.transform(vector).first().toArray()
  
# 结果展示
print("向量x:\n%s" % x)
print("向量x的二阶范数: %2.4f" % norm_x_2)
print("被MLlib归一化后的向量x:\n%s" % normalized_x_mllib)
print("被MLlib归一化后的向量x的二阶范数: %2.4f" % np.linalg.norm(normalized_x_mllib))

向量x:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
向量x的二阶范数: 2.5908
被MLlib归一化后的向量x:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
被MLlib归一化后的向量x的二阶范数: 1.0000
