## Motivation

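<br>This semester I came across Apache Spark in the Computer Science department's course "Data Science and Big Data Analytics". Spark's native language is Scala, but it also offers a Python API called PySpark. Since Python has always been my analysis language, I decided to use PySpark. I ran into some difficulties while implementing the analysis: the tutorials on the Spark website were not very intuitive to me, and I could not find many analyses done in PySpark, so I decided to write up a complete PySpark analysis workflow myself. </br>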
<br></br>
<br>System: Windows 7 64-bit, Python 2.7, Anaconda2 2.4</br>
<br>Spark installation guide: http://dataxscience.blogspot.tw/2016/02/setup-apache-spark-on-windows.html </br>
<br>Reference code: http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/</br>

In [1]:
# Make Spark importable from the notebook
import os 
import sys

os.environ['SPARK_HOME']="C:/spark-1.6.1/" # change this to wherever Spark lives on your machine

sys.path.append("C:/spark-1.6.1/bin") 
sys.path.append("C:/spark-1.6.1/python")
sys.path.append("C:/spark-1.6.1/python/pyspark/")
sys.path.append("C:/spark-1.6.1/python/lib") 
sys.path.append("C:/spark-1.6.1/python/lib/pyspark.zip")

import pyspark
sc = pyspark.SparkContext('local[*]') 
sqlContext = pyspark.sql.SQLContext(sc) # the SQL context that DataFrames need

## How do we load a CSV file into a Spark DataFrame?

To represent CSV data as a DataFrame in Spark, we need the SQL module that ships with PySpark, pyspark.sql.

In [2]:
from pyspark.sql.types import *
from pyspark.sql import Row

Spark's basic data abstraction is the RDD (Resilient Distributed Dataset).

In [3]:
rdd = sc.textFile('realestate.csv')

Use "take" to show the first 5 rows.

In [4]:
rdd.take(5)

[u'street,city,zip,state,beds,baths,sq__ft,type,sale_date,price,latitude,longitude',
 u'3526 HIGH ST,SACRAMENTO,95838,CA,2,1,836,Residential,Wed May 21 00:00:00 EDT 2008,59222,38.631913,-121.434879',
 u'51 OMAHA CT,SACRAMENTO,95823,CA,3,1,1167,Residential,Wed May 21 00:00:00 EDT 2008,68212,38.478902,-121.431028',
 u'2796 BRANCH ST,SACRAMENTO,95815,CA,2,1,796,Residential,Wed May 21 00:00:00 EDT 2008,68880,38.618305,-121.443839',
 u'2805 JANETTE WAY,SACRAMENTO,95815,CA,2,1,852,Residential,Wed May 21 00:00:00 EDT 2008,69307,38.616835,-121.439146']

The result above shows that each row is still a single string whose fields have not been separated at the ",". We can "split" every row.

In [5]:
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)

[[u'street',
  u'city',
  u'zip',
  u'state',
  u'beds',
  u'baths',
  u'sq__ft',
  u'type',
  u'sale_date',
  u'price',
  u'latitude',
  u'longitude'],
 [u'3526 HIGH ST',
  u'SACRAMENTO',
  u'95838',
  u'CA',
  u'2',
  u'1',
  u'836',
  u'Residential',
  u'Wed May 21 00:00:00 EDT 2008',
  u'59222',
  u'38.631913',
  u'-121.434879']]
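
A plain split on "," is enough for this file, but it would cut a field in two if that field contained a comma inside quotes. The following is a minimal sketch of a more defensive parser built on Python's standard csv module. The helper name parse_line is hypothetical and the quoted sample line is made up for illustration; it does not come from realestate.csv.

```python
import csv

def parse_line(line):
    """Split one CSV line into fields, honouring double-quoted fields."""
    return next(csv.reader([line]))

sample = '"1 MAIN ST, APT 2",SACRAMENTO,95838'  # made-up line with a quoted comma
naive = sample.split(",")    # the quoted street is cut into two pieces
parsed = parse_line(sample)  # the quoted street stays in one field
print(naive)
print(parsed)
```

Under that assumption the helper could be passed to the same map call, as in rdd.map(parse_line), in place of the lambda.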

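As far as I know, Spark has no built-in way to recognise the header row when a file is read with sc.textFile, so we first remove it with "filter".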

In [6]:
header = rdd.first()
rdd = rdd.filter(lambda line:line != header)

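<br>Next comes converting the Spark RDD into a DataFrame!</br>
<br>The advantage of a DataFrame is that we can pick only the columns we are interested in; of course, if you need most of the features anyway, it may not make much difference</br>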

In [7]:
df = rdd.map(lambda line: Row(street = line[0],
                              city = line[1],
                              zip = line[2],
                              beds = float(line[4]),
                              baths = float(line[5]),
                              sqft = float(line[6]),
                              price = float(line[9]))).toDF()
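
The float conversions in the cell above raise an error as soon as one row has a missing or malformed number. A minimal sketch of a more forgiving conversion, in plain Python so it can be tried without Spark, might look like this. The names House and to_house are hypothetical; the column positions are the ones used in the cell above.

```python
from collections import namedtuple

# Hypothetical record type mirroring the columns selected in the Row above
House = namedtuple("House", ["street", "city", "zip", "beds", "baths", "sqft", "price"])

def to_house(fields):
    """Convert one split CSV line into a typed record, or None if it is malformed."""
    try:
        return House(street=fields[0], city=fields[1], zip=fields[2],
                     beds=float(fields[4]), baths=float(fields[5]),
                     sqft=float(fields[6]), price=float(fields[9]))
    except (ValueError, IndexError):
        return None

good = to_house(['3526 HIGH ST', 'SACRAMENTO', '95838', 'CA', '2', '1', '836',
                 'Residential', 'Wed May 21 00:00:00 EDT 2008', '59222',
                 '38.631913', '-121.434879'])
bad = to_house(['broken', 'row'])
print(good)
print(bad)
```

Rows that come back as None could then be dropped with a filter before building the DataFrame.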

Although a DataFrame can also be inspected with "take", the output is not very readable, so here we use "show".

In [8]:
df.show(5)

+-----+----+----------+-------+------+----------------+-----+
|baths|beds|      city|  price|  sqft|          street|  zip|
+-----+----+----------+-------+------+----------------+-----+
|  1.0| 2.0|SACRAMENTO|59222.0| 836.0|    3526 HIGH ST|95838|
|  1.0| 3.0|SACRAMENTO|68212.0|1167.0|     51 OMAHA CT|95823|
|  1.0| 2.0|SACRAMENTO|68880.0| 796.0|  2796 BRANCH ST|95815|
|  1.0| 2.0|SACRAMENTO|69307.0| 852.0|2805 JANETTE WAY|95815|
|  1.0| 2.0|SACRAMENTO|81900.0| 797.0| 6001 MCMAHON DR|95824|
+-----+----+----------+-------+------+----------------+-----+
only showing top 5 rows



## Pandas & DataFrame Operation

Now we can start using DataFrame operations!

In [9]:
df.toPandas().head()

   baths  beds        city    price    sqft            street    zip
0    1.0   2.0  SACRAMENTO  59222.0   836.0      3526 HIGH ST  95838
1    1.0   3.0  SACRAMENTO  68212.0  1167.0       51 OMAHA CT  95823
2    1.0   2.0  SACRAMENTO  68880.0   796.0    2796 BRANCH ST  95815
3    1.0   2.0  SACRAMENTO  69307.0   852.0  2805 JANETTE WAY  95815
4    1.0   2.0  SACRAMENTO  81900.0   797.0   6001 MCMAHON DR  95824


Keep only the rows whose zip is 95815.

In [10]:
favorite_zip = df[df.zip == '95815']  # zip was stored as a string, so compare with a string

In [11]:
favorite_zip.show(5)

+-----+----+----------+--------+------+----------------+-----+
|baths|beds|      city|   price|  sqft|          street|  zip|
+-----+----+----------+--------+------+----------------+-----+
|  1.0| 2.0|SACRAMENTO| 68880.0| 796.0|  2796 BRANCH ST|95815|
|  1.0| 2.0|SACRAMENTO| 69307.0| 852.0|2805 JANETTE WAY|95815|
|  1.0| 1.0|SACRAMENTO|106852.0| 871.0| 2930 LA ROSA RD|95815|
|  1.0| 2.0|SACRAMENTO| 78000.0| 800.0|    3132 CLAY ST|95815|
|  2.0| 4.0|SACRAMENTO| 89000.0|1316.0| 483 ARCADE BLVD|95815|
+-----+----+----------+--------+------+----------------+-----+
only showing top 5 rows



Look at only two features, city and beds.

In [12]:
df.select('city','beds').show(10)

+--------------+----+
|          city|beds|
+--------------+----+
|    SACRAMENTO| 2.0|
|    SACRAMENTO| 3.0|
|    SACRAMENTO| 2.0|
|    SACRAMENTO| 2.0|
|    SACRAMENTO| 2.0|
|    SACRAMENTO| 3.0|
|    SACRAMENTO| 3.0|
|    SACRAMENTO| 3.0|
|RANCHO CORDOVA| 2.0|
|     RIO LINDA| 3.0|
+--------------+----+
only showing top 10 rows



Group by beds and count the rows in each group.

In [13]:
df.groupBy("beds").count().show()

+----+-----+
|beds|count|
+----+-----+
| 1.0|   10|
| 6.0|    3|
| 3.0|  413|
| 5.0|   59|
| 8.0|    1|
| 4.0|  258|
| 0.0|  108|
| 2.0|  133|
+----+-----+
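
For intuition, groupBy followed by count does on the cluster what a simple tally does in plain Python. A minimal sketch, using only the beds values of the first ten rows shown by the select cell above:

```python
from collections import Counter

# beds values of the first 10 rows printed by df.select('city','beds').show(10)
beds = [2.0, 3.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 2.0, 3.0]
counts = Counter(beds)  # maps each distinct beds value to its number of rows
print(counts)
```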



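<br>Next is describe, a handy function for summarising data that works much like its pandas counterpart</br>
<br>It reveals some bad data: for example, sqft has a minimum of 0, and a house with 0 square feet looks suspicious</br>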

In [14]:
df.describe(['baths','beds','price','sqft']).show()

+-------+------------------+------------------+------------------+------------------+
|summary|             baths|              beds|             price|              sqft|
+-------+------------------+------------------+------------------+------------------+
|  count|               985|               985|               985|               985|
|   mean|1.7766497461928934|2.9116751269035532|234144.26395939087|1314.9167512690356|
| stddev| 0.895371422318646|1.3079322320435811|  138365.839084928| 853.0482425034447|
|    min|               0.0|               0.0|            1551.0|               0.0|
|    max|               5.0|               8.0|          884790.0|            5822.0|
+-------+------------------+------------------+------------------+------------------+
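
To make the summary rows concrete, here is a minimal plain-Python sketch of the same five statistics for one column. It assumes that stddev is the sample standard deviation (dividing by n - 1); the function name describe_column is hypothetical, and the input is just the sqft values of the first five rows shown earlier.

```python
import math

def describe_column(values):
    """Return count, mean, sample standard deviation, min and max of a list of numbers."""
    n = len(values)
    mean = sum(values) / float(n)
    # Assumption: sample standard deviation, i.e. divide by n - 1
    variance = sum((v - mean) ** 2 for v in values) / float(n - 1)
    return {"count": n, "mean": mean, "stddev": math.sqrt(variance),
            "min": min(values), "max": max(values)}

summary = describe_column([836.0, 1167.0, 796.0, 852.0, 797.0])
print(summary)
```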



## Regression with MLlib

Spark's machine learning library is called MLlib. We will use linear regression as the example; first, import the necessary modules.

In [15]:
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *

First, I create a DataFrame containing only the features I am interested in. I will use three features, baths, beds, and sqft, to predict the house price.

In [16]:
data = df.select('price','baths','beds','sqft')

Next, drop the implausible records.

In [17]:
data = data[data.baths > 0]
data = data[data.beds > 0]
data = data[data.sqft > 0]
data.describe(['baths','beds','price','sqft']).show()

+-------+------------------+------------------+------------------+------------------+
|summary|             baths|              beds|             price|              sqft|
+-------+------------------+------------------+------------------+------------------+
|  count|               814|               814|               814|               814|
|   mean|1.9606879606879606|3.2444717444717446| 229448.3697788698|1591.1461916461917|
| stddev|0.6698038253879438|0.8521372615281976|119825.57606009026| 663.8419297942894|
|    min|               1.0|               1.0|            2000.0|             484.0|
|    max|               5.0|               8.0|          884790.0|            5822.0|
+-------+------------------+------------------+------------------+------------------+



### Labeled Points and Scaling Data

MLlib expects the data as LabeledPoints: each one pairs a label (here price) with the vector of its corresponding features. We build them with map.

In [18]:
temp = data.map(lambda line: LabeledPoint(line[0], line[1:]))  # pass the features as a flat sequence, not a nested list
temp.take(5)

[LabeledPoint(59222.0, [1.0,2.0,836.0]),
 LabeledPoint(68212.0, [1.0,3.0,1167.0]),
 LabeledPoint(68880.0, [1.0,2.0,796.0]),
 LabeledPoint(69307.0, [1.0,2.0,852.0]),
 LabeledPoint(81900.0, [1.0,2.0,797.0])]

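Now we have the data format MLlib requires and could start modelling! Before that, though, we standardize the data, because sqft is on a much larger scale than baths and beds.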

In [19]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

In [20]:
features = data.map(lambda row: row[1:])
features.take(5)

[(1.0, 2.0, 836.0),
 (1.0, 3.0, 1167.0),
 (1.0, 2.0, 796.0),
 (1.0, 2.0, 852.0),
 (1.0, 2.0, 797.0)]

In [21]:
standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)

Let's see what the data looks like after standardization.

In [22]:
features_transform.take(5)

[DenseVector([1.493, 2.347, 1.2593]),
 DenseVector([1.493, 3.5206, 1.7579]),
 DenseVector([1.493, 2.347, 1.1991]),
 DenseVector([1.493, 2.347, 1.2834]),
 DenseVector([1.493, 2.347, 1.2006])]
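
The numbers above are consistent with StandardScaler() being used with its defaults (withMean=False, withStd=True): each column is divided by its standard deviation, but the mean is not subtracted. A minimal sketch that reproduces the first row by hand, using the stddev values from the describe output of the cleaned data above (the helper name scale_by_std is hypothetical):

```python
def scale_by_std(value, stddev):
    """Scale a value to unit standard deviation without centring it."""
    return value / stddev

# stddev values copied from the describe output of the cleaned data
std_baths, std_beds, std_sqft = 0.6698038253879438, 0.8521372615281976, 663.8419297942894

# first row of the features: baths=1.0, beds=2.0, sqft=836.0
first_row = [scale_by_std(1.0, std_baths),
             scale_by_std(2.0, std_beds),
             scale_by_std(836.0, std_sqft)]
print(first_row)
```

Because the mean is not removed, the scaled features are all positive rather than centred on zero, which matters when the model is later fitted without an intercept.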

### Combine the labels (price) with the standardized features and start modelling

In [23]:
lab = data.map(lambda row: row[0])
lab.take(5)

[59222.0, 68212.0, 68880.0, 69307.0, 81900.0]

Combine them with zip.

In [24]:
transformedData = lab.zip(features_transform)
transformedData.take(5)

[(59222.0, DenseVector([1.493, 2.347, 1.2593])),
 (68212.0, DenseVector([1.493, 3.5206, 1.7579])),
 (68880.0, DenseVector([1.493, 2.347, 1.1991])),
 (69307.0, DenseVector([1.493, 2.347, 1.2834])),
 (81900.0, DenseVector([1.493, 2.347, 1.2006]))]

In [25]:
transformedData = transformedData.map(lambda row: LabeledPoint(row[0],row[1]))

In [26]:
transformedData.take(5)

[LabeledPoint(59222.0, [1.49297445326,2.34703972035,1.25933593899]),
 LabeledPoint(68212.0, [1.49297445326,3.52055958053,1.7579486134]),
 LabeledPoint(68880.0, [1.49297445326,2.34703972035,1.19908063091]),
 LabeledPoint(69307.0, [1.49297445326,2.34703972035,1.28343806223]),
 LabeledPoint(81900.0, [1.49297445326,2.34703972035,1.20058701361])]

MLlib makes it easy to split the data into training and testing sets.

In [27]:
trainingData,testingData = transformedData.randomSplit([.8,.2],seed=1234)
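
As I understand it, randomSplit decides for each element independently which split it goes to, so the 80/20 proportions hold only approximately. The sketch below imitates that behaviour in plain Python; it illustrates the idea and is not Spark's implementation, and the function name random_split is hypothetical.

```python
import random

def random_split(items, train_fraction, seed):
    """Assign each item independently to train or test with the given probability."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    train, test = [], []
    for item in items:
        (train if rng.random() < train_fraction else test).append(item)
    return train, test

# 814 is the row count of the cleaned data
train, test = random_split(list(range(814)), 0.8, seed=1234)
sizes = (len(train), len(test))
print(sizes)
```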

Next, we choose linear regression fitted with Stochastic Gradient Descent, which minimises the cost function.

In [28]:
from pyspark.mllib.regression import LinearRegressionWithSGD
linearModel = LinearRegressionWithSGD.train(trainingData, iterations=1000, step=.2)  # 1000 iterations, step size 0.2
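
To show roughly what the training call does, here is a minimal plain-Python sketch of linear regression without an intercept, fitted by full-batch gradient descent on the mean squared error. It is a simplification under stated assumptions: a constant step size and toy data generated from known weights. MLlib's own optimiser differs in details such as its step-size schedule and optional mini-batches. The function name fit_linear is hypothetical.

```python
def fit_linear(xs, ys, iterations, step):
    """Fit y ~ w . x (no intercept) by full-batch gradient descent on mean squared error."""
    n, d = len(xs), len(xs[0])
    w = [0.0] * d
    for _ in range(iterations):
        grad = [0.0] * d
        for x, y in zip(xs, ys):
            err = sum(wj * xj for wj, xj in zip(w, x)) - y  # prediction error for one row
            for j in range(d):
                grad[j] += err * x[j] / n                   # average gradient over all rows
        w = [wj - step * gj for wj, gj in zip(w, grad)]     # move against the gradient
    return w

xs = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [2.0, 1.0]]
ys = [2.0 * a + 3.0 * b for a, b in xs]  # toy labels generated with weights 2 and 3
weights = fit_linear(xs, ys, iterations=2000, step=0.1)
print(weights)
```

On this noise-free toy data the fitted weights approach the generating weights 2 and 3.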

In [29]:
trainingData.take(5)

[LabeledPoint(59222.0, [1.49297445326,2.34703972035,1.25933593899]),
 LabeledPoint(68212.0, [1.49297445326,3.52055958053,1.7579486134]),
 LabeledPoint(68880.0, [1.49297445326,2.34703972035,1.19908063091]),
 LabeledPoint(69307.0, [1.49297445326,2.34703972035,1.28343806223]),
 LabeledPoint(81900.0, [1.49297445326,2.34703972035,1.20058701361])]

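weights and intercept give each feature's weight and the intercept. The intercept is 0.0 because train does not fit one unless intercept=True is passed.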

In [30]:
linearModel.weights,linearModel.intercept

(DenseVector([15098.627, 3792.023, 70216.8097]), 0.0)

In [31]:
testingData.take(10)

[LabeledPoint(100309.0, [2.98594890652,3.52055958053,1.36930187625]),
 LabeledPoint(124100.0, [2.98594890652,3.52055958053,2.41171870613]),
 LabeledPoint(148750.0, [2.98594890652,4.69407944071,2.21739533756]),
 LabeledPoint(150000.0, [1.49297445326,1.17351986018,1.14485085363]),
 LabeledPoint(161500.0, [2.98594890652,4.69407944071,2.3906293483]),
 LabeledPoint(166357.0, [1.49297445326,4.69407944071,2.94497818269]),
 LabeledPoint(168000.0, [2.98594890652,3.52055958053,2.22492725107]),
 LabeledPoint(178480.0, [2.98594890652,3.52055958053,1.78506350204]),
 LabeledPoint(181872.0, [1.49297445326,3.52055958053,1.73535287287]),
 LabeledPoint(182587.0, [4.47892335978,4.69407944071,2.78831438167])]

predict is what we use to make predictions!

In [32]:
linearModel.predict([2.98594890652,3.52055958053,1.36930187625])

154581.78116003965
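
For a linear model, the prediction is the dot product of the weights and the features, plus the intercept. A minimal sketch that recomputes the value above by hand from the weights printed earlier (they are rounded for display, so the result agrees only to a few decimals; the function name predict_price is hypothetical):

```python
def predict_price(weights, intercept, features):
    """Linear prediction: dot product of weights and features plus the intercept."""
    return sum(w * x for w, x in zip(weights, features)) + intercept

weights = [15098.627, 3792.023, 70216.8097]  # rounded values printed by the notebook
price = predict_price(weights, 0.0, [2.98594890652, 3.52055958053, 1.36930187625])
print(price)
```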

## Checking the Model

Once the model is built, the next step is to evaluate its performance; here we use RegressionMetrics.

In [33]:
from pyspark.mllib.evaluation import RegressionMetrics

Pair each prediction with the actual observed value, ready to feed to RegressionMetrics.

In [34]:
prediObserRDDin = trainingData.map(lambda row: (float(linearModel.predict(row.features)),row.label))

In [35]:
metrics = RegressionMetrics(prediObserRDDin)

Get the model's $R^2$ on the training data.

In [36]:
metrics.r2

0.4969184679643588

In [37]:
prediObserRDDout = testingData.map(lambda row: (float(linearModel.predict(row.features)),row.label))
metrics = RegressionMetrics(prediObserRDDout)

In [39]:
metrics.rootMeanSquaredError

94895.10434498572
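
Note that the $R^2$ above was computed on the training data, while the root mean squared error was computed on the testing data. As a reminder of what the two numbers mean, here is a minimal plain-Python sketch using the common definitions $R^2 = 1 - SS_{res}/SS_{tot}$ and $RMSE = \sqrt{SS_{res}/n}$. The pairs are made-up toy values in the same (prediction, observation) order as above, and the function name r2_and_rmse is hypothetical.

```python
import math

def r2_and_rmse(pairs):
    """Compute R^2 and RMSE from (prediction, observation) pairs."""
    observations = [obs for _, obs in pairs]
    mean_obs = sum(observations) / float(len(observations))
    ss_res = sum((obs - pred) ** 2 for pred, obs in pairs)       # residual sum of squares
    ss_tot = sum((obs - mean_obs) ** 2 for obs in observations)  # total sum of squares
    return 1.0 - ss_res / ss_tot, math.sqrt(ss_res / len(pairs))

# made-up toy pairs, not taken from the housing data
r2, rmse = r2_and_rmse([(2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])
print(r2)
print(rmse)
```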