Getting started with PySpark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os

# Tell findspark / PySpark where the JDK and the Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [0]:
import findspark

# Put the SPARK_HOME installation on sys.path before importing pyspark.
findspark.init()

from pyspark.sql import SparkSession

# Local session using all available cores; getOrCreate() returns an existing
# session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Read the employee file using its header row for column names.
# inferSchema=True types numeric columns (e.g. salary) as numbers rather than strings,
# so numeric filters such as salary>=100000 compare numbers, not coerced strings.
emp_df = spark.read.csv("dataset/employee.txt", header=True, inferSchema=True)

# Basics and Data preprocessing

In [0]:
# Column names, taken from the file's header row (read with header=True).
emp_df.columns

In [0]:
# First five rows, returned to the driver as a list of Row objects.
emp_df.take(5)

In [0]:
# Total number of rows (a Spark action: runs a job over the whole file).
emp_df.count()

In [0]:
# Random ~10% sample without replacement. A fixed seed makes the sample, and the
# rows displayed below, reproducible across Restart & Run All.
sample_df = emp_df.sample(withReplacement=False, fraction=0.1, seed=1)
sample_df.take(5)

In [0]:
# Keep rows whose salary is at least 100000 (filter given as a SQL expression string).
# NOTE(review): if salary was read as a string column, Spark implicitly coerces it for
# this comparison — confirm the salary values are clean integers.
emp_managers_df = emp_df.filter("salary>=100000")

In [0]:
# Display only the salary column of the filtered rows (show() prints up to 20 rows by default).
emp_managers_df.select("salary").show()

## Normalize data

In [0]:
import numpy as np
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [0]:
# Three toy rows whose feature components grow linearly with the id:
# id i -> [10*i, 10000*i, i]. The components differ by orders of magnitude,
# which is what makes rescaling worthwhile.
features_df = spark.createDataFrame(
    [(i, Vectors.dense([10.0 * i, 10000.0 * i, float(i)])) for i in (1, 2, 3)],
    ["id", "features"],
)

In [0]:
# Inspect one row to confirm the (id, features-vector) layout.
features_df.take(1)

In [0]:
# Min-max scaling: each vector component is rescaled using the per-component
# minimum and maximum learned in fit() (target range [0, 1] by default).
feature_scaler = MinMaxScaler().setInputCol("features").setOutputCol("sFeatures")
smodel = feature_scaler.fit(features_df)        # learns column-wise min / max
sFeatures_df = smodel.transform(features_df)    # appends the "sFeatures" column

In [0]:
# Original and min-max scaled vectors side by side. Reference the column by its real
# name "features": "Features" only resolves because Spark's column resolution is
# case-insensitive by default (spark.sql.caseSensitive=false).
sFeatures_df.select("features", "sFeatures").show()

## Standardize numeric data – rescale each feature to zero mean and unit variance

In [0]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [0]:
# Fresh toy data for standardization (this rebinds features_df from the previous section).
# The second component is unevenly spaced (10000, 30000, 40000).
features_df = spark.createDataFrame(
    data=[
        (1, Vectors.dense([10.0, 10000.0, 1.0])),
        (2, Vectors.dense([20.0, 30000.0, 2.0])),
        (3, Vectors.dense([30.0, 40000.0, 3.0])),
    ],
    schema=["id", "features"],
)
features_df.take(1)

In [0]:
# Standardize: subtract each component's mean (withMean) and divide by its
# standard deviation (withStd).
feature_standard_scalar = (
    StandardScaler()
    .setInputCol("features")
    .setOutputCol("sFeatures")
    .setWithStd(True)
    .setWithMean(True)
)
scModel = feature_standard_scalar.fit(features_df)
scFeatures_df = scModel.transform(features_df)
scFeatures_df.select("features", "sFeatures").show()

## Bucketize numeric data

In [0]:
from pyspark.ml.feature import Bucketizer

# Bucket boundaries -> buckets [-inf, -10), [-10, 0), [0, 10), [10, +inf]
# (indices 0..3).
# NOTE: the name `splits` is rebound later in the notebook to a train/test split, so
# re-running the Bucketizer cell after that point would pass the wrong object.
splits = [float("-inf"), -10.0, 0, 10, float("inf")]

In [0]:
# One integer column with values spread across every bucket, including the far tails.
b_data = [(value,) for value in (-800, -19, 10, 4, 6, 11, 800)]
b_df = spark.createDataFrame(b_data, schema=["features"])
b_df.show()

