In [None]:
import findspark
findspark.init()
from pyspark import SparkContext
import numpy as np
import matplotlib.pyplot as plt
import re


In [None]:
sc = SparkContext("local[2]","Spark with Python")
user_data = sc.textFile("C:/input/spark/ml-100k/u.user")
user_data.first()

In [None]:
user_fields = user_data.map(lambda line: line.split("|"))
user_fields.take(5)


In [4]:
num_users = user_fields.map(lambda fields: fields[0]).count()
num_genders = user_fields.map(lambda fields:fields[2]).distinct().count()

In [5]:
num_occupations = user_fields.map(lambda fields:fields[3]).distinct().count()

In [6]:
num_zipcodes = user_fields.map(lambda fields:fields[4]).distinct().count()

In [7]:
print("User: %d, genders: %d,occupations: %d, zip_codes: %d " %(num_users,num_genders,num_occupations,num_zipcodes))

User: 943, genders: 2,occupations: 21, zip_codes: 795 


In [8]:
ages = user_fields.map(lambda x:int(x[1])).collect()
plt.hist(ages,bins=20,color='lightblue',normed=True)
fig = plt.gcf()
fig.set_size_inches(16,10)
#plt.show()

The 'normed' kwarg was deprecated in Matplotlib 2.1 and will be removed in 3.1. Use 'density' instead.
  alternative="'density'", removal="3.1")


In [9]:
count_by_occupation = user_fields.map(lambda fields:(fields[3],1)).reduceByKey(lambda x,y:x+y).collect()
user_fields.take(5)

[['1', '24', 'M', 'technician', '85711'],
 ['2', '53', 'F', 'other', '94043'],
 ['3', '23', 'M', 'writer', '32067'],
 ['4', '24', 'M', 'technician', '43537'],
 ['5', '33', 'F', 'other', '15213']]

In [10]:
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])

In [11]:
x_axis = x_axis1[np.argsort(y_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]

In [12]:
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes()
ax.set_xticks(pos + (width /2))
ax.set_xticklabels(x_axis)

plt.bar(pos,y_axis,width,color='lightblue')
plt.xticks(rotation=30)
fig = plt.gcf()
fig.set_size_inches(16,10)
#plt.show()

  "Adding an axes using the same arguments as a previous axes "


In [13]:
count_by_occupation2 = user_fields.map(lambda fields:fields[3]).countByValue()
print(type(count_by_occupation2))
print("Map-reduce   approach:", dict(count_by_occupation))
print("")
print("countByValue approach:", dict(count_by_occupation2))

<class 'collections.defaultdict'>
Map-reduce   approach: {'writer': 45, 'doctor': 7, 'educator': 95, 'lawyer': 12, 'technician': 27, 'artist': 28, 'executive': 32, 'engineer': 67, 'administrator': 79, 'student': 196, 'scientist': 31, 'marketing': 26, 'healthcare': 16, 'other': 105, 'retired': 14, 'librarian': 51, 'salesman': 12, 'entertainment': 18, 'homemaker': 7, 'programmer': 66, 'none': 9}

countByValue approach: {'lawyer': 12, 'technician': 27, 'executive': 32, 'none': 9, 'administrator': 79, 'student': 196, 'marketing': 26, 'scientist': 31, 'healthcare': 16, 'writer': 45, 'homemaker': 7, 'educator': 95, 'other': 105, 'artist': 28, 'engineer': 67, 'retired': 14, 'librarian': 51, 'salesman': 12, 'entertainment': 18, 'doctor': 7, 'programmer': 66}


In [14]:
movie_data = sc.textFile("C:/input/spark/ml-100k/u.item")
movie_data.take(5)

['1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0',
 '2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0',
 '3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0',
 '4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0',
 '5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0']

In [15]:
num_movies = movie_data.count()

In [16]:
print("movies: %d" % num_movies)

movies: 1682


In [17]:
def convert_year(x):
    try:
        return int(x[-4:])
    except:
        return 1900 # 若是据缺失年份则默认设定为1900，后续处理中过滤掉这类数据

In [18]:
movie_fields = movie_data.map(lambda lines: lines.split("|"))
#movie_fields.take(5)
print(type(movie_fields))

<class 'pyspark.rdd.PipelinedRDD'>


In [19]:
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
print(type(years))
years.take(5)

<class 'pyspark.rdd.PipelinedRDD'>


[1995, 1995, 1995, 1995, 1995]

In [20]:
years_filtered = years.filter(lambda x: x!=1900)
years_filtered.take(5)

[1995, 1995, 1995, 1995, 1995]

In [21]:
movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()
print(type(movie_ages))
print(movie_ages)
values = movie_ages.values()
print(values)

<class 'collections.defaultdict'>
defaultdict(<class 'int'>, {0: 65, 1: 286, 2: 355, 3: 219, 4: 214, 5: 126, 6: 37, 7: 22, 8: 24, 9: 15, 10: 11, 11: 13, 12: 15, 13: 7, 14: 8, 15: 5, 16: 13, 17: 12, 18: 8, 19: 9, 20: 4, 21: 4, 22: 5, 23: 6, 24: 8, 25: 4, 26: 3, 27: 7, 28: 3, 29: 4, 30: 6, 31: 5, 32: 2, 33: 5, 34: 2, 35: 6, 36: 5, 37: 3, 38: 5, 39: 4, 40: 9, 41: 8, 42: 4, 43: 5, 44: 7, 45: 2, 46: 3, 47: 5, 48: 7, 49: 4, 50: 3, 51: 5, 52: 5, 53: 4, 54: 5, 55: 4, 56: 2, 57: 5, 58: 8, 59: 7, 60: 3, 61: 4, 62: 2, 63: 4, 64: 4, 65: 2, 66: 1, 67: 1, 68: 1, 72: 1, 76: 1})
dict_values([65, 286, 355, 219, 214, 126, 37, 22, 24, 15, 11, 13, 15, 7, 8, 5, 13, 12, 8, 9, 4, 4, 5, 6, 8, 4, 3, 7, 3, 4, 6, 5, 2, 5, 2, 6, 5, 3, 5, 4, 9, 8, 4, 5, 7, 2, 3, 5, 7, 4, 3, 5, 5, 4, 5, 4, 2, 5, 8, 7, 3, 4, 2, 4, 4, 2, 1, 1, 1, 1, 1])


In [22]:
bins = list(movie_ages.keys()) # dict_keys-->list
print(type(bins),bins)
plt.hist(values, bins=bins, color='lightblue', normed=True)
fig = plt.gcf()
fig.set_size_inches(16,10)

<class 'list'> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 72, 76]


