# 0.数据加载

初始化 Spark

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession # SparkSession 是Spark 2.0版本的新入口
spark = SparkSession.builder.master('local').getOrCreate()

### 格式化读取

定义schema

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

schema= StructType([
    StructField("review_id",StringType(),True),
    StructField("longitude",FloatType(),True),
    StructField("latitude",FloatType(),True),
    StructField("altitude",FloatType(),True),
    StructField("review_date",StringType(),True),
    StructField("temperature",StringType(),True),
    StructField("rating",FloatType(),True),
    StructField("user_id",StringType(),True),
    StructField("user_birthday",StringType(),True),
    StructField("user_nationality",StringType(),True),
    StructField("user_career",StringType(),True),
    StructField("user_income",FloatType(),True),
])

从hdfs加载数据，获得dataframe

注意设置 nullValue 标志

In [3]:
data = spark.read.csv(path="hdfs://localhost:9000/user/bdlab/lab1/data_all.txt.gz",header=None,nullValue='?',schema=schema,sep='|')
data.count()

4783614

In [4]:
data.take(2)

[Row(review_id='144552912', longitude=9.349848747253418, latitude=56.740875244140625, altitude=17.052772521972656, review_date='2011/06/27', temperature='18.5℃', rating=83.91000366210938, user_id='38267', user_birthday='1974-06-08', user_nationality='Switzerland', user_career='programmer', user_income=5042.0),
 Row(review_id='144552912', longitude=9.350188255310059, latitude=56.74068069458008, altitude=17.614839553833008, review_date='2016-10-08', temperature='37.8℉', rating=78.80000305175781, user_id='1205', user_birthday='1991-04-14', user_nationality='Italy', user_career='teacher', user_income=1705.0)]

In [58]:
data.groupBy('user_career').count().show()

+-----------+------+
|user_career| count|
+-----------+------+
|    teacher|607541|
|     writer|606584|
| programmer|600149|
|     farmer|600468|
| accountant|587367|
|     artist|590183|
|    Manager|595507|
|     doctor|595815|
+-----------+------+



In [14]:
607541+606584+600149+600468+587367+590183+595507+595815

4783614

### 转换RDD

In [5]:
data_rdd = data.rdd

In [6]:
data_rdd.take(2)

[Row(review_id='144552912', longitude=9.349848747253418, latitude=56.740875244140625, altitude=17.052772521972656, review_date='2011/06/27', temperature='18.5℃', rating=83.91000366210938, user_id='38267', user_birthday='1974-06-08', user_nationality='Switzerland', user_career='programmer', user_income=5042.0),
 Row(review_id='144552912', longitude=9.350188255310059, latitude=56.74068069458008, altitude=17.614839553833008, review_date='2016-10-08', temperature='37.8℉', rating=78.80000305175781, user_id='1205', user_birthday='1991-04-14', user_nationality='Italy', user_career='teacher', user_income=1705.0)]

# 1.分层抽样

映射为 pair(career,rows) 

In [7]:
# 按career分层
sample = data_rdd.map(lambda x : (x['user_career'],[x]))

In [8]:
sample.take(2)

[('programmer',
  [Row(review_id='144552912', longitude=9.349848747253418, latitude=56.740875244140625, altitude=17.052772521972656, review_date='2011/06/27', temperature='18.5℃', rating=83.91000366210938, user_id='38267', user_birthday='1974-06-08', user_nationality='Switzerland', user_career='programmer', user_income=5042.0)]),
 ('teacher',
  [Row(review_id='144552912', longitude=9.350188255310059, latitude=56.74068069458008, altitude=17.614839553833008, review_date='2016-10-08', temperature='37.8℉', rating=78.80000305175781, user_id='1205', user_birthday='1991-04-14', user_nationality='Italy', user_career='teacher', user_income=1705.0)])]

In [115]:
# dubug
# sampledf = spark.createDataFrame(sample.values()[0],schema=schema)

