**Big Data Hadoop & Spark Exam**

In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Q.1 Read the given CSV file in a Hive *table*

In [2]:
import pyspark #importing the pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
appName= "hive_pyspark"
master= "local"

# Creating a Spark Session

In [3]:
spark = SparkSession.builder \
	.master(master).appName(appName).enableHiveSupport().getOrCreate()
 #enabling the Hive support to interact with the hive database.

In [4]:
df=spark.read.csv("boston.csv",header=True,inferSchema=True)

#loading the dataset
#By setting inferSchema=true , Spark will automatically go through the csv file and infer the schema of each column. 

In [5]:
df.printSchema()
#verifying the data types of the table

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



# Q2. Read the data from Hive table as spark dataframe 

In [6]:
#read the CSV file from the local write to the table in hive using pyspark 
df.write.saveAsTable("Boston_table_hive")

In [7]:
df1=spark.sql("select * from Boston_table_hive")
df1.show()

+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|       CRIM|  ZN|      INDUS|CHAS|        NOX|         RM|        AGE|        DIS|RAD|TAX|         PT|          B|      LSTAT|         MV|
+-----------+----+-----------+----+-----------+-----------+-----------+-----------+---+---+-----------+-----------+-----------+-----------+
|    0.00632|18.0|2.309999943|   0|0.537999988|6.574999809|65.19999695|4.090000153|  1|296|15.30000019|396.8999939|4.980000019|       24.0|
|0.027310001| 0.0|7.070000172|   0|0.469000012|6.421000004|78.90000153|4.967100143|  2|242|17.79999924|396.8999939|9.140000343|21.60000038|
|    0.02729| 0.0|7.070000172|   0|0.469000012|7.184999943|61.09999847|4.967100143|  2|242|17.79999924|392.8299866| 4.03000021|34.70000076|
|0.032370001| 0.0|2.180000067|   0|0.458000004|6.998000145|45.79999924|6.062200069|  3|222|18.70000076|394.6300049|2.940000057|33.40000153|
|0.069049999| 0.0|2.

In [8]:
#Perform descriptive analytics
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
CRIM,506,3.6135235608162057,8.601545086715594,0.00632,88.97619629
ZN,506,11.363636363636363,23.32245299451514,0.0,100.0
INDUS,506,11.136778749531626,6.86035298095724,0.460000008,27.73999977
CHAS,506,0.0691699604743083,0.2539940413404101,0,1
NOX,506,0.5546950602312246,0.1158776754570543,0.38499999,0.870999992
RM,506,6.28463438896641,0.7026171549511354,3.561000109,8.779999733
AGE,506,68.57490120115612,28.148861532793276,2.900000095,100.0
DIS,506,3.7950426960059325,2.105710142043288,1.129600048,12.12650013
RAD,506,9.549407114624506,8.707259384239366,1,24


# Q3. Get the correlation between dependent and independent 
# variables


In [9]:
#here the dependent variable is MV
for i in df.columns:
        print( "Correlation to MV for ", i, df1.stat.corr('MV',i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


In [10]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
v_df = vectorAssembler.transform(df)
v_df = v_df.select(['features', 'MV'])
v_df.show(3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



In [11]:
#splitting the data into train and test
splits = v_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

# Q4. Build a linear regression model to predict house price

In [12]:
#linear regression model for train dataset
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.03962506603163508,0.013404058051191442,-0.0470542275069404,1.8040533911402308,-4.266972263903575,3.8025142347186875,0.0,-0.5730879167712861,0.012623420516305927,0.0,-0.7291450122553467,0.010463995768243096,-0.5350153563099811]
Intercept: 19.825128739762242


# Q5. Evaluate the Linear Regression model by getting the RMSE 
# and R-squared values

In [13]:
#evaluating the linear regression model with RMSE and R-Squared
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
#Summarize the model over the training set

RMSE: 4.830413
r2: 0.712156


In [14]:
train_df.describe().show()
#R squared at 0.75 indicates that in our model, 
#approximate 74% of the variability in “MV” can be explained using the model.

+-------+-----------------+
|summary|               MV|
+-------+-----------------+
|  count|              357|
|   mean|22.23193282290196|
| stddev| 9.01601821611917|
|    min|              5.0|
|    max|             50.0|
+-------+-----------------+



In [15]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|30.415495713529786|35.40000153|[0.01311,90.0,1.2...|
| 37.86940019217083|       50.0|[0.01381,80.0,0.4...|
|31.547245148136568|31.60000038|[0.01432,100.0,1....|
|27.935065133270303|       24.5|[0.01501,80.0,2.0...|
|27.103144834720162|30.10000038|[0.01709,90.0,2.0...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.710168


In [16]:
#rmse value for the test dataset
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 5.1562


In [17]:
#rmse value for the train dataset
train_result = lr_model.evaluate(train_df)
print("Root Mean Squared Error (RMSE) on train data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on train data = 5.1562
