In [32]:
# importing spark modules

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

# initializing SparkSession object as session
session = SparkSession.builder.appName("pipeline").master("local[*]").getOrCreate()

In [33]:
# Reading the data 
df = session.read.csv("Housing_data-Final-2-Copy1.csv",header= True, inferSchema = True,nullValue=0)

In [34]:
# printing the dataframe
df = df.withColumnRenamed("#Bedroom","Bedroom").withColumnRenamed("#Bathroom","Bathroom").withColumnRenamed("#-Car Garage","Car_Garage")
# checking for negative values and replacing with null value in one feature,if encountered
smooth = F.udf(lambda x: x if x > 0 else None, IntegerType())
df= df.withColumn('Lot_size', smooth(F.col('Lot_size')))
df.show(5)

+----------+----+-------+--------+-------+-------+--------+----------+--------+--------------------+--------------+
|    Suburb|Type|  Price|Distance|Zipcode|Bedroom|Bathroom|Car_Garage|Lot_size|         Region_name|Property_count|
+----------+----+-------+--------+-------+-------+--------+----------+--------+--------------------+--------------+
|Abbotsford|   h|1165000|     2.5|   3067|      3|       2|      null|      92|Northern Metropol...|          4019|
|Abbotsford|   h|1050000|     2.5|   3067|      2|       1|      null|     129|Northern Metropol...|          4019|
|Abbotsford|   h|1465000|     2.5|   3067|      3|       2|      null|     134|Northern Metropol...|          4019|
|Abbotsford|   h| 911000|     3.0|   3067|      2|       1|      null|     141|Northern Metropol...|          4019|
|Abbotsford|   h|1635000|     3.0|   3067|      3|       1|      null|     142|Northern Metropol...|          4019|
+----------+----+-------+--------+-------+-------+--------+----------+--

In [35]:
## importing imputer
from pyspark.ml.feature import Imputer
## filling null values using mean of neighbour values
imputer = Imputer(inputCols=["Car_Garage", "Bathroom"], outputCols=["out_garrage", "out_Bathroom"])

## printing the imputed result
imputer.fit(df).transform(df).select("Car_Garage", "Bathroom","out_garrage","out_Bathroom").show()

# schema of dataframe
df.printSchema()

df.count()

+----------+--------+-----------+------------+
|Car_Garage|Bathroom|out_garrage|out_Bathroom|
+----------+--------+-----------+------------+
|      null|       2|          1|           2|
|      null|       1|          1|           1|
|      null|       2|          1|           2|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|      null|       2|          1|           2|
|      null|       1|          1|           1|
|      null|       1|          1|           1|
|         1|       2|          1|           2|
|         1|       2|          1|           2|
|         1|       2|          1|           2|
|         1| 

11579

In [36]:
## converting string features to vectors by using string indexer
from pyspark.ml.feature import StringIndexer

suburb      = StringIndexer(inputCol="Suburb", outputCol="Suburb_indexed",handleInvalid='skip')
Type        = StringIndexer(inputCol="Type", outputCol="Type_indexed",handleInvalid='skip')
region_name = StringIndexer(inputCol="Region_name", outputCol="Region_name_indexed",handleInvalid='skip')

#importing pipeline
from pyspark.ml import Pipeline

## fitting stages to pipeline
pipeline_indexing = Pipeline(stages=[suburb, Type, region_name])

## transforming dataframe by using pipeline
df_=pipeline_indexing.fit(df).transform(df)

## importing pandas
import pandas

df_pd = df_.toPandas()

df_pd



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 200, in _batched
    for item in iterator:
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/Users/govardhan/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-34-a08d502fdf3f>", line 4, in <lambda>
TypeError: '>' not supported between instances of 'NoneType' and 'int'


