The boston.csv dataset contains the following variables:
# CRIM - Per capita crime rate
# ZN - Proportion of residential land zoned for lots over 25,000 sq. ft.
# INDUS - Proportion of non-retail business acres
# CHAS - Charles River dummy variable (1 if the tract bounds the river, 0 otherwise)
# NOX - Nitrogen oxide concentration
# RM - Average number of rooms per dwelling
# AGE - Proportion of owner-occupied units built prior to 1940
# DIS - Weighted mean of distances to five Boston employment centres
# RAD - Index of accessibility to radial highways
# TAX - Full-value property-tax rate per $10,000
# PT - Pupil-teacher ratio
# B - 1000(Bk - 0.63)^2, where Bk is the proportion of Black residents
# LSTAT - Lower status of the population (%)
# MV - Median value of homes (target variable)

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=8f0dbd323d36f3d31fed4ed966c6bca170fcd6df695233a72d2065f231d57324
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('Questions').getOrCreate()

In [None]:
spark

# Q1. Read the given CSV file into a Hive table
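
The notebook has no code cell for this question. One possible approach is sketched below, assuming a Spark session built with Hive support. The helper name `load_csv_into_hive` and the table name `boston` are hypothetical and chosen only for illustration.

```python
def load_csv_into_hive(csv_path, table_name="boston"):
    """Persist a CSV file as a table, then read that table back."""
    # Imported lazily so that defining the helper does not itself require Spark.
    from pyspark.sql import SparkSession

    # enableHiveSupport() asks Spark to use the Hive metastore as its catalog.
    spark = (
        SparkSession.builder
        .appName("Questions")
        .enableHiveSupport()
        .getOrCreate()
    )

    # Parse the CSV and save it as a managed table (replacing any old copy).
    csv_df = spark.read.csv(csv_path, header=True, inferSchema=True)
    csv_df.write.mode("overwrite").saveAsTable(table_name)

    # Read the table back as a Spark DataFrame.
    return spark.table(table_name)
```

A call such as `load_csv_into_hive('/content/boston.csv')` would need to run before any other session is created, because `getOrCreate()` reuses an existing session and may then ignore the Hive setting.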


# Perform the following tasks using PySpark
# Q2. Read the data from the Hive table as a Spark DataFrame

In [None]:
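# Note: this cell reads the CSV file directly; no Hive table is involved.
# header=True uses the first row as column names; inferSchema=True detects numeric types.
emp = spark.read.csv('/content/boston.csv', header=True, inferSchema=True)
emp.show(5)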

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001| 0.0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999| 0.0|2.

# Q3. Get the correlation between dependent and independent variables

In [None]:
# Correlate every non-string column with the target MV.
for col_name, col_type in emp.dtypes:
    if col_type != 'string':
        print("Correlation to MV for ", col_name, emp.stat.corr('MV', col_name))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0
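
`DataFrame.stat.corr` computes the Pearson correlation coefficient by default. As a rough illustration of what that number means, the pure-Python sketch below implements the same formula on two plain lists. The function name `pearson_corr` is hypothetical, and the sample values are the RM and MV entries of the first four rows shown earlier, rounded.

```python
import math


def pearson_corr(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two equally long sequences with at least 2 values")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sums of products of deviations from the means
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sequence")
    return cov / math.sqrt(var_x * var_y)


# RM and MV from the first four rows of the dataset (rounded)
rm = [6.575, 6.421, 7.185, 6.998]
mv = [24.0, 21.6, 34.7, 33.4]
print("Correlation of RM with MV on four rows:", pearson_corr(rm, mv))
```

Values near +1 or -1 (such as RM and LSTAT above) indicate a strong linear relationship with MV; values near 0 indicate a weak one.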


In [None]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')


In [None]:
vhouse_df = vectorAssembler.transform(emp)


In [None]:
vhouse_df = vhouse_df.select(['features', 'MV'])
vhouse_df.show(3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



# Q4. Build a linear regression model to predict house price

In [None]:
# 70/30 train/test split. No seed is passed, so the split (and every metric
# below) changes from run to run.
train_df, test_df = vhouse_df.randomSplit([0.7, 0.3])

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.025037968160610824,0.005640799184985269,0.0,0.09324703578921087,-6.438928211248475,4.842621411775615,0.0,-0.45569750443074103,0.0,-0.003685921035017689,-0.8880934687930451,0.006725830658934064,-0.3791496487770949]
Intercept: 17.400901210855146
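
Three coefficients above are exactly 0.0 (INDUS, AGE and RAD, by position in the assembler's input list). That is consistent with the elastic-net settings: `regParam` is the overall penalty strength and `elasticNetParam` is the mix between the L1 and L2 terms, and the L1 part can push coefficients to exactly zero. The sketch below computes the penalty term as described in the Spark ML documentation. The function name is hypothetical, and it ignores the feature standardization that Spark applies by default, so it will not reproduce the values in `objectiveHistory`.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1_norm = sum(abs(w) for w in coefficients)
    l2_norm_squared = sum(w * w for w in coefficients)
    return reg_param * (
        elastic_net_param * l1_norm
        + (1 - elastic_net_param) / 2 * l2_norm_squared
    )


# Toy coefficient vector with the same settings as the model above
toy_coefficients = [1.0, -2.0]
print(elastic_net_penalty(toy_coefficients, reg_param=0.3, elastic_net_param=0.8))
```

Setting `elastic_net_param` to 1.0 gives a pure L1 (lasso) penalty and 0.0 gives a pure L2 (ridge) penalty; 0.8 is mostly L1.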


# Q5. Evaluate the Linear Regression model by getting the RMSE and R-squared values

In [None]:
# lr_model.summary holds metrics computed on the training data
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.322945
r2: 0.744371


In [None]:
train_df.describe().show()

+-------+------------------+
|summary|                MV|
+-------+------------------+
|  count|               357|
|   mean|22.113725526002803|
| stddev| 8.562162452362136|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out test set and compute R-squared on it
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "MV", "features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="MV", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|32.371438696218554|32.70000076|[0.01301,35.0,1.5...|
|30.563858290603495|35.40000153|[0.01311,90.0,1.2...|
|16.952043881804325|18.89999962|[0.0136,75.0,4.0,...|
| 38.49326798501068|       50.0|[0.01381,80.0,0.4...|
| 25.82000071126466|23.10000038|[0.0187,85.0,4.15...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.619351


In [None]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 6.47373
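
The model scores worse on the test data (RMSE 6.47, R2 0.619) than on the training data (RMSE 4.32, R2 0.744). To make the two metrics concrete, the pure-Python sketch below computes them from plain lists. The function names are hypothetical, and the sample values are the five test rows displayed above, rounded, so the printed numbers describe only those five rows and not the full test set.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: typical size of a prediction error."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r_squared(labels, predictions):
    """1 - SS_res / SS_tot: share of label variance explained by the model."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# MV and prediction for the five displayed test rows (rounded)
labels = [32.7, 35.4, 18.9, 50.0, 23.1]
predictions = [32.37, 30.56, 16.95, 38.49, 25.82]
print("RMSE on these five rows:", rmse(labels, predictions))
print("R2 on these five rows:", r_squared(labels, predictions))
```

RMSE is in the same units as MV, so it can be read against the spread of MV reported by `describe()`.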


In [None]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 10
objectiveHistory: [0.49999999999999956, 0.4305439922001351, 0.2193416498160873, 0.19708735265922883, 0.16899042095445863, 0.1670963052820008, 0.16679017103300825, 0.16640504462016953, 0.1657181172081439, 0.1656253685283487, 0.16557595882950427]
+--------------------+
|           residuals|
+--------------------+
|  -6.117020072235093|
|  0.8324670627464954|
|  -5.966457469099101|
| 0.27693008442520295|
| -1.2453117381711323|
| -3.1079528878423233|
|  10.240053301987729|
|   9.019943167766186|
|  2.6838907008947857|
|  2.1758964443994984|
|  -1.643821135120806|
|   5.968876832412207|
|-0.05995090426608485|
| -10.046860089714421|
|  -3.461108941869089|
|   3.374821972092562|
|  -4.115334868128937|
|   2.414719889130705|
| -1.1300696413305218|
|  0.4814324060976709|
+--------------------+
only showing top 20 rows



In [None]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","MV","features").show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|32.371438696218554|32.70000076|[0.01301,35.0,1.5...|
|30.563858290603495|35.40000153|[0.01311,90.0,1.2...|
|16.952043881804325|18.89999962|[0.0136,75.0,4.0,...|
| 38.49326798501068|       50.0|[0.01381,80.0,0.4...|
| 25.82000071126466|23.10000038|[0.0187,85.0,4.15...|
| 27.29703200907881|       33.0|[0.019509999,17.5...|
| 39.48460054276684|       50.0|[0.020090001,95.0...|
|25.927658651502227|24.70000076|[0.02055,85.0,0.7...|
|24.295945855507448|19.39999962|[0.03466,35.0,6.0...|
|29.550914956280593|       28.5|[0.035020001,80.0...|
|24.071853406548748|22.89999962|[0.03551,25.0,4.8...|
| 37.54158481455373|45.40000153|[0.035780001,20.0...|
|28.094211571974178|       23.5|[0.035840001,80.0...|
|25.067213314799016|24.79999924|[0.036589999,25.0...|
| 34.86968469363982|34.59999847|[0.03768,80.0,1.5...|
|25.961741959592253|23.20000

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor

# Fit a decision tree on the same split for comparison with the linear model
dt = DecisionTreeRegressor(featuresCol='features', labelCol='MV')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="MV", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 5.33332
