In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext

In [2]:
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [3]:
rdd = sc.textFile('cal_housing.data')
header = sc.textFile('cal_housing.domain')

In [4]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [5]:
rdd = rdd.map(lambda x: x.split(','))

In [6]:
from pyspark.sql import Row
df = rdd.map(lambda line: Row(
    longitude=line[0],
    latitude=line[1],
    housingMedianAge=line[2],
    totalRooms=line[3],
    totalBedRooms=line[4],
    population=line[5],
    households=line[6],
    medianInconde=line[7],
    medianHouseValues=line[8]
)).toDF()

In [7]:
df.show()
df.printSchema()

+-----------+----------------+---------+-----------+-----------------+-------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValues|medianInconde| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+-----------------+-------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|    452600.000000|     8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|    358500.000000|     8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|    352100.000000|     7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|    341300.000000|     5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|    342200.000000|     3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|   

In [8]:
from pyspark.sql.types import *
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValues', 'medianInconde', 'population', 'totalBedRooms', 'totalRooms']
df = convertColumn(df, columns, FloatType())

In [9]:
df.select('latitude', 'longitude').show(2)

+--------+---------+
|latitude|longitude|
+--------+---------+
|   37.88|  -122.23|
|   37.86|  -122.22|
+--------+---------+
only showing top 2 rows



In [10]:
df.groupBy('housingMedianAge').count().sort('housingMedianAge', ascending=False).count()

52

In [11]:
from pyspark.ml.linalg import DenseVector

# return a tuple of first column and all other columns
temp_data = df.rdd.map(lambda x:(x[0], DenseVector(x[1:])))

#construct back a new DataFrame
df2 = spark.createDataFrame(temp_data, ['label','features'])
df2.take(2)

[Row(label=126.0, features=DenseVector([41.0, 37.88, -122.23, 452600.0, 8.3252, 322.0, 129.0, 880.0])),
 Row(label=1138.0, features=DenseVector([21.0, 37.86, -122.22, 358500.0, 8.3014, 2401.0, 1106.0, 7099.0]))]

In [12]:
from pyspark.ml.feature import StandardScaler

s_scaler_model = StandardScaler(inputCol='features', outputCol='features_scaled')
scaler_fn = s_scaler_model.fit(df2)
scaled_df = scaler_fn.transform(df2)

scaled_df.take(2)

[Row(label=126.0, features=DenseVector([41.0, 37.88, -122.23, 452600.0, 8.3252, 322.0, 129.0, 880.0]), features_scaled=DenseVector([3.2577, 17.7345, -61.0073, 3.9222, 4.3821, 0.2843, 0.3062, 0.4034])),
 Row(label=1138.0, features=DenseVector([21.0, 37.86, -122.22, 358500.0, 8.3014, 2401.0, 1106.0, 7099.0]), features_scaled=DenseVector([1.6686, 17.7251, -61.0023, 3.1067, 4.3696, 2.1202, 2.6255, 3.254]))]

In [13]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=101)
type(train_data)

pyspark.sql.dataframe.DataFrame

In [14]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol='label', maxIter=20)

linear_model = lr.fit(train_data)

In [15]:
linear_model.coefficients

DenseVector([0.1206, -13.2149, -16.8051, 0.0, 2.1029, 0.0714, 0.7434, -0.0051])

In [16]:
list(zip(df.columns[1:], linear_model.coefficients))

[('housingMedianAge', 0.12057810919641318),
 ('latitude', -13.21485465543936),
 ('longitude', -16.805052255210516),
 ('medianHouseValues', 3.8992309300770294e-05),
 ('medianInconde', 2.1028990537503134),
 ('population', 0.07141859916407044),
 ('totalBedRooms', 0.7434067774509054),
 ('totalRooms', -0.005136471458766924)]

In [17]:
linear_model.intercept

-1547.2771874535813

In [18]:
linear_model.summary.numInstances

16535

In [19]:
linear_model.summary.meanAbsoluteError * 100000

3433000.446150105

In [20]:
linear_model.summary.meanSquaredError

4390.706636419032

In [21]:
predicted = linear_model.transform(test_data)
test_predictions = predicted.select('prediction').rdd.map(lambda x:x[0])
test_labels = predicted.select('label').rdd.map(lambda x:x[0])

test_predictions_labels = test_predictions.zip(test_labels)
test_predictions_labels_df = spark.createDataFrame(test_predictions_labels,['predictions','labels'])

test_predictions_labels_df.take(2)

[Row(predictions=42.61513936490451, labels=3.0),
 Row(predictions=37.754131028503934, labels=4.0)]

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

linear_reg_eval = RegressionEvaluator(predictionCol='predictions', labelCol='labels')
linear_reg_eval.evaluate(test_predictions_labels_df)

56.06622153543286

In [23]:
prediction_mae = linear_reg_eval.evaluate(test_predictions_labels_df, 
                                          {linear_reg_eval.metricName:'mae'}) * 100000
prediction_mae

3296712.375476873

In [24]:
prediction_rmse = linear_reg_eval.evaluate(test_predictions_labels_df, 
                                           {linear_reg_eval.metricName:'rmse'}) * 100000
prediction_rmse

5606622.153543286

In [25]:
print('(training error, prediction error)')
print((linear_model.summary.rootMeanSquaredError * 100000, prediction_rmse))
print((linear_model.summary.meanAbsoluteError * 100000, prediction_mae))

(training error, prediction error)
(6626240.741490632, 5606622.153543286)
(3433000.446150105, 3296712.375476873)