TypeError: 'PipelinedRDD' object does not support indexing

In [107]:
# sampledf.groupBy('user_career').count().show()

+-----------+------+
|user_career| count|
+-----------+------+
|    teacher|607541|
|     writer|606584|
| programmer|600149|
|     farmer|600468|
| accountant|587367|
|     artist|590183|
|    Manager|595507|
|     doctor|595815|
+-----------+------+



分层抽样 1%

In [9]:
# 分层函数
n = 1
def layer(x,y):
    """
    Row x , y  with same key.
    """
    global n
    n += 1 
    if n%100 is 1:
        return x+y
    return x      

In [10]:
sample_layer = sample.reduceByKey(layer)

In [12]:
sample_layer_result = sample_layer.collect()

In [13]:
for item in sample_layer_result:
    print(item[0],len(item[1]))

programmer 5906
teacher 6118
farmer 6039
doctor 5975
Manager 5911
accountant 5874
artist 5816
writer 6205


In [16]:
sample_layer_result[0][1][:3]

[Row(review_id='144552912', longitude=9.349848747253418, latitude=56.740875244140625, altitude=17.052772521972656, review_date='2011/06/27', temperature='18.5℃', rating=83.91000366210938, user_id='38267', user_birthday='1974-06-08', user_nationality='Switzerland', user_career='programmer', user_income=5042.0),
 Row(review_id='125830646', longitude=9.975799560546875, latitude=56.607295989990234, altitude=54.140926361083984, review_date='2011-08-04', temperature='52.3℉', rating=88.77999877929688, user_id='14661', user_birthday='1980-11-20', user_nationality='Italy', user_career='programmer', user_income=2128.0),
 Row(review_id='26218810', longitude=9.98774242401123, latitude=56.99723434448242, altitude=10.679211616516113, review_date='2010-02-27', temperature='-2.1℃', rating=81.68000030517578, user_id='9308', user_birthday='1986/11/09', user_nationality='Austria', user_career='programmer', user_income=2681.0)]

#### flat映射： 划分value至多行，格式化str

In [82]:
sample_layer = sample_layer.flatMap(lambda x:x[1][:])

In [77]:
def str_form(item):
    return "|".join([str(x) for x in item[:]])

In [78]:
# 格式化用于输出
sample_result = sample_layer.map(str_form)

# 抽样样本保存到 hdfs
sample_result.saveAsTextFile("hdfs://localhost:9000/user/bdlab/lab1/data_sample.txt")

# 2.数据过滤

In [310]:
sample_layer.take(2)

[Row(review_id='144552912', longitude=9.349848747253418, latitude=56.740875244140625, altitude=17.052772521972656, review_date='2011/06/27', temperature='18.5℃', rating=83.91000366210938, user_id='38267', user_birthday='1974-06-08', user_nationality='Switzerland', user_career='programmer', user_income=5042.0),
 Row(review_id='125830646', longitude=9.975799560546875, latitude=56.607295989990234, altitude=54.140926361083984, review_date='2011-08-04', temperature='52.3℉', rating=88.77999877929688, user_id='14661', user_birthday='1980-11-20', user_nationality='Italy', user_career='programmer', user_income=2128.0)]

#### 定义上下界

获得rating上下界

In [308]:
# 获得 rating
rate_sample = sample_layer.map(lambda x:x['rating']).collect()
# 过滤缺失值
rate_sample = list(filter(lambda x: isinstance(x,float),rate_sample))
# 排序
rate_sample.sort()

In [309]:
rate_size = len(rate_sample)
rate_min = rate_sample[ int(rate_size*0.01) ]
rate_max = rate_sample[ int(rate_size*0.99)]
print(rate_min,rate_max)

59.63999938964844 95.94999694824219


longitude,latitude 界限

In [114]:
longitude_min = 8.1461259
longitude_max = 11.1993265
latitude_min = 56.5824856
latitude_max = 57.750511

### 过滤总体数据

