In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
   .master("<master-dns>:7077") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .config("spark.sql.execution.arrow.enabled", "true") \
   .getOrCreate()

Example has been adapted from [Apache Spark Tutorial: ML with PySpark](https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning). Instead of running on a single local node as in the example, we run it on a 4-node Spark cluster on AWS.

In [6]:
rdd_data = spark.sparkContext.textFile('data/cal_housing.data')

In [7]:
rdd_header = spark.sparkContext.textFile('data/cal_housing.domain')

In [8]:
rdd_header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [9]:
rdd_data.take(3)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 '-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000']

In [10]:
rdd_data = rdd_data.map(lambda line: line.split(','))

In [11]:
type(rdd_data)

pyspark.rdd.PipelinedRDD

In [12]:
rdd_data.take(3)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000'],
 ['-122.240000',
  '37.850000',
  '52.000000',
  '1467.000000',
  '190.000000',
  '496.000000',
  '177.000000',
  '7.257400',
  '352100.000000']]

In [13]:
from pyspark.sql import Row

In [14]:
df = rdd_data.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [15]:
type(df)

pyspark.sql.dataframe.DataFrame

In [16]:
df.show(3)

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
only showing top 3 rows



In [17]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



In [18]:
df.dtypes

[('households', 'string'),
 ('housingMedianAge', 'string'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('medianHouseValue', 'string'),
 ('medianIncome', 'string'),
 ('population', 'string'),
 ('totalBedRooms', 'string'),
 ('totalRooms', 'string')]

In [19]:
from pyspark.sql.types import *

In [20]:
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df

In [21]:
columns = df.columns
columns

['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

In [22]:
df = convertColumn(df, columns, FloatType())

In [23]:
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=452600.0, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0)

In [24]:
df.dtypes

[('households', 'float'),
 ('housingMedianAge', 'float'),
 ('latitude', 'float'),
 ('longitude', 'float'),
 ('medianHouseValue', 'float'),
 ('medianIncome', 'float'),
 ('population', 'float'),
 ('totalBedRooms', 'float'),
 ('totalRooms', 'float')]

In [25]:
df.select('population', 'totalBedrooms').show(5)

+----------+-------------+
|population|totalBedrooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
+----------+-------------+
only showing top 5 rows



In [26]:
df.groupBy('housingMedianAge').count().sort('housingMedianAge', ascending = False).show(5)

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
+----------------+-----+
only showing top 5 rows



In [27]:
median_age_count = df.groupBy('housingMedianAge').count().sort('housingMedianAge', ascending = False).toPandas()



In [28]:
type(median_age_count)

pandas.core.frame.DataFrame

In [29]:
median_age_count.head()

Unnamed: 0,housingMedianAge,count
0,52.0,1273
1,51.0,48
2,50.0,136
3,49.0,134
4,48.0,177


In [30]:
median_age_count.dtypes

housingMedianAge    float32
count                 int64
dtype: object

In [31]:
df.describe().show(5)

+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|        households|  housingMedianAge|          latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             20640|             20640|             20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean| 499.5396802325581|28.639486434108527| 35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.32975283161136|12.585557612111613|2.1359523806029554| 2.0035317429328914|115395.61

In [32]:
from pyspark.sql.functions import *

In [33]:
df = df.withColumn('medianHouseValue', col('medianHouseValue')/100000)

In [34]:
df.take(3)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0),
 Row(households=177.0, housingMedianAge=52.0, latitude=37.849998474121094, longitude=-122.23999786376953, medianHouseValue=3.521, medianIncome=7.257400035858154, population=496.0, totalBedRooms=190.0, totalRooms=1467.0)]

In [35]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
       .withColumn("populationPerHousehold", col("population")/col("households")) \
       .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

In [36]:
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [37]:
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [38]:
df.count()

20640

In [39]:
df_pd = df.toPandas()
type(df_pd)

pandas.core.frame.DataFrame

In [40]:
df_pd.head(5)

Unnamed: 0,medianHouseValue,totalBedRooms,population,households,medianIncome,roomsPerHousehold,populationPerHousehold,bedroomsPerRoom
0,4.526,129.0,322.0,126.0,8.3252,6.984127,2.555556,0.146591
1,3.585,1106.0,2401.0,1138.0,8.3014,6.238137,2.109842,0.155797
2,3.521,190.0,496.0,177.0,7.2574,8.288136,2.80226,0.129516
3,3.413,235.0,558.0,219.0,5.6431,5.817352,2.547945,0.184458
4,3.422,280.0,565.0,259.0,3.8462,6.281853,2.181467,0.172096


In [41]:
df.createOrReplaceTempView('medianstats')

In [42]:
sql_results = spark.sql("select * from medianstats where medianHouseValue > 3.0")

In [43]:
sql_results.show(5)

+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|           3.521|        190.0|     496.0|     177.0|      7.2574| 8.288135593220339|    2.8022598870056497|0.12951601908657123|
|           3.413|        235.0|     558.0|     219.0|      5.6431|5.8173515981735155|     2.547945205479452|0.18445839874411302|
|           3.422|        280.0|     565.0|     259.0|      3.8462| 6.281853281853282|    

In [44]:
from pyspark.ml.linalg import DenseVector

In [45]:
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

In [46]:
type(input_data)

pyspark.rdd.PipelinedRDD

In [47]:
df = spark.createDataFrame(input_data, ["label", "features"])

In [48]:
df.first()

Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]))

In [49]:
from pyspark.ml.feature import StandardScaler

In [50]:
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

In [51]:
scaler = standardScaler.fit(df)

In [52]:
type(scaler)

pyspark.ml.feature.StandardScalerModel

In [53]:
scaled_df = scaler.transform(df)

In [54]:
type(scaled_df)

pyspark.sql.dataframe.DataFrame

In [55]:
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [56]:
#split into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [57]:
type(train_data)

pyspark.sql.dataframe.DataFrame

In [58]:
from pyspark.ml.regression import LinearRegression

In [59]:
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [60]:
type(lr)

pyspark.ml.regression.LinearRegression

In [61]:
linearModel = lr.fit(train_data)

In [62]:
type(linearModel)

pyspark.ml.regression.LinearRegressionModel

In [63]:
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0])

In [64]:
# Generate predictions
predicted = linearModel.transform(test_data)

In [65]:
type(predicted)

pyspark.sql.dataframe.DataFrame

In [66]:
predicted.show(5)

+-------+--------------------+--------------------+------------------+
|  label|            features|     features_scaled|        prediction|
+-------+--------------------+--------------------+------------------+
|0.14999|[73.0,85.0,38.0,1...|[0.17329463000311...|1.4491508524918457|
|0.14999|[239.0,490.0,164....|[0.56736187083210...|1.5705029404692372|
|0.14999|[267.0,628.0,225....|[0.63383104398397...| 2.148727956912464|
|  0.344|[121.0,530.0,115....|[0.28724178397775...|1.5831547768979277|
|  0.398|[316.0,672.0,241....|[0.75015209699976...|1.5182107797955968|
+-------+--------------------+--------------------+------------------+
only showing top 5 rows



In [67]:
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

In [68]:
predictionAndLabel = predictions.zip(labels).collect()

In [69]:
type(predictionAndLabel)

list

In [70]:
predictionAndLabel[:5]

[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

In [71]:
linearModel.summary.rootMeanSquaredError

0.8692118678997669

In [72]:
linearModel.summary.r2

0.4240895287218379

In [73]:
spark.stop()