The 'normed' kwarg was deprecated in Matplotlib 2.1 and will be removed in 3.1. Use 'density' instead.
  alternative="'density'", removal="3.1")


In [23]:
rating_data_raw = sc.textFile("C:/input/spark/ml-100k/u.data")
print(type(rating_data_raw))
print(rating_data_raw.first())

<class 'pyspark.rdd.RDD'>
196	242	3	881250949


In [24]:
num_ratings = rating_data_raw.count()
print("num_ratings: %d" %num_ratings)

num_ratings: 100000


In [25]:
rating_data = rating_data_raw.map(lambda line:line.split("\t"))
ratings = rating_data.map(lambda fields:int(fields[2]))
print(type(ratings),ratings.take(5))
max_rating = ratings.reduce(lambda x,y:max(x,y))
min_rating = ratings.reduce(lambda x,y:min(x,y))
mean_rating = ratings.reduce(lambda x,y:x+y) / num_ratings #均值
median_rating = np.median(ratings.collect()) # numpy求中位数
print(type(ratings.collect()),ratings.collect()[:5],median_rating,)
ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies

<class 'pyspark.rdd.PipelinedRDD'> [3, 3, 1, 2, 1]
<class 'list'> [3, 3, 1, 2, 1] 4.0


In [26]:
print ("Average rating: %2.2f" % mean_rating)

Average rating: 3.53


In [27]:
ratings.stats()

(count: 100000, mean: 3.529859999999947, stdev: 1.1256679707622548, max: 5.0, min: 1.0)

In [28]:
count_by_rating = ratings.countByValue()
x_axis = list(count_by_rating.keys())
y_axis = np.array([ float(c) for c in count_by_rating.values()]) # 为了方便后学的y值求和，
y_axis2 = np.array(count_by_rating.values()) # 数据类型外面相等，里面不相等
print(type(x_axis),type(y_axis),len(y_axis),type(y_axis2))
print(y_axis,y_axis2)

