In [10]:
import pyspark

In [13]:
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

In [15]:
sc = spark.sparkContext


In [26]:
rdd = sc.textFile('hdfs:///folder1/cal_housing.data')

In [27]:
rdd.collect()

[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 u'-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000',
 u'-122.250000,37.850000,52.000000,1274.000000,235.000000,558.000000,219.000000,5.643100,341300.000000',
 u'-122.250000,37.850000,52.000000,1627.000000,280.000000,565.000000,259.000000,3.846200,342200.000000',
 u'-122.250000,37.850000,52.000000,919.000000,213.000000,413.000000,193.000000,4.036800,269700.000000',
 u'-122.250000,37.840000,52.000000,2535.000000,489.000000,1094.000000,514.000000,3.659100,299200.000000',
 u'-122.250000,37.840000,52.000000,3104.000000,687.000000,1157.000000,647.000000,3.120000,241400.000000',
 u'-122.260000,37.840000,42.000000,2555.000000,665.000000,1206.000000,595.000000,2.080400,226700.000000',
 u'-122.250000,37.840000,52.000000,3549.000000,707.

In [29]:
rdd.take(2)

[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [30]:
rdd = rdd.map(lambda line: line.split(","))

In [31]:
rdd.take(2)

[[u'-122.230000',
  u'37.880000',
  u'41.000000',
  u'880.000000',
  u'129.000000',
  u'322.000000',
  u'126.000000',
  u'8.325200',
  u'452600.000000'],
 [u'-122.220000',
  u'37.860000',
  u'21.000000',
  u'7099.000000',
  u'1106.000000',
  u'2401.000000',
  u'1138.000000',
  u'8.301400',
  u'358500.000000']]

In [32]:
from pyspark.sql import Row

In [33]:
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [34]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [35]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



In [36]:
from pyspark.sql.types import *

In [38]:
df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
   .withColumn("latitude", df["latitude"].cast(FloatType())) \
   .withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())) \
   .withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \ 
   .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \ 
   .withColumn("population", df["population"].cast(FloatType())) \ 
   .withColumn("households", df["households"].cast(FloatType())) \ 
   .withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \ 
   .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

SyntaxError: unexpected character after line continuation character (<ipython-input-38-ffe71a584877>, line 1)

In [39]:
from pyspark.sql.types import *

In [40]:
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

In [41]:
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']


In [42]:
df = convertColumn(df, columns, FloatType())

In [43]:
df.printSchema()


root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



In [44]:
#Now it is time for data exploration

In [45]:
df.select('population','totalBedRooms').show(10)


+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [46]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()


+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [47]:
df.describe().show()


+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|        households|  housingMedianAge|          latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             20640|             20640|             20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean| 499.5396802325581|28.639486434108527| 35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.32975283161136|12.585557612111613|2.1359523806029554|  2.003531742932892|115395.61

In [48]:
#now it is time for data preprocessing

In [49]:
from pyspark.sql.functions import *


In [50]:
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)


In [51]:
df.take(2)


[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

In [52]:
roomsPerHousehold = df.select(col("totalRooms")/col("households"))


In [53]:
populationPerHousehold = df.select(col("population")/col("households"))


In [54]:
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))


In [55]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

In [56]:
df.first()


Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [57]:
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [58]:
from pyspark.ml.linalg import DenseVector

In [59]:
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

In [60]:
df = spark.createDataFrame(input_data, ["label", "features"])


In [61]:
from pyspark.ml.feature import StandardScaler


In [62]:
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")


In [63]:
scaler = standardScaler.fit(df)


In [64]:
scaled_df = scaler.transform(df)


In [65]:
scaled_df.take(2)


[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [66]:
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)


In [67]:
from pyspark.ml.regression import LinearRegression


In [68]:
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)


In [69]:
linearModel = lr.fit(train_data)


In [70]:
predicted = linearModel.transform(test_data)


In [71]:
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

In [72]:
predictionAndLabel = predictions.zip(labels).collect()

In [73]:
predictionAndLabel[:5]


[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

In [74]:
#now we need to evaluate our model

In [75]:
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0])

DenseVector([0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0])

In [76]:
linearModel.intercept

0.9903995774620005

0.9903995774620005

In [77]:
linearModel.summary.rootMeanSquaredError


0.8692118678997669

0.8692118678997669

In [78]:
linearModel.summary.r2

0.4240895287218379

0.4240895287218379

In [79]:
spark.stop()