In [126]:
def llr_filter(item):
    """
    filter item based value of longitude, latitude and rating
    """
    if item['longitude']<longitude_min or item['longitude']>longitude_max:
        return False
    if item['latitude']<latitude_min or item['latitude']>latitude_max:
        return False
    
    if not isinstance(item['rating'],float):
        # 缺失值保留,后续进行填充
        return True
    if item['rating']<rate_min or item['rating']>rate_max:
        return False
    
    return True

In [127]:
data_filtered = data_rdd.filter(llr_filter)

In [213]:
data_rdd.take(2)

[Row(review_id='144552912', longitude=9.349848747253418, latitude=56.740875244140625, altitude=17.052772521972656, review_date='2011/06/27', temperature='18.5℃', rating=83.91000366210938, user_id='38267', user_birthday='1974-06-08', user_nationality='Switzerland', user_career='programmer', user_income=5042.0),
 Row(review_id='144552912', longitude=9.350188255310059, latitude=56.74068069458008, altitude=17.614839553833008, review_date='2016-10-08', temperature='37.8℉', rating=78.80000305175781, user_id='1205', user_birthday='1991-04-14', user_nationality='Italy', user_career='teacher', user_income=1705.0)]

In [128]:
data_filtered.count()

4686718

In [132]:
# 格式化用于输出
data_filtered_result = data_filtered.map(str_form)

# 过滤结果保存到 hdfs
data_filtered_result.saveAsTextFile("hdfs://localhost:9000/user/bdlab/lab1/data_filtered.txt")

# 3.数据标准化和归一化
# 4.数据清洗



### 逻辑回归模型训练：缺失值预测

In [234]:
from sklearn.linear_model import Lasso
import pandas as pd
import numpy as np

In [214]:
model_rdd = sample_layer.map(lambda x:x)
model_rdd.take(2)

[Row(review_id='144552912', longitude=9.349848747253418, latitude=56.740875244140625, altitude=17.052772521972656, review_date='2011/06/27', temperature='18.5℃', rating=83.91000366210938, user_id='38267', user_birthday='1974-06-08', user_nationality='Switzerland', user_career='programmer', user_income=5042.0),
 Row(review_id='125830646', longitude=9.975799560546875, latitude=56.607295989990234, altitude=54.140926361083984, review_date='2011-08-04', temperature='52.3℉', rating=88.77999877929688, user_id='14661', user_birthday='1980-11-20', user_nationality='Italy', user_career='programmer', user_income=2128.0)]

In [215]:
# 过滤奇异值和缺失值，作为训练数据
def model_llr_filter(item):
    if item['longitude']<longitude_min or item['longitude']>longitude_max:
        return False
    if item['latitude']<latitude_min or item['latitude']>latitude_max:
        return False
    if not isinstance(item['rating'],float):
        return False
    if item['rating']<rate_min or item['rating']>rate_max:
        return False
    if not isinstance(item['user_income'],float):
        return False
    return True

转换DataFrame

In [216]:
model_rdd = model_rdd.filter(model_llr_filter)
model_data_df = spark.createDataFrame(model_rdd,schema=schema)
model_data_pd = model_data_df.toPandas()

rating 预测模型

In [297]:
rating_X = model_data_pd[['user_income','longitude','latitude','altitude']].values
rating_y = model_data_pd['rating'].values
rating_model = Lasso()
rating_model.fit(rating_X,rating_y)

Lasso(alpha=1.0, copy_X=True, fit_intercept=True, max_iter=1000,
   normalize=False, positive=False, precompute=False, random_state=None,
   selection='cyclic', tol=0.0001, warm_start=False)

income 预测模型

In [258]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import LabelEncoder

In [281]:
# career 编码
career_encoder = LabelEncoder()
model_data_pd['user_career'] = career_encoder.fit_transform(model_data_pd['user_career'])

# nationality 编码
nationality_encoder = LabelEncoder()
model_data_pd['user_nationality'] = nationality_encoder.fit_transform(model_data_pd['user_nationality'])

