In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

In [18]:
!ls

sample_data  spark-2.4.4-bin-hadoop2.7.tgz


In [0]:
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

In [0]:
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [38]:
!wget https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv

--2019-10-02 18:13:21--  https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35735 (35K) [text/plain]
Saving to: ‘BostonHousing.csv’


2019-10-02 18:13:21 (4.63 MB/s) - ‘BostonHousing.csv’ saved [35735/35735]



In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

dataset = spark.read.csv('BostonHousing.csv',inferSchema=True, header =True)

In [40]:
type(dataset)

pyspark.sql.dataframe.DataFrame

In [41]:
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [42]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')
output = assembler.transform(dataset)
#Input vs Output
finalized_data = output.select("Attributes","medv")
finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



In [43]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.8,0.2])
regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')
#Learn to fit the model from training set
regressor = regressor.fit(train_data)
#To predict the prices on testing set
pred = regressor.evaluate(test_data)
#Predict the model
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.01965,80.0,1.7...|20.1|20.605607491567984|
|[0.02009,95.0,2.6...|50.0| 42.70344796104372|
|[0.02729,0.0,7.07...|34.7|29.811523300370837|
|[0.03537,34.0,6.0...|22.0|28.944963295120896|
|[0.03578,20.0,3.3...|45.4| 37.92211113088961|
|[0.03705,20.0,3.3...|35.4| 34.20047434311519|
|[0.03768,80.0,1.5...|34.6| 34.86126212496592|
|[0.04379,80.0,3.3...|19.4| 26.44892780633672|
|[0.04417,70.0,2.2...|24.8|30.874504347634346|
|[0.0456,0.0,13.89...|23.3|   26.692893565159|
|[0.04666,80.0,1.5...|30.3| 32.99781355320101|
|[0.04684,0.0,3.41...|22.6|27.303677594763244|
|[0.04819,80.0,3.6...|21.9|24.510726975965913|
|[0.05023,35.0,6.0...|17.1|20.709215332901532|
|[0.05188,0.0,4.49...|22.5| 22.48856744964019|
|[0.05425,0.0,4.05...|24.6|29.322450103510384|
|[0.05515,33.0,2.1...|36.1| 32.95810287960997|
|[0.05646,0.0,12.8...|21.2|20.873675882307005|
|[0.06151,0.0

In [44]:
#coefficient of the regression model
coeff = regressor.coefficients
#X and Y intercept
intr = regressor.intercept
print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([-0.1031, 0.0502, -0.0509, 2.7418, -15.5109, 3.1506, -0.007, -1.5252, 0.2839, -0.011, -0.996, 0.0084, -0.5174])
The Intercept of the model is : 41.418059


In [45]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")
# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)
# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)
# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)
# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 4.640
MSE: 21.532
MAE: 3.158
r2: 0.760


In [0]:
spark.stop()

In [49]:
!wget http://www.gutenberg.org/files/46787/46787-0.txt

--2019-10-02 18:22:54--  http://www.gutenberg.org/files/46787/46787-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 371977 (363K) [text/plain]
Saving to: ‘46787-0.txt’


2019-10-02 18:22:55 (773 KB/s) - ‘46787-0.txt’ saved [371977/371977]



In [61]:
# Creating Spark Context
from pyspark import SparkContext
sc = SparkContext("local", "first app")
# Calculating words count
text_file = sc.textFile("46787-0.txt")

counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1))  \
             .reduceByKey(lambda a, b: a + b)
# Printing each word with its respective count
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))
#Stopping Spark Context
sc.stop()

The: 278
Project: 154
Gutenberg: 40
EBook: 2
of: 2090
Grapes: 3
wrath,: 4
by: 247
Boyd: 8
Cable: 7
: 2001
This: 22
eBook: 8
is: 176
for: 363
the: 4206
use: 20
anyone: 7
anywhere: 3
in: 889
United: 27
States: 13
and: 2688
most: 51
other: 136
parts: 6
world: 4
at: 315
no: 125
cost: 5
with: 508
almost: 44
restrictions: 3
whatsoever.: 3
You: 30
may: 47
copy: 15
it,: 72
give: 28
it: 420
away: 28
or: 414
re-use: 3
under: 52
terms: 38
License: 15
included: 3
this: 205
online: 7
www.gutenberg.org.: 2
If: 47
you: 277
are: 89
not: 161
located: 13
States,: 6
you'll: 2
have: 132
to: 1634
check: 10
laws: 16
country: 9
where: 89
before: 54
using: 11
ebook.: 2
Title:: 1
wrath: 7
Author:: 1
Release: 1
Date:: 1
September: 1
6,: 1
2014: 1
[EBook: 1
#46787]: 1
Language:: 1
English: 3
Character: 1
set: 42
encoding:: 1
UTF-8: 1
***: 6
START: 1
OF: 30
THIS: 8
PROJECT: 6
GUTENBERG: 4
EBOOK: 2
GRAPES: 5
WRATH: 5
Produced: 2
Shaun: 2
Pinder,: 2
Charlie: 2
Howard,: 2
Online: 2
Distributed: 2
Proofreading: 2
Tea

In [71]:
# Creating Spark Context
import random
from pyspark import SparkContext
sc = SparkContext("local", "second app")

NUM_SAMPLES=10000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
sc.stop()

Pi is roughly 3.158000
