In [34]:
# Environment bootstrap: findspark.init() is expected to make the local Spark
# installation importable as `pyspark` (per findspark's docs it searches for
# SPARK_HOME). It must run before the pyspark imports in the next cell.
import findspark
findspark.init()

## 初始化

In [35]:
# Spark ML building blocks used throughout the notebook.
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName('CaliforniaHousing')
    .getOrCreate()
)

## 读入数据

In [36]:
# cal_housing.data has no header row, so Spark auto-names the columns _c0.._c8.
# Rename them positionally: eight features, then the median house value, which
# is named 'label' because the LinearRegression below does not set labelCol and
# therefore uses the default 'label' column.
CAL_HOUSING_PATH = 'data/CaliforniaHousing/cal_housing.data'
FEATURE_COLS = ['longitude', 'latitude', 'housingMedianAge', 'totalRooms',
                'totalBedrooms', 'population', 'households', 'medianIncome']
LABEL_COL = 'label'  # median house value

raw_df = spark.read.csv(CAL_HOUSING_PATH, inferSchema=True)
assert len(raw_df.columns) == len(FEATURE_COLS) + 1, \
    'unexpected column count: %s' % raw_df.columns
df = raw_df.toDF(*(FEATURE_COLS + [LABEL_COL]))
df.printSchema()
df.show()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: double (nullable = true)
 |-- totalBedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- label: double (nullable = true)

+---------+--------+----------------+----------+-------------+----------+----------+------------+--------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|   label|
+---------+--------+----------------+----------+-------------+----------+----------+------------+--------+
|  -122.23|   37.88|            41.0|     880.0|        129.0|     322.0|     126.0|      8.3252|452600.0|
|  -122.22|   37.86|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|358500.0|
|  -122.24|   37.85|            52.0|    1467.0|        190.0|     496.0|     

In [37]:
# Pack every non-label column into one vector column ('features'), the input
# format Spark ML estimators expect. The feature list is derived from df.columns
# so it cannot drift from the names assigned when the data was loaded; its order
# is also the order of the regression coefficients printed later.
assert 'label' in df.columns, df.columns
feature_cols = [name for name in df.columns if name != 'label']
vecAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
assembledFeatures = vecAssembler.transform(df)

## 标准化（StandardScaler）

In [38]:
# Scale each assembled feature to unit standard deviation. Centering is off
# (withMean=False, Spark's default), so values are only divided by the std and
# not shifted: the first scaled longitude in the output is about -61.0, not
# near 0.
# NOTE(review): the scaler is fit on the full dataset, before the train/test
# split further down, so test rows contribute to the scaling statistics.
# Consider fitting it on the training split only.
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures',
                        withMean=False, withStd=True)
scalerModel = scaler.fit(assembledFeatures)
scalerFeatures = scalerModel.transform(assembledFeatures)
scalerFeatures.printSchema()
(scalerFeatures
 .select('scaledFeatures')
 .show(truncate=False))

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: double (nullable = true)
 |-- totalBedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)