In [288]:
income_X = model_data_pd[['user_nationality','user_career']].values
income_y = model_data_pd['user_income'].values
income_model = RandomForestRegressor()
income_model.fit(income_X,income_y)

RandomForestRegressor(bootstrap=True, criterion='mse', max_depth=None,
           max_features='auto', max_leaf_nodes=None,
           min_impurity_decrease=0.0, min_impurity_split=None,
           min_samples_leaf=1, min_samples_split=2,
           min_weight_fraction_leaf=0.0, n_estimators=10, n_jobs=1,
           oob_score=False, random_state=None, verbose=0, warm_start=False)

In [295]:
def income_predict(nationality,career):
    nationality_code = nationality_encoder.transform([nationality])[0]
    career_code = career_encoder.transform([career])[0]
    income = income_model.predict([[nationality_code,career_code]])[0]
    return round(income,1)

In [296]:
income_predict('Italy','teacher')

1694.6

日期处理

In [134]:
from dateutil.parser import  parse

In [292]:
income_model.predict([[0,1]])[0]

2334.7477766105844

In [353]:
# 日期规范为 yyyy-mm-dd
def date_std(item):
    try:
        item['user_birthday'] = str(parse(item['user_birthday']).date())
        item['review_date'] = str(parse(item['review_date']).date())
    except ValueError:
        print(item)
    return item

# 归一化 rating
rating_range = rate_max - rate_min
def rating_nor(rating):
    if rating < rate_min:
        # 对于预测值
        return rate_min
    return (rating - rate_min) / rating_range

# 温度单位标准化，统一摄氏度
def temp_cen(temp):
    if '℃' in temp:
        return temp
    temp = float(temp[:-1])
    temp = round((temp-32)/1.8,1)
    return str(temp)+'℃'

def data_std_dict_mapper(item):
    # 规范日期
    item = date_std(item)
    
    # 统一温度
    item['temperature'] = temp_cen(item['temperature'])
    
    # income 缺失值填充
    if not isinstance(item['user_income'],float):
        item['user_income'] = income_predict(item['user_nationality'],item['user_career'])
    
    # rating 缺失值填充
    if not isinstance(item['rating'],float):
        item['rating'] = 0.0
        result = rating_model.predict([item['user_income'],[item['longitude'],item['latitude'],item['altitude']]])
        item['rating'] = rating_nor(result[0])
    else: # rating 归一化
        item['rating'] = rating_nor(item['rating'])
        
    return item

def data_std_mapper(item):
    item = item.asDict()
    return data_std_dict_mapper(item)

In [345]:
data_std = data_filtered.map(data_std_mapper)

In [346]:
data_std_result = data_std.map(lambda x:'|'.join([str(_) for _ in x.values()]))

In [347]:
data_std_result.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 129.0 failed 1 times, most recent failure: Lost task 0.0 in stage 129.0 (TID 82, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 350, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1053, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/pyspark/rdd.py", line 1053, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-344-58920eb1565f>", line 38, in data_std_mapper
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/linear_model/base.py", line 256, in predict
    return self._decision_function(X)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/linear_model/coordinate_descent.py", line 791, in _decision_function
    return super(ElasticNet, self)._decision_function(X)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/linear_model/base.py", line 239, in _decision_function
    X = check_array(X, accept_sparse=['csr', 'csc', 'coo'])
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/utils/validation.py", line 433, in check_array
    array = np.array(array, dtype=dtype, order=order, copy=copy)
ValueError: setting an array element with a sequence.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:471)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:454)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 350, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1053, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/pyspark/rdd.py", line 1053, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-344-58920eb1565f>", line 38, in data_std_mapper
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/linear_model/base.py", line 256, in predict
    return self._decision_function(X)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/linear_model/coordinate_descent.py", line 791, in _decision_function
    return super(ElasticNet, self)._decision_function(X)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/linear_model/base.py", line 239, in _decision_function
    X = check_array(X, accept_sparse=['csr', 'csc', 'coo'])
  File "/usr/local/anaconda3/lib/python3.6/site-packages/sklearn/utils/validation.py", line 433, in check_array
    array = np.array(array, dtype=dtype, order=order, copy=copy)
