<a href="https://colab.research.google.com/github/woosikyang/Pyspark/blob/master/Pyspark_tutorial_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
#!pip install -q findspark

In [0]:
import os

# Set the environment variables that name the Java installation and the
# unpacked Spark directory.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.3.4-bin-hadoop2.7",
})

In [0]:
import findspark

# findspark.init() runs before the pyspark import; keep this order.
findspark.init()

from pyspark.sql import SparkSession

# Create (or reuse) a session whose master is "local[*]".
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

In [0]:
from google.colab import files
#files.upload()

In [0]:
import os

# Directory named 'sample_data' under the current working directory.
data_path = os.path.join(os.getcwd(), 'sample_data')

# Directory listing; the value is not the cell's last expression, so it is
# not displayed.
os.listdir(data_path)

# Full paths of the train and test CSV files inside that directory.
tr_data, te_data = (
    os.path.join(data_path, csv_name)
    for csv_name in ('california_housing_train.csv', 'california_housing_test.csv')
)

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

#dataset = spark.read.csv('BostonHousing.csv',inferSchema=True, header =True)

In [0]:
# Build the CSV paths under their own names so that `tr_data` / `te_data` only
# ever hold DataFrames. Reading from `tr_data` itself rebinds the path variable
# to a DataFrame, which makes this cell fail when it is run a second time.
tr_path = os.path.join(data_path, 'california_housing_train.csv')
te_path = os.path.join(data_path, 'california_housing_test.csv')

# header=True takes column names from the first row; inferSchema=True lets
# Spark infer the column types.
tr_data = spark.read.csv(tr_path, inferSchema=True, header=True)
te_data = spark.read.csv(te_path, inferSchema=True, header=True)

In [31]:
# Print the column names and inferred types of the training DataFrame.
tr_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [35]:
# Every column except the last one is used as a model feature; the last
# column is the one later passed as the label.
label_position = len(tr_data.columns) - 1
cols = tr_data.columns[:label_position]
print(cols)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']


In [36]:
# Name of the target column: the last column of the training frame.
label_col = tr_data.columns[-1]

# Pack all feature columns into a single vector column named 'Attributes'.
assembler = VectorAssembler(inputCols=cols, outputCol='Attributes')
tr_output, te_output = (assembler.transform(frame) for frame in (tr_data, te_data))

# Keep only the feature vector and the target column for each split.
tr_finalized_data = tr_output.select("Attributes", label_col)
te_finalized_data = te_output.select("Attributes", label_col)

tr_finalized_data.show()

+--------------------+------------------+
|          Attributes|median_house_value|
+--------------------+------------------+
|[-114.31,34.19,15...|           66900.0|
|[-114.47,34.4,19....|           80100.0|
|[-114.56,33.69,17...|           85700.0|
|[-114.57,33.64,14...|           73400.0|
|[-114.57,33.57,20...|           65500.0|
|[-114.58,33.63,29...|           74000.0|
|[-114.58,33.61,25...|           82400.0|
|[-114.59,34.83,41...|           48500.0|
|[-114.59,33.61,34...|           58400.0|
|[-114.6,34.83,46....|           48100.0|
|[-114.6,33.62,16....|           86500.0|
|[-114.6,33.6,21.0...|           62000.0|
|[-114.61,34.84,48...|           48600.0|
|[-114.61,34.83,31...|           70400.0|
|[-114.63,32.76,15...|           45000.0|
|[-114.65,34.89,17...|           69100.0|
|[-114.65,33.6,28....|           94900.0|
|[-114.65,32.79,21...|           25000.0|
|[-114.66,32.74,17...|           44000.0|
|[-114.67,33.92,17...|           27500.0|
+--------------------+------------------+
only showing top 20 rows

In [38]:
# Display the first 5 rows of the assembled test set.
te_finalized_data.show(5)

+--------------------+------------------+
|          Attributes|median_house_value|
+--------------------+------------------+
|[-122.05,37.37,27...|          344700.0|
|[-118.3,34.26,43....|          176500.0|
|[-117.81,33.78,27...|          270500.0|
|[-118.36,33.82,28...|          330000.0|
|[-119.67,36.33,19...|           81700.0|
+--------------------+------------------+
only showing top 5 rows



In [39]:
# No random split is performed: the separately loaded train and test frames
# are used as they are.
label_col = tr_data.columns[-1]
estimator = LinearRegression(featuresCol='Attributes', labelCol=label_col)

# Fit on the training set; `regressor` ends up bound to the fitted model.
regressor = estimator.fit(tr_finalized_data)

# Evaluate the fitted model on the test set; the returned object exposes the
# scored rows through its `predictions` attribute.
pred = regressor.evaluate(te_finalized_data)

# Show the first 5 scored test rows.
pred.predictions.show(5)

+--------------------+------------------+------------------+
|          Attributes|median_house_value|        prediction|
+--------------------+------------------+------------------+
|[-122.05,37.37,27...|          344700.0| 352812.3111097822|
|[-118.3,34.26,43....|          176500.0|  212717.700748377|
|[-117.81,33.78,27...|          270500.0| 272344.6995217828|
|[-118.36,33.82,28...|          330000.0| 314244.3193442584|
|[-119.67,36.33,19...|           81700.0|119644.22239148291|
+--------------------+------------------+------------------+
only showing top 5 rows



In [40]:
# Read the fitted coefficients and the intercept off the model.
coeff, intr = regressor.coefficients, regressor.intercept

# Operands are wrapped in one-element tuples so each is formatted as a single value.
print("The coefficient of the model is : %a" % (coeff,))
print("The Intercept of the model is : %f" % (intr,))

The coefficient of the model is : DenseVector([-43139.6372, -42925.6731, 1150.6949, -8.3783, 117.6485, -38.4888, 45.436, 40507.0684])
The Intercept of the model is : -3620600.890969


In [41]:
from pyspark.ml.evaluation import RegressionEvaluator

# Named `evaluator` rather than `eval` so the builtin eval() is not shadowed
# for the rest of the kernel session.
evaluator = RegressionEvaluator(labelCol=tr_data.columns[-1], predictionCol="prediction", metricName="rmse")

# Root Mean Square Error (the metric configured on the evaluator itself)
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error (metric overridden for this call through the param map)
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 69765.360
MSE: 4867205486.918
MAE: 50352.228
r2: 0.620
