In [1]:
import findspark

# Initialize and provide path
findspark.init('/home/vipul/DataScience/spark-2.3.0')
# Or
# findspark.init()

In [2]:
#from pyspark import SparkContext, SparkConf
# Import SparkSession
from pyspark.sql import SparkSession

In [3]:
# Build the SparkSession
spark = SparkSession.builder.master("local")\
                    .appName("Linear Regression Model")\
                    .config("spark.executor.memory", "1gb")\
                    .getOrCreate()
sc = spark.sparkContext

In [4]:
# Loading the data
rdd = sc.textFile('file:///home/vipul/Projects/Git/PySpark/CaliforniaHousing/cal_housing.data')
# Load the headers
header = sc.textFile('file:///home/vipul/Projects/Git/PySpark/CaliforniaHousing/cal_housing.domain')

In [5]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [6]:
# Retrieve the number of rows from RDD
rdd.take(2)
# rdd.first()
# rdd.top(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [7]:
# Split lines on comma
rdd = rdd.map(lambda line: line.split(','))
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

RDDs can be use when you want to perform low-level transformations and actions on your unstructured data. This means that you don’t care about imposing a schema while processing or accessing the attributes by name or column.

In [8]:
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude = line[0],
                              latitude = line[1],
                              housingMedianAge = line[2],
                              totalRooms = line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [9]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [10]:
df.columns

['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

In [11]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



In [12]:
from pyspark.sql.types import *

# df = df.withColumn("longitude", df["longitude"].cast(FloatType()))\
#     .withColumn("latitude", df["latitude"].cast(FloatType()))\
#     .withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType()))\
#     .withColumn("totalRooms", df["totalRooms"].cast(FloatType()))\
#     .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType()))\
#     .withColumn("population", df["population"].cast(FloatType()))\
#     .withColumn("households", df["households"].cast(FloatType()))\
#     .withColumn("medianIncome", df["medianIncome"].cast(FloatType()))\
#     .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

# Custom Function to convert data type of Dataframe columns
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue',\
           'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

df = convertColumn(df, columns, FloatType())

In [13]:
df.show(5)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
+----------+----------------+--------+---------+----------------+------------+--

In [14]:
df.select(['population', 'totalBedRooms']).show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [15]:
# Complex queries
df.groupBy("housingMedianAge").count().sort("housingMedianAge", ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [16]:
df.describe().show()

+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.

Looking at the minimum and maximum values of all the (numerical) attributes. It seem that multiple attributes have a wide range of values so need to normalize the dataset.

In [17]:
# Import all sql functions
from pyspark.sql.functions import *

# Adjust the value of 'medianHouseValue'
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

In [18]:
df.show(2)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
only showing top 2 rows



### Feature Engineering

<ul>
<li>Rooms per household which refers to the number of rooms in households per block group</li>
<li>Population per household, which basically gives you an indication of how many people live in households per block group.</li>
<li>Bedrooms per room which will give you an idea about how many rooms are bedrooms per block group;</li>
</ul>

In [19]:
# Rooms per household
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# population per household
populationPerHousehold = df.select(col("population")/col("households"))

# Bedrooms per room
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

In [20]:
# Add the new columns to dataframe
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [21]:
df.show(5)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+------------------+----------------------+-------------------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+------------------+----------------------+-------------------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|     177.0|            52.0|   37.85|  -122.24|           3.521|      7.2574|     496.0|        190.0|    1467.0| 

In [22]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [23]:
df.show(5)

+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|           3.521|        190.0|     496.0|     177.0|      7.2574| 8.288135593220339|    2.8022598870056497|0.12951601908657123|
|           3.413|        235.0|     558.0|     219.0|      5.6431|5.8173515981735155|     2.547945205479452|0.18445839874411302|
|           3.422|        280.0|     565.0|     259.0|      3.8462| 6.281853281853282|    

A dense vector is a local vector that is backed by a double array that represents its entry values. In other words, it's used to store arrays of values for use in PySpark.

In [24]:
from pyspark.ml.linalg import DenseVector

# Define the input data
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace df with new dataframe
df = spark.createDataFrame(input_data, ["label", "features"])

In [25]:
# Import StandardScaler
from pyspark.ml.feature import StandardScaler

# Initialize the standard scaler
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the dataframe to the scaler
scaler = standardScaler.fit(df)

# Transform the data in df with the scaler
scaled_df = scaler.transform(df)

scaled_df.show(2)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.30623297630686...|
|3.585|[1106.0,2401.0,11...|[2.62553233949916...|
+-----+--------------------+--------------------+
only showing top 2 rows



In [26]:
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

### Machine Learning Model

In [27]:
# Split the data into train and test datasets
train_data, test_data = scaled_df.randomSplit([.8, .2], seed=1234)

In [28]:
# Import Linear Regression
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

With your model in place, you can generate predictions for your test data: use the transform() method to predict the labels for your test_data. Then, you can use RDD operations to extract the predictions as well as the true labels from the DataFrame and zip these two values together in a list called predictionAndLabel.

In [29]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.1340115638008952, 0.14999),
 (1.4485018834650096, 0.14999),
 (1.5713396046425587, 0.14999),
 (1.7496542762527307, 0.283),
 (1.2438468929500472, 0.366)]

### Evaluating the model

In [30]:
# Coefficients for the model
print(linearModel.coefficients)

# Intercept for the model
print(linearModel.intercept)

[0.0,0.0,0.0,0.279621528927,0.0,0.0,0.0]
0.9841344205626824


In [31]:
# Get the RMSE (Root Mean Squared Error)
print(linearModel.summary.rootMeanSquaredError)

# Get the R2
print(linearModel.summary.r2)

0.8765335684459216
0.42282227755911483


In [32]:
spark.stop()