## Following instruction from https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning

In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

In [2]:
# Load in the data
rdd = sc.textFile('E:/bitbucket_warehouse/spark/datasets/cal_housing.data')

# Load in the header
header = sc.textFile('E:/bitbucket_warehouse/spark/datasets/cal_housing.domain')

In [3]:
type(rdd)

pyspark.rdd.RDD

In [4]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [5]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [6]:
# Alternatively, you can also use the following functions to inspect your data:
# Inspect the first line 
rdd.first()

# Take top elements
rdd.top(2)

['-124.350000,40.540000,52.000000,1820.000000,300.000000,806.000000,270.000000,3.014700,94600.000000',
 '-124.300000,41.840000,17.000000,2677.000000,531.000000,1244.000000,456.000000,3.031300,103600.000000']

In [7]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first 2 lines 
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

In [8]:
# Import the necessary modules
# See SparkSession from https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning
# and also https://stackoverflow.com/questions/32788387/pipelinedrdd-object-has-no-attribute-todf-in-pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession(sc)

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [9]:
# Show the top 5 rows 
df.show(5)

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
+-----------+----------------+--

In [10]:
# Print the data types of all `df` columns
# df.dtypes

# Print the schema of `df`
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



All columns are still of data type string. Need to correct them

In [11]:
from pyspark.sql.types import *

df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
   .withColumn("latitude", df["latitude"].cast(FloatType())) \
   .withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())) \
   .withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \
   .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \
   .withColumn("population", df["population"].cast(FloatType())) \
   .withColumn("households", df["households"].cast(FloatType())) \
   .withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \
   .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

In [12]:
df.select('population','totalBedRooms').show(5)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
+----------+-------------+
only showing top 5 rows



In [13]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show(5)

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
+----------------+-----+
only showing top 5 rows



In [14]:
df.select('population','totalBedRooms').describe().show(5)

+-------+------------------+-----------------+
|summary|        population|    totalBedRooms|
+-------+------------------+-----------------+
|  count|             20640|            20640|
|   mean|1425.4767441860465|537.8980135658915|
| stddev|1132.4621217653385|421.2479059431315|
|    min|               3.0|              1.0|
|    max|           35682.0|           6445.0|
+-------+------------------+-----------------+



In [15]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue` to number in millions.
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

In [16]:
df.show(1)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+-----------------+----------------------+-------------------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+-----------------+----------------------+-------------------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|6.984126984126984|    2.5555555555555554|0.14659090909090908|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+-----------------+----------------------+-------------------+
only showing top 1 row



In [17]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

Separating the features from the target variable. In essence, this boils down to isolating the first column in your DataFrame from the rest of the columns.

In this case, you’ll use the map() function that you use with RDDs to perform this action. You also see that you make use of the DenseVector() function. A dense vector is a local vector that is backed by a double array that represents its entry values. In other words, it's used to store arrays of values for use in PySpark.

Next, you go back to making a DataFrame out of the input_data and you re-label the columns by passing a list as a second argument. This list consists of the column names "label" and "features":

In [18]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [35]:
df.head(5)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558])),
 Row(label=3.521, features=DenseVector([190.0, 496.0, 177.0, 7.2574, 8.2881, 2.8023, 0.1295])),
 Row(label=3.413, features=DenseVector([235.0, 558.0, 219.0, 5.6431, 5.8174, 2.5479, 0.1845])),
 Row(label=3.422, features=DenseVector([280.0, 565.0, 259.0, 3.8462, 6.2819, 2.1815, 0.1721]))]

In [25]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [26]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [27]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [28]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

In [30]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0])

In [31]:
# Intercept for the model
linearModel.intercept

0.9903995774620005

In [32]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError

0.8692118678997669

In [33]:
# Get the R2
linearModel.summary.r2

0.4240895287218379

In [None]:
spark.stop()