In [None]:
#Finding correlation with target Label "Price"
Distance_corr=df_pd['Distance'].corr(df_pd['Price'])
Zipcode_corr=df_pd['Zipcode'].corr(df_pd['Price'])
Bedroom_corr=df_pd['Bedroom'].corr(df_pd['Price'])
Bathroom_corr=df_pd['Bathroom'].corr(df_pd['Price'])
Car_Garage_corr=df_pd['Car_Garage'].corr(df_pd['Price'])
Lot_size_corr=df_pd['Lot_size'].corr(df_pd['Price'])
Property_count_corr=df_pd['Property_count'].corr(df_pd['Price'])
Suburb_indexed_corr=df_pd['Suburb_indexed'].corr(df_pd['Price'])
Type_indexed_corr=df_pd['Type_indexed'].corr(df_pd['Price'])
Region_name_indexed_corr=df_pd['Region_name_indexed'].corr(df_pd['Price'])
Lot_size_corr=df_pd['Lot_size'].corr(df_pd['Price'])

In [None]:
print("Distance correlation is   :",Distance_corr)
print("Zipcode correlation is    :",Zipcode_corr)
print("Bedroom correlation is    :",Bedroom_corr)
print("Bathroom correlation is   :",Bathroom_corr)
print("Car_Garage correlation is :",Car_Garage_corr)
print("Lot_size correlation is   :",Lot_size_corr)
print("Property correlation is            :",Property_count_corr)
print("Suburb_indexed correlation is      :",Suburb_indexed_corr)
print("Type_indexed correlation is        :",Type_indexed_corr)
print("Region_name_indexed correlation is :",Region_name_indexed_corr)
print("Lot_size correlation is            :",Lot_size_corr)


In [None]:
#TASK_3(a)training with highest correlation features:

# importing Spark ML modules
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer

# Assebling features into a vector matrix
assembler = VectorAssembler(
    inputCols=["Bedroom", "out_Bathroom","Zipcode"],
    outputCol="features")
assembler.setParams(handleInvalid="skip")

# Normalizing the vector of features for model improvement
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=2)

# Splitting data inti test and train sets
(trainingData, testData) = df.randomSplit([0.8, 0.2],100)

In [None]:
print("Number of train records are",trainingData.count())
print("Number of test records are",testData.count())

In [None]:
# Training  a RandomForest model.
rf = RandomForestRegressor(featuresCol="normFeatures",labelCol='Price',)

## Task Pipeline
# fitting the above stages in pipeline
pipeline = Pipeline(stages=[imputer,pipeline_indexing,assembler,normalizer, rf])

# Training model. 
model = pipeline.fit(trainingData)

# Making predictions.
predictions = model.transform(testData)

## showing the predictions and actual values
predictions.select("prediction", "Price").show(7)

In [None]:
#TASK-4 RMSE Metric on predicting Label "Price" with highest correlation features 

evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(rmse)

In [None]:
#TASK-3 (b)Predicting label "Price" using all features
## Assembling all features into a vector matric as features

assembler1 = VectorAssembler(
    inputCols=[
 'Distance',
 'Zipcode',
 'Bedroom',
        "out_garrage","out_Bathroom",
 'Lot_size',
 'Property_count'],
    outputCol="features")
assembler1.setParams(handleInvalid="skip")

### Normalizing the vector of features for model improvement
normalizer1 = Normalizer(inputCol="features", outputCol="normFeatures", p=2)

## Spliting the data into test and train sets
(trainingData1, testData1) = df.randomSplit([0.8, 0.2],100)


# Training a RandomForest model.
rf1 = RandomForestRegressor(featuresCol="normFeatures",labelCol='Price')

# pipeline for the stages above
pipeline1 = Pipeline(stages=[imputer,pipeline_indexing,assembler1,normalizer1, rf1])

# Training the model
model1 = pipeline1.fit(trainingData1)

# Making predictions.
predictions1 = model1.transform(testData1)

predictions1.select("prediction", "Price","normFeatures").show(7)


In [None]:
#TASK-4 RMSE Metric on predicting Label "Price" with all features 
evaluator1 = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse1 = evaluator1.evaluate(predictions1)

print(rmse1)