<class 'list'> <class 'numpy.ndarray'> 5 <class 'numpy.ndarray'>
[ 6110. 11370. 27145. 34174. 21201.] dict_values([6110, 11370, 27145, 34174, 21201])


In [29]:
y_axis_normed = y_axis / y_axis.sum() # 对y轴正则化，使它表示百分比
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis_normed, width, color='lightblue')
#plt.xticks(rotation=30)
fig = plt.gcf() # Get a reference to the current figure
fig.set_size_inches(16, 10)
#plt.show()

  "Adding an axes using the same arguments as a previous axes "


In [30]:
user_ratings_grouped = rating_data.map(lambda fields: (int(fields[0]),
int(fields[2]))).groupByKey()
user_ratings_grouped.take(5)

[(1, <pyspark.resultiterable.ResultIterable at 0x1721ea90>),
 (2, <pyspark.resultiterable.ResultIterable at 0x1721ef98>),
 (3, <pyspark.resultiterable.ResultIterable at 0x1721ed68>),
 (4, <pyspark.resultiterable.ResultIterable at 0x1721efd0>),
 (5, <pyspark.resultiterable.ResultIterable at 0x17221400>)]

In [31]:
user_ratings_byuser = user_ratings_grouped.map(lambda kv: (kv[0], len(kv[1]))) #新语法
user_ratings_byuser.take(5)

[(1, 272), (2, 62), (3, 54), (4, 24), (5, 175)]

In [32]:
user_ratings_byuser_local = user_ratings_byuser.map(lambda kv:kv[1]).collect()
print(user_ratings_byuser_local[:5])
plt.hist(user_ratings_byuser_local,bins=200,color='lightblue',normed=True)
fig = plt.gcf() # gcf = Get a reference to Current Figure
fig.set_size_inches(16,10)
#plt.show()

[272, 62, 54, 24, 175]


The 'normed' kwarg was deprecated in Matplotlib 2.1 and will be removed in 3.1. Use 'density' instead.
  alternative="'density'", removal="3.1")


In [33]:
years_pre_processed = movie_fields.map(lambda fields:fields[2])\
.map(lambda x:convert_year(x)).collect()
years_pre_processed_array = np.array(years_pre_processed)
print(type(years_pre_processed_array), years_pre_processed_array[:5])
print([years_pre_processed_array != 1990][:5],np.sum([years_pre_processed_array != 1990]))

<class 'numpy.ndarray'> [1995 1995 1995 1995 1995]
[array([ True,  True,  True, ...,  True,  True,  True])] 1658


In [34]:
mean_year = np.mean(years_pre_processed_array[years_pre_processed_array != 1990])
median_year = np.median(years_pre_processed_array[years_pre_processed_array != 1990])
index_bad_data = np.where(years_pre_processed_array==1900)[0][0] # 等于的第一个index
print(type(index_bad_data),index_bad_data)
years_pre_processed_array[index_bad_data] = median_year

<class 'numpy.int64'> 266


In [35]:
print(np.where(years_pre_processed_array==1900),len(years_pre_processed_array))
print(years_pre_processed_array[266])

(array([], dtype=int64),) 1682
1995


In [36]:
all_occupations = user_fields.map(lambda fields:fields[3]).distinct().collect()
print(type(all_occupations))
all_occupations.sort()
print(all_occupations)

<class 'list'>
['administrator', 'artist', 'doctor', 'educator', 'engineer', 'entertainment', 'executive', 'healthcare', 'homemaker', 'lawyer', 'librarian', 'marketing', 'none', 'other', 'programmer', 'retired', 'salesman', 'scientist', 'student', 'technician', 'writer']


In [37]:
idx = 0
all_occupations_dict = {}
for o in all_occupations:
    all_occupations_dict[o] = idx
    idx += 1
print(type(all_occupations_dict))
print("Encoding of 'doctor:' %d" % all_occupations_dict['doctor'])

<class 'dict'>
Encoding of 'doctor:' 2


In [38]:
K = len(all_occupations_dict)
binary_x = np.zeros(K)
k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print("Binrary feature vector: %s " %binary_x)

Binrary feature vector: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.] 


In [39]:
def extract_datetime(ts):
    import datetime
    return datetime.datetime.fromtimestamp(ts)