In [0]:
# Map each value to the index of the split interval it falls in ("bFeatures").
bucketizer = Bucketizer(inputCol="features", outputCol="bFeatures", splits=splits)
bucketed_df = bucketizer.transform(b_df)

In [0]:
# Each input value next to its bucket index.
bucketed_df.show()

## Working with Text Data - Tokenization

In [0]:
from pyspark.ml.feature import Tokenizer

In [0]:
# Three short documents used for the tokenization and TF-IDF examples.
sentences_df = spark.createDataFrame(
    data=[
        (1, "This is an introduction to Spark MLlib"),
        (2, "MLlib includes libraries for classification and regression"),
        (3, "It also contains supporting tools and pipelines"),
    ],
    schema=["id", "sentence"],
)

In [0]:
# show() truncates long strings to 20 characters by default.
sentences_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|This is an introd...|
|  2|MLlib includes li...|
|  3|It also contains ...|
+---+--------------------+



In [0]:
# Split each sentence into lowercase tokens (note "This" -> "this" in the output).
sent_token = Tokenizer().setInputCol("sentence").setOutputCol("words")
sent_tokenized_df = sent_token.transform(sentences_df)
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|MLlib includes li...|[mllib, includes,...|
|  3|It also contains ...|[it, also, contai...|
+---+--------------------+--------------------+



## TF-IDF (term frequency – inverse document frequency)

In [0]:
from pyspark.ml.feature import HashingTF, IDF

In [0]:
# Term frequencies via the hashing trick: every token is hashed into one of 20 buckets.
# With so few buckets, distinct words collide. In the output below, bucket 1 counts 2.0
# for a sentence whose seven words are all distinct.
hashingTF = HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)
sent_hfTF_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {1: 2.0, 5: 1.0, 6: 1.0, 8: 1.0, 12: 1.0, 13: 1.0}))]

In [0]:
# Learn per-bucket document frequencies across all sentences, then reweight the
# raw term counts by inverse document frequency.
idf = IDF().setInputCol("rawFeatures").setOutputCol("idf_features")
idfModel = idf.fit(sent_hfTF_df)
tfidf_df = idfModel.transform(sent_hfTF_df)

In [0]:
# TF-IDF weights for the first sentence. Spark's IDF is log((m + 1) / (df + 1)), so a
# bucket present in all m sentences gets weight 0.0 (indices 12 and 13 below).
tfidf_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {1: 2.0, 5: 1.0, 6: 1.0, 8: 1.0, 12: 1.0, 13: 1.0}), idf_features=SparseVector(20, {1: 0.5754, 5: 0.6931, 6: 0.2877, 8: 0.2877, 12: 0.0, 13: 0.0}))]

# Clustering using MLlib


## KMeans clustering

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [0]:
# Header row supplies col1..col3; inferSchema types them as integers.
cluster_df = spark.read.options(header=True, inferSchema=True).csv("dataset/clustering_dataset.csv")

In [0]:
# First 20 rows of the raw clustering data.
cluster_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
+----+----+----+
only showing top 20 rows



In [0]:
# Pack the three numeric columns into the single "features" vector column that
# MLlib estimators consume.
vectorAssembler = VectorAssembler().setInputCols(["col1", "col2", "col3"]).setOutputCol("features")
vcluster_df = vectorAssembler.transform(cluster_df)

In [0]:
# Raw columns plus the assembled feature vector.
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [0]:
# k-means with three clusters; the fixed seed makes the initialization reproducible.
kmeans = KMeans(k=3, seed=1)
kmodel = kmeans.fit(vcluster_df)

In [0]:
# Fitted centroids: one array per cluster, one coordinate per input column.
centers =kmodel.clusterCenters()
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([5.12, 5.84, 4.84]),
 array([80.        , 79.20833333, 78.29166667])]

## Hierarchical Clustering (bisecting k-means)

In [0]:
# Same assembled feature vectors as in the k-means section, reused here.
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [0]:
from pyspark.ml.clustering import BisectingKMeans

# Divisive (top-down) hierarchical clustering: clusters are repeatedly bisected
# until there are k=3 leaves. The seed fixes the randomness for reproducibility.
bkmeans = BisectingKMeans(k=3, seed=1)

In [0]:
# Fit on the same feature vectors as k-means and extract the resulting centroids.
bkmodel = bkmeans.fit(dataset=vcluster_df)
bkCenters = bkmodel.clusterCenters()

In [0]:
# Bisecting k-means centroids, one array per cluster.
bkCenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

