# **Running PySpark in Colab**

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 2.3.3 with Hadoop 2.7, Java 8, and findspark, which locates Spark on the system. The installation can be carried out from inside the Colab notebook itself. If you are new to Spark, it is better to avoid version 2.4.0, since some users have reported compatibility issues with Python.
Follow these steps to install the dependencies:

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!tar xf spark-2.3.3-bin-hadoop2.7.tgz
!pip install -q findspark

Now that Spark and Java are installed in Colab, set the environment variables that let PySpark find them. Set the locations of Java and Spark by running the following code:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.3-bin-hadoop2.7"
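
Before starting a session, it can help to confirm that both directories actually exist, since a failed download or a different Spark version would leave `SPARK_HOME` pointing at nothing. The following is a minimal sketch; the helper name `missing_dirs` is hypothetical and is not part of findspark or Spark.

```python
import os

def missing_dirs(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point at a directory."""
    missing = []
    for name in names:
        path = env.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# In the notebook, pass os.environ; an empty list means both paths resolve.
print(missing_dirs(os.environ))
```

If the list is not empty, re-running the install cell or correcting the path is usually quicker than debugging a failing `findspark.init()` call.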

Run a local Spark session to test your installation:

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
sqlSession = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()

In [4]:
nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
  print(num)

1
4
9
16


Upload "iris.csv"

In [5]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

Saving iris.csv to iris.csv
User uploaded file "iris.csv" with length 3867 bytes


# **Import libraries** 

In [0]:
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# **Load and prepare data**



In [7]:
irisData = sc.textFile("iris.csv")
irisData.cache()
irisData.count()

151

Remove header

In [8]:
dataLines = irisData.filter(lambda x: "Sepal" not in x)
dataLines.count()

150
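
The filter above works because the header line contains the word "Sepal". A less content-dependent approach is to treat the first line as the header and drop every line equal to it. The sketch below shows the idea on a plain Python list; on the RDD, the same idea would be `header = irisData.first()` followed by `irisData.filter(lambda x: x != header)`. The sample header text is an assumption, since the exact header of `iris.csv` is not shown here.

```python
def drop_header(lines):
    """Drop every line identical to the first one, which is assumed to be the header."""
    if not lines:
        return []
    header = lines[0]
    return [line for line in lines if line != header]

# Hypothetical sample; the real header text in iris.csv may differ.
sample = [
    "Sepal.Length,Sepal.Width,Petal.Length,Petal.Width,Species",
    "5.1,3.5,1.4,0.2,setosa",
    "4.9,3.0,1.4,0.2,setosa",
]
print(len(drop_header(sample)))  # 2
```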

# **Cleanup Data**

Parse each line into a Row with typed columns

In [16]:
parts = dataLines.map(lambda l: l.split(","))
irisMap = parts.map(lambda p: Row(SEPAL_LENGTH=float(p[0]),\
                                SEPAL_WIDTH=float(p[1]), \
                                PETAL_LENGTH=float(p[2]), \
                                PETAL_WIDTH=float(p[3]), \
                                SPECIES=p[4]))
irisMap.take(5)

[Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.1, SEPAL_WIDTH=3.5, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.9, SEPAL_WIDTH=3.0, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.3, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.7, SEPAL_WIDTH=3.2, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.5, PETAL_WIDTH=0.2, SEPAL_LENGTH=4.6, SEPAL_WIDTH=3.1, SPECIES='setosa'),
 Row(PETAL_LENGTH=1.4, PETAL_WIDTH=0.2, SEPAL_LENGTH=5.0, SEPAL_WIDTH=3.6, SPECIES='setosa')]
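
The lambda above raises an exception on the first malformed line (a missing field or a non-numeric value). One possible way to make the parsing step more tolerant is a named function that returns `None` for bad lines so they can be filtered out. This is a sketch in plain Python; `parse_iris_line` and `FIELDS` are hypothetical names introduced for illustration.

```python
FIELDS = ("SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH")

def parse_iris_line(line):
    """Parse one CSV line into a dict, or return None if the line is malformed."""
    parts = [p.strip() for p in line.split(",")]
    if len(parts) != 5:
        return None
    try:
        record = {name: float(value) for name, value in zip(FIELDS, parts[:4])}
    except ValueError:
        # A measurement could not be converted to a number.
        return None
    record["SPECIES"] = parts[4]
    return record

print(parse_iris_line("5.1,3.5,1.4,0.2,setosa"))
print(parse_iris_line("5.1,3.5,oops,0.2,setosa"))  # None
```

In the notebook, such a function could be mapped over `dataLines`, followed by a filter that removes the `None` results and a `Row(**record)` conversion.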

Convert the RDD of Rows to a DataFrame

In [18]:
irisDf = sqlSession.createDataFrame(irisMap)
irisDf.cache()

DataFrame[PETAL_LENGTH: double, PETAL_WIDTH: double, SEPAL_LENGTH: double, SEPAL_WIDTH: double, SPECIES: string]

Add a numeric index column for the species label

In [20]:
stringIndexer = StringIndexer(inputCol="SPECIES", outputCol="IND_SPECIES")
si_model = stringIndexer.fit(irisDf)
irisNormDf = si_model.transform(irisDf)

irisNormDf.select("SPECIES","IND_SPECIES").distinct().show()
irisNormDf.cache()

+----------+-----------+
|   SPECIES|IND_SPECIES|
+----------+-----------+
|versicolor|        0.0|
|    setosa|        2.0|
| virginica|        1.0|
+----------+-----------+



DataFrame[PETAL_LENGTH: double, PETAL_WIDTH: double, SEPAL_LENGTH: double, SEPAL_WIDTH: double, SPECIES: string, IND_SPECIES: double]
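
`StringIndexer` orders labels by frequency by default, so the most frequent label receives index 0.0. The summary statistics in this notebook are consistent with three equally sized classes, so frequency alone does not determine the order, which is presumably why setosa ends up with 2.0 here. The sketch below imitates the idea in plain Python. The alphabetical tie-break is an assumption of this sketch and does not reproduce the assignment shown above.

```python
from collections import Counter

def index_labels(labels):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically, which is an assumption of this sketch.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

print(index_labels(["setosa"] * 50 + ["versicolor"] * 50 + ["virginica"] * 50))
# {'setosa': 0.0, 'versicolor': 1.0, 'virginica': 2.0}
```

Because the mapping can differ between runs or Spark versions, it is safer to look it up from the `SPECIES` and `IND_SPECIES` columns than to hard-code it.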

# **Data Analytics**


In [21]:
irisNormDf.describe().show()

+-------+------------------+------------------+------------------+------------------+---------+------------------+
|summary|      PETAL_LENGTH|       PETAL_WIDTH|      SEPAL_LENGTH|       SEPAL_WIDTH|  SPECIES|       IND_SPECIES|
+-------+------------------+------------------+------------------+------------------+---------+------------------+
|  count|               150|               150|               150|               150|      150|               150|
|   mean| 3.758000000000001|1.1993333333333331| 5.843333333333332|3.0573333333333337|     null|               1.0|
| stddev|1.7652982332594662|0.7622376689603467|0.8280661279778634|0.4358662849366978|     null|0.8192319205190404|
|    min|               1.0|               0.1|               4.3|               2.0|   setosa|               0.0|
|    max|               6.9|               2.5|               7.9|               4.4|virginica|               2.0|
+-------+------------------+------------------+------------------+------------------+---------+------------------+

In [25]:
for i in irisNormDf.columns:
  if not isinstance(irisNormDf.select(i).take(1)[0][0], str):
    print( "Correlation to Species for ", i, irisNormDf.stat.corr('IND_SPECIES',i))

Correlation to Species for  PETAL_LENGTH -0.649241830764174
Correlation to Species for  PETAL_WIDTH -0.5803770334306263
Correlation to Species for  SEPAL_LENGTH -0.46003915650023686
Correlation to Species for  SEPAL_WIDTH 0.6183715308237433
Correlation to Species for  IND_SPECIES 1.0
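
`stat.corr` computes the Pearson correlation coefficient. Since `IND_SPECIES` is an arbitrary class index rather than a measured quantity, the sign and size of these values depend on which species happened to receive which index. The plain Python sketch below computes the coefficient and shows the effect of relabeling on a tiny made-up sample. The sample values are illustrative, not taken from the dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sequence")
    return cov / math.sqrt(var_x * var_y)

# Made-up petal lengths for three flowers of three different species.
petal_length = [1.4, 4.5, 6.0]
print(pearson(petal_length, [2.0, 0.0, 1.0]))  # one index assignment: negative
print(pearson(petal_length, [0.0, 1.0, 2.0]))  # another assignment: positive
```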


# **Prepare data**

Transform to a DataFrame for input to machine learning

The code below keeps all four measurements as features; no column is dropped.

In [0]:
def transformToLabeledPoint(row) :
    lp = ( row["SPECIES"], row["IND_SPECIES"], \
                Vectors.dense([row["SEPAL_LENGTH"],\
                        row["SEPAL_WIDTH"], \
                        row["PETAL_LENGTH"], \
                        row["PETAL_WIDTH"]]))
    return lp

In [29]:
irisLp = irisNormDf.rdd.map(transformToLabeledPoint)
irisLpDf = sqlSession.createDataFrame(irisLp,["species","label", "features"])
irisLpDf.select("species","label","features").show(10)
irisLpDf.cache()

+-------+-----+-----------------+
|species|label|         features|
+-------+-----+-----------------+
| setosa|  2.0|[5.1,3.5,1.4,0.2]|
| setosa|  2.0|[4.9,3.0,1.4,0.2]|
| setosa|  2.0|[4.7,3.2,1.3,0.2]|
| setosa|  2.0|[4.6,3.1,1.5,0.2]|
| setosa|  2.0|[5.0,3.6,1.4,0.2]|
| setosa|  2.0|[5.4,3.9,1.7,0.4]|
| setosa|  2.0|[4.6,3.4,1.4,0.3]|
| setosa|  2.0|[5.0,3.4,1.5,0.2]|
| setosa|  2.0|[4.4,2.9,1.4,0.2]|
| setosa|  2.0|[4.9,3.1,1.5,0.1]|
+-------+-----+-----------------+
only showing top 10 rows



DataFrame[species: string, label: double, features: vector]

# **Decision Tree**

Split into training and testing data

In [31]:
(trainingData, testData) = irisLpDf.randomSplit([0.9, 0.1])
print(trainingData.count())
print(testData.count())
testData.show()

139
11
+----------+-----+-----------------+
|   species|label|         features|
+----------+-----+-----------------+
|    setosa|  2.0|[5.1,3.8,1.9,0.4]|
|versicolor|  0.0|[5.1,2.5,3.0,1.1]|
|versicolor|  0.0|[5.8,2.6,4.0,1.2]|
|versicolor|  0.0|[6.1,3.0,4.6,1.4]|
| virginica|  1.0|[5.9,3.0,5.1,1.8]|
| virginica|  1.0|[6.1,3.0,4.9,1.8]|
| virginica|  1.0|[6.4,2.8,5.6,2.1]|
| virginica|  1.0|[6.4,2.8,5.6,2.2]|
| virginica|  1.0|[6.7,2.5,5.8,1.8]|
| virginica|  1.0|[6.8,3.2,5.9,2.3]|
| virginica|  1.0|[7.2,3.6,6.1,2.5]|
+----------+-----+-----------------+
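
The split sizes are 139 and 11 rather than exactly 135 and 15 because `randomSplit` assigns rows at random, so the weights are only met approximately. It also accepts an optional `seed` argument for a repeatable split. The sketch below illustrates the per-row idea in plain Python. It is not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. roughly [0.9, 1.0] for weights [0.9, 0.1].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches draws lost to floating-point rounding.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(150)), [0.9, 0.1], seed=42)
print(len(train), len(test))  # sizes sum to 150 but are only roughly 135 and 15
```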



Build the model on training data

In [33]:
dtClassifier = DecisionTreeClassifier(maxDepth=2, labelCol="label",\
                featuresCol="features")
dtModel = dtClassifier.fit(trainingData)

print(dtModel.numNodes)
print(dtModel.depth)

5
2
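
A tree with 5 nodes and depth 2 has two internal splits and three leaves, one leaf per species. `DecisionTreeClassifier` uses Gini impurity as its default criterion for choosing splits. The sketch below computes that quantity in plain Python for an idealized split that separates one class from the other two. The class counts are assumptions chosen for illustration, not values read from the trained model.

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())

def split_impurity(left, right):
    """Impurity of a two-way split, weighted by the size of each side."""
    n = len(left) + len(right)
    return (len(left) * gini(left) + len(right) * gini(right)) / n

# Idealized example: three balanced classes, first split isolates one class.
all_rows = ["setosa"] * 50 + ["versicolor"] * 50 + ["virginica"] * 50
left = ["setosa"] * 50
right = ["versicolor"] * 50 + ["virginica"] * 50
print(round(gini(all_rows), 3))               # 0.667
print(round(split_impurity(left, right), 3))  # 0.333
```

The drop in impurity from the parent node to the weighted children is what makes such a split attractive to the tree builder.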


Predict on the test data and compare the predictions with the true labels

In [34]:
predictions = dtModel.transform(testData)
predictions.select("prediction","species","label").show()

+----------+----------+-----+
|prediction|   species|label|
+----------+----------+-----+
|       2.0|    setosa|  2.0|
|       0.0|versicolor|  0.0|
|       0.0|versicolor|  0.0|
|       0.0|versicolor|  0.0|
|       1.0| virginica|  1.0|
|       1.0| virginica|  1.0|
|       1.0| virginica|  1.0|
|       1.0| virginica|  1.0|
|       1.0| virginica|  1.0|
|       1.0| virginica|  1.0|
|       1.0| virginica|  1.0|
+----------+----------+-----+



Evaluate accuracy

In [35]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                    labelCol="label",metricName="accuracy")
evaluator.evaluate(predictions)

1.0
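
With `metricName="accuracy"`, the evaluator reports the fraction of rows whose prediction equals the label. The plain Python sketch below recomputes it from the eleven rows of the decision tree prediction table. The helper name `accuracy` is introduced here for illustration only.

```python
def accuracy(pairs):
    """Fraction of (label, prediction) pairs that agree."""
    if not pairs:
        raise ValueError("need at least one (label, prediction) pair")
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return correct / len(pairs)

# (label, prediction) pairs copied from the decision tree prediction table.
pairs = [(2.0, 2.0)] + [(0.0, 0.0)] * 3 + [(1.0, 1.0)] * 7
print(accuracy(pairs))  # 1.0
```

A perfect score on only 11 test rows is a weak estimate, and a different random split may well give a lower value.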

Confusion matrix

In [36]:
predictions.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|    7|
|  2.0|       2.0|    1|
|  0.0|       0.0|    3|
+-----+----------+-----+
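
The `groupBy` output lists only the label and prediction combinations that occur, so misclassifications that never happened do not appear as zeros. The sketch below pivots such counts into a full square matrix in plain Python, using the counts from the table above. `confusion_matrix` is a hypothetical helper name.

```python
from collections import Counter

def confusion_matrix(pairs, classes):
    """Return rows of counts: one row per true label, one column per predicted label."""
    counts = Counter(pairs)
    return [[counts.get((label, predicted), 0) for predicted in classes]
            for label in classes]

# (label, prediction) pairs matching the counts in the groupBy output.
pairs = [(1.0, 1.0)] * 7 + [(2.0, 2.0)] * 1 + [(0.0, 0.0)] * 3
for row in confusion_matrix(pairs, classes=[0.0, 1.0, 2.0]):
    print(row)
# [3, 0, 0]
# [0, 7, 0]
# [0, 0, 1]
```

Off-diagonal entries would show which species the model confuses with which; here they are all zero.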