ValueError: setting an array element with a sequence.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:471)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:454)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
data_std_result.saveAsTextFile("hdfs://localhost:9000/user/bdlab/lab1/data_preprocessed.txt")

In [155]:
rowC = sample_layer_result[0][1][2]
rowC

Row(review_id='26218810', longitude=9.98774242401123, latitude=56.99723434448242, altitude=10.679211616516113, review_date='2010-02-27', temperature='-2.1℃', rating=81.68000030517578, user_id='9308', user_birthday='1986/11/09', user_nationality='Austria', user_career='programmer', user_income=2681.0)

In [156]:
rowF = sample_layer_result[0][1][3]
rowF

Row(review_id='133175086', longitude=8.591045379638672, latitude=57.119110107421875, altitude=2.2723560333251953, review_date='2015/02/16', temperature='31.9℉', rating=64.83000183105469, user_id='2795', user_birthday='1978/11/27', user_nationality='Germany', user_career='programmer', user_income=2921.0)

In [178]:
rowfdict['temperature'][:-1]

'31.9'

In [180]:
temp_cen('31.9℉')

'-0.1℃'

In [186]:
isinstance(None,float)

False

In [355]:
result = rating_model.predict([[rowfdict['user_income'],rowfdict['longitude'],rowfdict['latitude'],rowfdict['altitude']]])

In [356]:
result[0]

74.1958634479355

In [302]:
data_std_mapper(rowF)

{'review_id': '133175086',
 'longitude': 8.591045379638672,
 'latitude': 57.119110107421875,
 'altitude': 2.2723560333251953,
 'review_date': '2015-02-16',
 'temperature': '-0.1℃',
 'rating': 0.14293590719831636,
 'user_id': '2795',
 'user_birthday': '1978-11-27',
 'user_nationality': 'Germany',
 'user_career': 'programmer',
 'user_income': 2921.0}

In [371]:
row_test = rowF.asDict()

In [384]:
row_test

{'review_id': '133175086',
 'longitude': 8.591045379638672,
 'latitude': 57.119110107421875,
 'altitude': 2.2723560333251953,
 'review_date': '2015-02-16',
 'temperature': '-0.1℃',
 'rating': None,
 'user_id': '2795',
 'user_birthday': '1978-11-27',
 'user_nationality': 'Germany',
 'user_career': 'programmer',
 'user_income': 2921.0}

In [383]:
row_test['rating'] = None

In [387]:
test_mapper(row_test)

{'review_id': '133175086',
 'longitude': 8.591045379638672,
 'latitude': 57.119110107421875,
 'altitude': 2.2723560333251953,
 'review_date': '2015-02-16',
 'temperature': '-0.1℃',
 'rating': -1.6877589183364732,
 'user_id': '2795',
 'user_birthday': '1978-11-27',
 'user_nationality': 'Germany',
 'user_career': 'programmer',
 'user_income': 2921.0}

In [388]:
def test_mapper(item):
    # 规范日期
    item = date_std(item)
    
    # 统一温度
    item['temperature'] = temp_cen(item['temperature'])
    
    # income 缺失值填充
    if not isinstance(item['user_income'],float):
        print("fill income")
        item['user_income'] = income_predict(item['user_nationality'],item['user_career'])
    
    # rating 缺失值填充
    if not isinstance(item['rating'],float):
        print("fill rating")
        item['rating'] = 0.0
        result = rating_model.predict([item['user_income'],[item['longitude'],item['latitude'],item['altitude']]])
        item['rating'] = rating_nor(result[0])
        
    else: # rating 归一化
        item['rating'] = rating_nor(item['rating'])
        
    return item