In [0]:
# k-means centroids again, for comparison: the same three centers as bisecting
# k-means produced above, listed in a different order.
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([5.12, 5.84, 4.84]),
 array([80.        , 79.20833333, 78.29166667])]

# Classification using MLlib

In [0]:
# Import only what this notebook uses. A star import from pyspark.sql.functions
# silently shadows Python builtins such as sum, min, max, abs and round.
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [0]:
# The iris file has no header row, so Spark names the columns _c0.._c4.
iris_df = spark.read.options(inferSchema=True).csv("dataset/iris.txt")

In [25]:
# Peek at the auto-named columns and their inferred types.
iris_df.take(2)

[Row(_c0=5.1, _c1=3.5, _c2=1.4, _c3=0.2, _c4='Iris-setosa'),
 Row(_c0=4.9, _c1=3.0, _c2=1.4, _c3=0.2, _c4='Iris-setosa')]

In [0]:
# Give the five header-less columns (_c0.._c4) descriptive names, by position.
# Selecting "_c0".. would fail if this cell were re-run, because iris_df has already
# been renamed by then. toDF() renames positionally, so the cell is idempotent.
iris_df = iris_df.toDF(
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "species",
)

In [27]:
# Confirm the renamed columns.
iris_df.take(2)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa'),
 Row(sepal_length=4.9, sepal_width=3.0, petal_length=1.4, petal_width=0.2, species='Iris-setosa')]

In [28]:
# Combine the four measurements into a single "features" vector column.
vectorAssembler = (
    VectorAssembler()
    .setInputCols(["sepal_length", "sepal_width", "petal_length", "petal_width"])
    .setOutputCol("features")
)
viris_df = vectorAssembler.transform(iris_df)
viris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [0]:
# Encode the species strings as numeric class labels for the classifiers.
indexer = StringIndexer(outputCol="label", inputCol="species")
iv_iris_df = (
    indexer
    .fit(viris_df)          # learn the species -> index mapping
    .transform(viris_df)    # append the numeric "label" column
)

In [35]:
# Rows now carry both the feature vector and the numeric label.
iv_iris_df.take(3)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0),
 Row(sepal_length=4.9, sepal_width=3.0, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([4.9, 3.0, 1.4, 0.2]), label=0.0),
 Row(sepal_length=4.7, sepal_width=3.2, petal_length=1.3, petal_width=0.2, species='Iris-setosa', features=DenseVector([4.7, 3.2, 1.3, 0.2]), label=0.0)]

## Naive Bayes Classifier

In [0]:
from pyspark.ml.classification import  NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# 60/40 train/test split; seed=1 keeps the split reproducible.
splits = iv_iris_df.randomSplit(weights=[0.6, 0.4], seed=1)
train_df, test_df = splits

In [0]:
# Multinomial naive Bayes. It requires non-negative feature values; the iris
# measurements shown above are all positive.
nb = NaiveBayes().setModelType("multinomial")
nbmodel = nb.fit(train_df)

In [41]:
# Score the held-out rows; transform() appends rawPrediction, probability and
# prediction columns (see output).
predictions_df = nbmodel.transform(test_df)
predictions_df.take(1)

[Row(sepal_length=4.5, sepal_width=2.3, petal_length=1.3, petal_width=0.3, species='Iris-setosa', features=DenseVector([4.5, 2.3, 1.3, 0.3]), label=0.0, rawPrediction=DenseVector([-10.3605, -11.0141, -11.7112]), probability=DenseVector([0.562, 0.2924, 0.1456]), prediction=0.0)]

In [44]:
# Fraction of held-out rows whose predicted label matches the true label.
# (~0.59 here: noticeably weaker than the MLP and decision tree below.)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
nbAccuracy = evaluator.evaluate(predictions_df)
nbAccuracy

0.5862068965517241

## Multilayer Perceptron

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [0]:
# Network shape, input to output: 4 inputs (one per iris measurement),
# two hidden layers of 5 units, and 3 outputs (one per species label).
layers = [4, 5, 5, 3]

In [0]:
# Feed-forward network with the layer sizes above; seed fixes weight initialization.
mlp = MultilayerPerceptronClassifier().setLayers(layers).setSeed(1)
mlp_model = mlp.fit(train_df)

In [0]:
# Score the held-out split with the trained perceptron.
mlp_predictions = mlp_model.transform(test_df)

In [51]:
# Held-out accuracy of the multilayer perceptron. The label/prediction columns are
# spelled out, as in the other classifier evaluators, instead of relying on defaults.
mlpEvaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
mlp_accuracy = mlpEvaluator.evaluate(mlp_predictions)
mlp_accuracy