timestamps = rating_data.map(lambda fields:int(fields[3]))
hour_of_day = timestamps.map(lambda ts:extract_datetime(ts).hour)
print(timestamps,hour_of_day.take(5))

PythonRDD[60] at RDD at PythonRDD.scala:48 [7, 12, 23, 21, 21]


In [40]:
def assgin_tod(hr):
    times_of_day = {
        'morning' : range(7,12),
        'lunch' : range(12,14),
        'afternoon': range(14,18),
        'evening': range(18,23),
        'night': range(23,7)
    }
    for k,v in times_of_day.items():
        if hr in v:
            return k
      
times_of_day = {
        'morning' : range(7,12),
        'lunch' : range(12,14),
        'afternoon': range(14,18),
        'evening': range(18,23),
        'night': range(23,7)}


times_of_day = hour_of_day.map(lambda hr:assgin_tod(hr))
times_of_day.take(5)
#for k,v in times_of_day.items():
#    if 8 in v:
#        print(k)

['morning', 'lunch', None, 'evening', 'evening']

In [49]:
# 提取简单的文本特征
def extract_title(raw):
    import re
    #该表达式找寻括号之间的非单词（数字）
    # search扫描整个字符串并返回第一个成功的匹配。否者返回None
    grps = re.search("\((\w+)\)", raw) # 等价于'[A-Za-z0-9_]',寻找‘(字符)’
    if grps:
        return raw[:grps.start()].strip() # 只选取标题部分，并删除末尾的空白字符,start匹配位置的开始
    else:
        return raw

In [48]:
print(movie_fields.take(1))
raw_titles = movie_fields.map(lambda fields:fields[1])
for raw_title in raw_titles.take(5):
    print(extract_title(raw_title))

