In [59]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [60]:
rdd = sc.textFile("C:/Users/MANOJ/pyspark_projects/california_housing/data/housing_new.csv")

In [61]:
rdd.take(2)

[u'-122.23,37.88,41,880,129,322,126,8.3252,452600,NEAR BAY',
 u'-122.22,37.86,21,7099,1106,2401,1138,8.3014,358500,NEAR BAY']

In [62]:
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)

[[u'-122.23',
  u'37.88',
  u'41',
  u'880',
  u'129',
  u'322',
  u'126',
  u'8.3252',
  u'452600',
  u'NEAR BAY'],
 [u'-122.22',
  u'37.86',
  u'21',
  u'7099',
  u'1106',
  u'2401',
  u'1138',
  u'8.3014',
  u'358500',
  u'NEAR BAY']]

In [63]:
rdd.first()

[u'-122.23',
 u'37.88',
 u'41',
 u'880',
 u'129',
 u'322',
 u'126',
 u'8.3252',
 u'452600',
 u'NEAR BAY']

In [64]:
rdd.top(2)

[[u'-124.35',
  u'40.54',
  u'52',
  u'1820',
  u'300',
  u'806',
  u'270',
  u'3.0147',
  u'94600',
  u'NEAR OCEAN'],
 [u'-124.3',
  u'41.84',
  u'17',
  u'2677',
  u'531',
  u'1244',
  u'456',
  u'3.0313',
  u'103600',
  u'NEAR OCEAN']]

In [65]:
# Import the necessary modules 
from pyspark.sql import Row
from pyspark.sql import SparkSession

In [66]:
spark = SparkSession(sc)
hasattr(rdd, "toDF")

True

In [67]:
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [68]:
df.show()

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|       126|              41|   37.88|  -122.23|          452600|      8.3252|       322|          129|       880|
|      1138|              21|   37.86|  -122.22|          358500|      8.3014|      2401|         1106|      7099|
|       177|              52|   37.85|  -122.24|          352100|      7.2574|       496|          190|      1467|
|       219|              52|   37.85|  -122.25|          341300|      5.6431|       558|          235|      1274|
|       259|              52|   37.85|  -122.25|          342200|      3.8462|       565|          280|      1627|
|       193|              52|   37.85|  -122.25|          269700|      4.0368|  

In [69]:
df.printSchema() # or df.dtypes

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



In [70]:
from pyspark.sql.types import *

In [71]:
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [72]:
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



In [73]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [74]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [75]:
df.describe().show()

+-------+-----------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|          latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|             20640|              20640|             20640|             20640|             20640|            20433|             20640|
|   mean|499.5396802325581|28.639486434108527| 35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8705525375618|2635.7630813953488|
| stddev|382.3297528316112|12.585557612111627|2.1359523806029794| 2.0035317429329127|115395.61587441

In [76]:
# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

In [77]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [78]:
# Re-order and select columns
df = df.select("medianHouseValue",
               "totalBedRooms",
               "population",
               "households",
               "medianIncome",
               "roomsPerHousehold",
               "populationPerHousehold",
               "bedroomsPerRoom")

In [79]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

In [80]:
# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

In [81]:
# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [82]:
df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
|2.697|[213.0,413.0,193....|
|2.992|[489.0,1094.0,514...|
|2.414|[687.0,1157.0,647...|
|2.267|[665.0,1206.0,595...|
|2.611|[707.0,1551.0,714...|
|2.815|[434.0,910.0,402....|
|2.418|[752.0,1504.0,734...|
|2.135|[474.0,1098.0,468...|
|1.913|[191.0,345.0,174....|
|1.592|[626.0,1212.0,620...|
|  1.4|[283.0,697.0,264....|
|1.525|[347.0,793.0,331....|
|1.555|[293.0,648.0,303....|
|1.587|[455.0,990.0,419....|
|1.629|[298.0,690.0,275....|
+-----+--------------------+
only showing top 20 rows



In [83]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([nan, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, nan])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([nan, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, nan]))]

There is a problem here. 
the above output has nan values generated in the scaled denseVector
Hence, the further machine learning is failing because of the nan values
in the features set.

In [38]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [39]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [40]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(nan, 0.14999), (nan, 0.14999), (nan, 0.14999), (nan, 0.344), (nan, 0.398)]

In [41]:
# Coefficients for the model
linearModel.coefficients

DenseVector([nan, nan, nan, nan, nan, nan, nan])

In [84]:
# Intercept for the model
linearModel.intercept

nan

In [86]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError

nan

In [87]:
# Get the R2
linearModel.summary.r2

nan

In [88]:
spark.stop()