+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaledFeatures                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|[-61.00726959606955,17.734477624640412,3.25770230160830

## 模型训练

In [39]:
# Hold out test_ratio of the rows for evaluation; the fixed seed keeps the split
# repeatable across runs.
test_ratio = 0.3
seed = 1234

training, testing = scalerFeatures.randomSplit(
    weights=[1 - test_ratio, test_ratio], seed=seed)

# Linear regression on the standardized features (labelCol left at its default,
# 'label'). regParam is 0.0, which is also its default: with no regularization
# the elasticNetParam mix has no effect. This is consistent with the training
# summary reporting a single iteration.
lr = LinearRegression(featuresCol='scaledFeatures', maxIter=50,
                      elasticNetParam=0.3, regParam=0.0)
lrModel = lr.fit(training)

In [40]:
# Fitted parameters: one weight per scaled feature (in VectorAssembler input
# order), followed by the bias term.
print("Coefficients: {}".format(lrModel.coefficients))
print("Intercept: {}".format(lrModel.intercept))

Coefficients: [-85702.42573195054,-90909.01657601786,15266.561765805687,-20179.74492761189,50313.473987385085,-41776.56433598595,17236.123425992188,76861.0530568257]
Intercept: -3592296.7765801586


In [41]:
# Fit diagnostics computed on the *training* split, not the held-out test rows.
trainingSummary = lrModel.summary
print("numIterations: {:d}".format(trainingSummary.totalIterations))
print("objectiveHistory: {}".format(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: {:f}".format(trainingSummary.rootMeanSquaredError))
print("r2: {:f}".format(trainingSummary.r2))

numIterations: 1
objectiveHistory: [0.0]
+-------------------+
|          residuals|
+-------------------+
| -76402.88101581251|
| -52635.47813156899|
| -78719.60023262398|
| -83597.33111470286|
| -134813.9760782197|
|  -1690.92039518198|
| 1484.9349848376587|
| -51557.13812966924|
|-246.08598265983164|
| -11226.79454551125|
| -75034.88186836056|
|-46577.261419413146|
| 22823.179023702163|
| -52556.66511212336|
| -56297.55669412343|
| -53588.49602598883|
|-65474.971853818744|
| -38593.04783859523|
|-34495.202090676874|
|  -68485.4323029751|
+-------------------+
only showing top 20 rows

RMSE: 69567.212590
r2: 0.637410


In [42]:
# Score the held-out test split with the untuned model. The result keeps every
# input column and appends a 'prediction' column.
predictions = lrModel.transform(dataset=testing)
predictions.show()

+---------+--------+----------------+----------+-------------+----------+----------+------------+--------+--------------------+--------------------+------------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|   label|            features|      scaledFeatures|        prediction|
+---------+--------+----------------+----------+-------------+----------+----------+------------+--------+--------------------+--------------------+------------------+
|  -124.35|   40.54|            52.0|    1820.0|        300.0|     806.0|     270.0|      3.0147| 94600.0|[-124.35,40.54,52...|[-62.065401082150...|187899.98507704632|
|   -124.3|    41.8|            19.0|    2672.0|        552.0|    1298.0|     478.0|      1.9797| 85800.0|[-124.3,41.8,19.0...|[-62.040445150874...|63676.044894865714|
|   -124.3|   41.84|            17.0|    2677.0|        531.0|    1244.0|     456.0|      3.0313|103600.0|[-124.3,41.84,17....|[-62.040445150874...|100537.89939

## 预测结果：label 与 prediction 对比

In [43]:
# Compare the true value with the model's prediction, side by side.
predictions.select(['label', 'prediction']).show()

+--------+------------------+
|   label|        prediction|
+--------+------------------+
| 94600.0|187899.98507704632|
| 85800.0|63676.044894865714|
|103600.0|100537.89939753897|
| 90100.0|165071.18914240086|
| 82800.0| 136628.9686223413|
| 81300.0|116096.58738278225|
| 62500.0| 133799.2687787623|
|109400.0|116653.65494084079|
| 81800.0| 202225.8945067469|
| 76900.0| 130049.3810931351|
| 74700.0|134607.58343426092|
| 80500.0|  150112.084720233|
| 75500.0| 97657.74982272321|
| 90000.0|   174952.18178041|
| 66800.0|  99932.9340747795|
|100500.0| 129374.3978123935|
| 86900.0|117023.70363311097|
| 74100.0|135611.16950535448|
| 96100.0|119535.75774119934|
| 73600.0|154176.73023142805|
+--------+------------------+
only showing top 20 rows



## Model Selection and Tuning

### Cross-Validation

In [44]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Grid-search regParam x elasticNetParam (2 x 3 = 6 settings) for the `lr`
# estimator defined above. The grid values override lr's own elasticNetParam.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])
             .addGrid(lr.elasticNetParam, [0.1, 0.3, 0.5])
             .build())

# 3-fold CV on the training split only, scored by RMSE (lower is better).
# Fold assignment is random, so pass the notebook's seed explicitly instead of
# relying on the estimator's default; this keeps the selected model reproducible.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName='rmse'),
                          numFolds=3,
                          seed=seed)

cvModel = crossval.fit(training)

In [81]:
# Score the test split with the best model chosen by cross-validation
# (CrossValidatorModel.transform delegates to bestModel).
# NOTE(review): this overwrites `predictions` from the untuned model; cells
# below that read `predictions` see the tuned model's output.
predictions = cvModel.bestModel.transform(testing)

In [83]:
# Displaying cvModel.bestModel alone shows only an opaque uid. First list the
# mean cross-validated metric (RMSE, the RegressionEvaluator default; lower is
# better) for each grid point, in paramGrid order, so the winning setting is
# visible.
for param_map, mean_metric in zip(paramGrid, cvModel.avgMetrics):
    print({param.name: value for param, value in param_map.items()}, mean_metric)
cvModel.bestModel

LinearRegression_4c208341dd4b6a3e5a95

In [73]:
# True value vs. the tuned model's prediction, side by side.
predictions.select(['label', 'prediction']).show()

+--------+------------------+
|   label|        prediction|
+--------+------------------+
| 94600.0|187893.09831869928|
| 85800.0| 63670.38516491931|
|103600.0| 100529.5686042374|
| 90100.0|165089.38547603413|
| 82800.0|  136621.427507143|
| 81300.0| 116092.1182073555|
| 62500.0|133787.14099471178|
|109400.0|116653.18418797199|
| 81800.0| 202216.5218642773|
| 76900.0|130038.27805944253|
| 74700.0|134594.89249524195|
| 80500.0|150090.31018255372|
| 75500.0| 97648.99883046467|
| 90000.0|174943.57296009595|
| 66800.0|  99915.5713534141|
|100500.0| 129373.0695389607|
| 86900.0|117018.12521233037|
| 74100.0|135602.32725241221|
| 96100.0|119515.98077647155|
| 73600.0|154170.59967591567|
+--------+------------------+
only showing top 20 rows



In [80]:
# Module alias rather than `from ... import sum`, which would shadow Python's
# builtin sum for the rest of the notebook.
from pyspark.sql import functions as F

# Recompute the test-set RMSE by hand from `predictions` (the tuned model's
# output when cells run in order):  RMSE = sqrt(mean((label - prediction)^2)).
# Each row of predictions1 holds one *squared error*; the column keeps the name
# 'mse' for compatibility, but it is not averaged yet.
residual = predictions['label'] - predictions['prediction']
predictions1 = predictions.select((residual * residual).alias('mse'))
predictions1.show()
# F.avg computes the mean directly, so no separately tracked row count is
# needed. A sum divided by an externally defined count would break on a fresh
# kernel.
predictions1.select(F.sqrt(F.avg(predictions1['mse'])).alias('RMSE')).show(truncate=False)

+--------------------+
|                 mse|
+--------------------+
|  8.70360219390249E9|
| 4.897198527490234E8|
|   9427548.956084669|
| 5.623407934073239E9|
|2.8967460589066496E9|
|1.2104914893545978E9|
| 5.081856471199917E9|
| 5.260868086464685E7|
|1.450013873788997...|
| 2.823676595122631E9|
|  3.58739814701659E9|
| 4.842811271304039E9|
| 4.905781491919252E8|
| 7.215410587227143E9|
|1.0966410660630603E9|
| 8.336541446016601E8|
| 9.071014663056104E8|
| 3.782536257462806E9|
| 5.483081557240852E8|
|6.4916215321366625E9|
+--------------------+
only showing top 20 rows

+-----------------+
|RMSE             |
+-----------------+
|69429.55472130788|
+-----------------+