In [10]:
import pyspark

In [13]:
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

In [15]:
sc = spark.sparkContext


In [26]:
rdd = sc.textFile('hdfs:///folder1/cal_housing.data')

In [27]:
rdd.collect()

[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 u'-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000',
 u'-122.250000,37.850000,52.000000,1274.000000,235.000000,558.000000,219.000000,5.643100,341300.000000',
 u'-122.250000,37.850000,52.000000,1627.000000,280.000000,565.000000,259.000000,3.846200,342200.000000',
 u'-122.250000,37.850000,52.000000,919.000000,213.000000,413.000000,193.000000,4.036800,269700.000000',
 u'-122.250000,37.840000,52.000000,2535.000000,489.000000,1094.000000,514.000000,3.659100,299200.000000',
 u'-122.250000,37.840000,52.000000,3104.000000,687.000000,1157.000000,647.000000,3.120000,241400.000000',
 u'-122.260000,37.840000,42.000000,2555.000000,665.000000,1206.000000,595.000000,2.080400,226700.000000',
 u'-122.250000,37.840000,52.000000,3549.000000,707.

In [29]:
rdd.take(2)

[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [30]:
rdd = rdd.map(lambda line: line.split(","))

In [31]:
rdd.take(2)

[[u'-122.230000',
  u'37.880000',
  u'41.000000',
  u'880.000000',
  u'129.000000',
  u'322.000000',
  u'126.000000',
  u'8.325200',
  u'452600.000000'],
 [u'-122.220000',
  u'37.860000',
  u'21.000000',
  u'7099.000000',
  u'1106.000000',
  u'2401.000000',
  u'1138.000000',
  u'8.301400',
  u'358500.000000']]

In [32]:
from pyspark.sql import Row

In [33]:
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [34]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [35]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



In [36]:
from pyspark.sql.types import *

In [38]:
df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
   .withColumn("latitude", df["latitude"].cast(FloatType())) \
   .withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())) \
   .withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \ 
   .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \ 
   .withColumn("population", df["population"].cast(FloatType())) \ 
   .withColumn("households", df["households"].cast(FloatType())) \ 
   .withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \ 
   .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

SyntaxError: unexpected character after line continuation character (<ipython-input-38-ffe71a584877>, line 1)

In [39]:
from pyspark.sql.types import *

In [40]:
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

In [41]:
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']


In [42]:
df = convertColumn(df, columns, FloatType())

In [43]:
df.printSchema()


root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



In [44]:
#Now it is time for data exploration

In [45]:
df.select('population','totalBedRooms').show(10)


+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [46]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()


+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [47]:
df.describe().show()


+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|        households|  housingMedianAge|          latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             20640|             20640|             20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean| 499.5396802325581|28.639486434108527| 35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.32975283161136|12.585557612111613|2.1359523806029554|  2.003531742932892|115395.61

In [48]:
#now it is time for data preprocessing

In [49]:
from pyspark.sql.functions import *


In [50]:
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)


In [51]:
df.take(2)


[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

In [52]:
roomsPerHousehold = df.select(col("totalRooms")/col("households"))


In [53]:
populationPerHousehold = df.select(col("population")/col("households"))


In [54]:
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))


In [55]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

In [56]:
df.first()


Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [57]:
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [58]:
from pyspark.ml.linalg import DenseVector

In [59]:
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

In [60]:
df = spark.createDataFrame(input_data, ["label", "features"])


In [61]:
from pyspark.ml.feature import StandardScaler


In [62]:
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")


In [63]:
scaler = standardScaler.fit(df)


In [64]:
scaled_df = scaler.transform(df)


In [65]:
scaled_df.take(2)


[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [66]:
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)


In [67]:
from pyspark.ml.regression import LinearRegression


In [68]:
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)


In [69]:
linearModel = lr.fit(train_data)


In [70]:
predicted = linearModel.transform(test_data)


In [71]:
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

In [72]:
predictionAndLabel = predictions.zip(labels).collect()

In [73]:
predictionAndLabel[:5]


[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

In [74]:
#now we need to evaluate our model

In [75]:
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0])

DenseVector([0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0])

In [76]:
linearModel.intercept

0.9903995774620005

0.9903995774620005

In [77]:
linearModel.summary.rootMeanSquaredError


0.8692118678997669

0.8692118678997669

In [78]:
linearModel.summary.r2

0.4240895287218379

0.4240895287218379

In [79]:
spark.stop()