0.9482758620689655

## Decision Tree Classifier

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

In [0]:
# Single decision tree trained on the same 60% training split.
dt = DecisionTreeClassifier().setFeaturesCol("features").setLabelCol("label")
dt_model = dt.fit(train_df)

In [0]:
# Score the held-out split with the trained decision tree.
dt_predictions = dt_model.transform(test_df)

In [59]:
# Held-out accuracy of the decision tree classifier.
dtEvaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# Evaluate with this section's own evaluator. Reusing mlpEvaluator from the MLP
# section would couple this cell to that one having been run.
dt_accuracy = dtEvaluator.evaluate(dt_predictions)
dt_accuracy

0.9310344827586207

# Regression

## Linear Regression

In [0]:
from pyspark.ml.regression import LinearRegression

In [0]:
# Power-plant data: header row gives AT, V, AP, RH and the target PE; values typed as doubles.
pp_df = spark.read.options(header=True, inferSchema=True).csv("dataset/powerplant.csv")


In [12]:
# Check the inferred column names and numeric types.
pp_df.take(1)

[Row(AT=8.34, V=40.77, AP=1010.84, RH=90.01, PE=480.48)]

In [0]:
from pyspark.ml.feature import VectorAssembler

In [15]:
# The four predictors go into "features"; PE stays a separate column as the label.
vectorAssembler = VectorAssembler().setInputCols(["AT", "V", "AP", "RH"]).setOutputCol("features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.take(1)

[Row(AT=8.34, V=40.77, AP=1010.84, RH=90.01, PE=480.48, features=DenseVector([8.34, 40.77, 1010.84, 90.01]))]

In [0]:
# Ordinary linear regression of PE on the four predictors, fit on every row (no hold-out).
lr = LinearRegression().setFeaturesCol("features").setLabelCol("PE")
lr_model = lr.fit(vpp_df)

In [17]:
# One fitted weight per predictor, in VectorAssembler order (AT, V, AP, RH).
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [18]:
# Fitted intercept term.
lr_model.intercept

454.6092741526312

In [19]:
# RMSE on the training data. The model was fit on all rows, so this is not a
# held-out error and is not directly comparable to the tree RMSEs below.
lr_model.summary.rootMeanSquaredError

4.557126016749477

In [0]:
# Persist the fitted model to the "lr_model" directory. A plain save() raises if the
# path already exists, which breaks Restart & Run All; overwrite() replaces it instead.
lr_model.write().overwrite().save("lr_model")

## Decision Tree Regression

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Re-read the power-plant data so this section runs independently of the linear-regression one.
pp_df = spark.read.options(header=True, inferSchema=True).csv("dataset/powerplant.csv")

In [23]:
# Check the inferred column names and numeric types.
pp_df.take(1)

[Row(AT=8.34, V=40.77, AP=1010.84, RH=90.01, PE=480.48)]

In [24]:
# Same feature assembly as in the linear-regression section: four predictors -> "features".
vectorAssembler = VectorAssembler().setInputCols(["AT", "V", "AP", "RH"]).setOutputCol("features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.take(1)

[Row(AT=8.34, V=40.77, AP=1010.84, RH=90.01, PE=480.48, features=DenseVector([8.34, 40.77, 1010.84, 90.01]))]

In [0]:
# 70/30 train/test split. A fixed seed makes the split, and therefore the RMSE
# values reported below, reproducible across re-runs.
splits = vpp_df.randomSplit([0.7, 0.3], seed=1)
train_df = splits[0]
test_df = splits[1]

In [0]:
# Regression tree for PE: fit on the training split, then score the test split.
dr = DecisionTreeRegressor().setFeaturesCol("features").setLabelCol("PE")
dr_model = dr.fit(train_df)
dr_predictions = dr_model.transform(test_df)

In [0]:
# Root-mean-squared error of the tree's predictions against the true PE values.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="PE", predictionCol="prediction")
rmse = evaluator.evaluate(dr_predictions)

In [32]:
# Held-out RMSE of the decision tree regressor, in the units of PE.
rmse

4.5729265940611405

## Gradient Boosting Tree Regression

In [0]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted trees on the same train/test split, scored with the RMSE
# evaluator from the decision-tree section.
gbt = GBTRegressor().setFeaturesCol("features").setLabelCol("PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_rmse = evaluator.evaluate(gbt_predictions)

In [35]:
# Held-out RMSE of the gradient-boosted trees (same test split as the decision tree).
gbt_rmse

4.188017375539632