[['1', 'Toy Story (1995)', '01-Jan-1995', '', 'http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)', '0', '0', '0', '1', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0']]
Toy Story
GoldenEye
Four Rooms
Get Shorty
Copycat


In [76]:
movie_titles = raw_titles.map(lambda m: extract_title(m))
title_terms = movie_titles.map(lambda t: t.split(" ")) # 接下来两种方法
print(type(title_terms),title_terms.take(5))
# 更多处理细节，大小写转换，删除标点符合和特殊字符，删除停用词，词干提取

<class 'pyspark.rdd.PipelinedRDD'> [['Toy', 'Story'], ['GoldenEye'], ['Four', 'Rooms'], ['Get', 'Shorty'], ['Copycat']]


In [77]:
import re
#all_terms = title_terms.flatMap(lambda x: x).distinct().collect()
all_terms = title_terms.flatMap(lambda x: x).map(lambda x: re.sub('\W+','',x)).distinct().collect() #去掉特殊字符

#all_terms_cleaned = all_terms.map(lambda x: re.sub('\W+','',x)) # 匹配非字母字符，即匹配特殊字符，去掉特殊字符
print(type(all_terms))
print(all_terms[:5]) # 发现第一个元素是空值''
all_terms.remove(all_terms[0]) # 前面collect从rdd转换成list,现已出第一个元素''
print(all_terms[:5])

<class 'list'>
['', 'Butcher', 'Femme', 'Das', 'Just']
['Butcher', 'Femme', 'Das', 'Just', 'Indian']


In [75]:
# 创建一个新的字典保存词，并分配k之1序号
idx = 0
all_terms_dict = {} #字典
for term in all_terms:
    all_terms_dict[term] = idx #注意是【】
    idx += 1
print("Total number of terms: %d" % len(all_terms_dict))
print("Index of term 'Dead': %d" % all_terms_dict['Dead'])
print("Index of term 'Rooms': %d" % all_terms_dict['Rooms'])

Total number of terms: 2425
Index of term 'Dead': 2004
Index of term 'Rooms': 238


In [83]:
# 也可以通过spark的zipWithIndex函数更高效得到相同的结果，该函数以各值的RDD为输入，
# 对值进行合并以生成一个新的键值对RDD
all_terms_dict2 = title_terms.flatMap(lambda x: x).map(lambda x: re.sub('\W+','',x)).distinct().\
zipWithIndex().collectAsMap() # \换行，zipWithIndex()编码，
print(type(all_terms_dict2))
#print("Index of term 'Dead': %d" % all_terms_dict2['Dead'])
#print("Index of term 'Rooms': %d" % all_terms_dict2['Rooms'])

<class 'dict'>


In [96]:
# 创建一个函数，将词集合转换为一个scipy稀疏向量
def create_vector(terms,term_dict):
    from scipy import sparse as sp
    num_terms = len(term_dict)
    x = sp.csc_matrix((1,num_terms))
    for t in terms:
        if t in term_dict:
            idx = term_dict[t]
            x[0,idx] =1
    return x

all_terms_bcast = sc.broadcast(all_terms_dict)
print(type(title_terms),title_terms.take(5))
print(type(all_terms_bcast))
term_vectors = title_terms.map(lambda terms:create_vector(terms,all_terms_bcast.value)) # 广播变量.value变为dict
term_vectors.take(5) # 每个电影标题都被转换为一个稀疏向量，有几个词对应几个非零元素

<class 'pyspark.rdd.PipelinedRDD'> [['Toy', 'Story'], ['GoldenEye'], ['Four', 'Rooms'], ['Get', 'Shorty'], ['Copycat']]
<class 'pyspark.broadcast.Broadcast'>


[<1x2425 sparse matrix of type '<class 'numpy.float64'>'
 	with 2 stored elements in Compressed Sparse Column format>,
 <1x2425 sparse matrix of type '<class 'numpy.float64'>'
 	with 1 stored elements in Compressed Sparse Column format>,
 <1x2425 sparse matrix of type '<class 'numpy.float64'>'
 	with 2 stored elements in Compressed Sparse Column format>,
 <1x2425 sparse matrix of type '<class 'numpy.float64'>'
 	with 2 stored elements in Compressed Sparse Column format>,
 <1x2425 sparse matrix of type '<class 'numpy.float64'>'
 	with 1 stored elements in Compressed Sparse Column format>]

In [100]:
# 正则化特征(column)和正则化特征向量(row)
np.random.seed(43)
x = np.random.randn(10) # 随机正态分布10个数
norm_x_2 = np.linalg.norm(x) # 默认2范数
normalized_x = x / norm_x_2

print("x:\n %s" %x)
print("2-norm of x: %2.4f" % norm_x_2)
print("normalized x: \n %s " % normalized_x)
print("2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x)) # 手动正则化以后求范数，应该为1


x:
 [ 0.25739993 -0.90848143 -0.37850311 -0.5349156   0.85807335 -0.41300998
  0.49818858  2.01019925  1.26286154 -0.43921486]
2-norm of x: 2.8818
normalized x: 
 [ 0.08931963 -0.31524962 -0.13134331 -0.18561957  0.29775765 -0.14331745
  0.17287504  0.69755365  0.43822207 -0.15241073] 
2-Norm of normalized_x: 1.0000


In [108]:
# MLLib正则化特征
from pyspark.mllib.feature import Normalizer
normalizer = Normalizer() # 初始化Normalizer,其默认使用与之前相同的二阶范数
# Distribute a local Scala collection to form an RDD
vector = sc.parallelize([x]) # 输入为一个RDD（它包含numpy数值或MLlib向量）

normalized_x_mllib = normalizer.transform(vector).first().toArray() #取rdd第一个值再转换为numpy数组
print(type(normalized_x_mllib))
print("Normalized x MLlib:\n%s" % normalized_x_mllib)
print("2-Norm of normalized x MLlib:\n%s" % np.linalg.norm(normalized_x_mllib))

<class 'numpy.ndarray'>
Normalized x MLlib:
[ 0.08931963 -0.31524962 -0.13134331 -0.18561957  0.29775765 -0.14331745
  0.17287504  0.69755365  0.43822207 -0.15241073]
2-Norm of normalized x MLlib:
1.0


In [110]:
'''特征提取可借助的软件包有scikit-learn、gensim、scikit-image、matplotlib、Python
的NLTK、Java编写的OpenNLP以及用Scala编写的Breeze
和Chalk。实际上，Breeze自Spark 1.0开始就成为Spark的一部分了。
'''
# 如何导入、处理和清理数据，以及如何将原始数据转为特征向量以供模型训练的常见方法

'特征提取可借助的软件包有scikit-learn、gensim、scikit-image、matplotlib、Python\n的NLTK、Java编写的OpenNLP以及用Scala编写的Breeze\n和Chalk。实际上，Breeze自Spark 1.0开始就成为Spark的一部分了。\n'