In [2]:
#import findspark
#findspark.init()

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Beginner")\
.config("spark.executor.memory", "1gb").getOrCreate()

sc = spark.sparkContext

In [4]:
rdd = sc.textFile('cal_housing.data')
header = sc.textFile('cal_housing.domain')

In [5]:
header.take(10)

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [6]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [7]:
rdd = rdd.map(lambda line: line.split(','))
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

In [8]:
from pyspark.sql import Row

df = rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedrooms=line[4],
                              population=line[5],
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedrooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [9]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[households: string, housingMedianAge: string, latitude: string, longitude: string, medianHouseValue: string, medianIncome: string, population: string, totalBedrooms: string, totalRooms: string]>

In [10]:
from pyspark.sql.types import *

def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

columns = ["households","housingMedianAge","latitude","longitude","medianHouseValue",\
           "medianIncome","population","totalBedrooms","totalRooms"]
df = convertColumn(df, columns, FloatType())
df.printSchema

<bound method DataFrame.printSchema of DataFrame[households: float, housingMedianAge: float, latitude: float, longitude: float, medianHouseValue: float, medianIncome: float, population: float, totalBedrooms: float, totalRooms: float]>

In [11]:
df.select("population","totalBedrooms").show(10)

+----------+-------------+
|population|totalBedrooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [12]:
df.groupby("housingMedianAge").count().sort("housingMedianAge", ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [13]:
from pyspark.sql.functions import *

df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
df.show(2)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedrooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
only showing top 2 rows



In [14]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households"))\
       .withColumn("populationPerHousehold", col("population")/col("households"))\
       .withColumn("bedroomsPerRoom", col("totalBedrooms")/col("totalRooms"))
        
df.show(2)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+-----------------+----------------------+-------------------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedrooms|totalRooms|roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+-----------------+----------------------+-------------------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|6.984126984126984|    2.5555555555555554|0.14659090909090908|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|6.238137082601054|     2.109841827768014|0.15579659106916466|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+------

In [15]:
df = df.select("medianHouseValue",
               "totalBedrooms",
               "population",
               "households",
               "medianIncome",
               "roomsPerHousehold",
               "populationPerHousehold",
               "bedroomsPerRoom")

In [16]:
from pyspark.ml.linalg import DenseVector

input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

df = spark.createDataFrame(input_data, ["label","features"])

In [17]:
from pyspark.ml.feature import StandardScaler

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df)
scaled_df = scaler.transform(df)
scaled_df.show(2)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.30623297630686...|
|3.585|[1106.0,2401.0,11...|[2.62553233949916...|
+-----+--------------------+--------------------+
only showing top 2 rows



In [18]:
train, test = scaled_df.randomSplit([0.8, 0.2], seed=0)

In [19]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features_scaled", labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train)

In [26]:
# Generate predictions
predicted = linearModel.transform(test)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.1522757551116882, 0.14999),
 (2.160528800818174, 0.14999),
 (1.7419763854885846, 0.225),
 (1.2985843641812973, 0.342),
 (1.5960813469915827, 0.344)]

In [29]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.5238, 0.0, 0.0, 0.0])

In [30]:
# Intercept for the model
linearModel.intercept

1.0045059772834877

In [31]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError

0.8847839158170226

In [32]:
# Get the R2
linearModel.summary.r2

0.4145890929023409

In [33]:
spark.stop()