# Install Java and Spark on Hadoop

In [None]:
# install java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# install spark (change the version number if needed)
!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
# unzip the spark file to the current folder
!tar xf spark-3.3.2-bin-hadoop3.tgz

Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
Get:3 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease [18.1 kB]
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Get:7 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:10 http://security.ubuntu.com/ubuntu focal-security/universe amd64 Packages [1,046 kB]
Get:11 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease [24.3 kB]
Get:12 http://security.ubuntu.com/ubuntu focal-security/main amd64 Packages [2,681 kB]
Hit:13 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRe

In [None]:
# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

In [None]:
!pip install findspark
import findspark
findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# Create a SparkSession in Python

In [None]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local")\
          .appName("Introduction to Spark")\
          .config("spark.some.config.option", "some-value")\
          .getOrCreate()

In [None]:
# Import necessary libraries
from pyspark.sql.functions import col, column, expr
from pyspark.sql import functions as f

# A. DataFrame exercise


##0. Load the data files

In [None]:
!git clone https://github.com/nnthaofit/CSC14118.git

Cloning into 'CSC14118'...
remote: Enumerating objects: 3, done.[K
remote: Counting objects: 100% (3/3), done.[K
remote: Compressing objects: 100% (2/2), done.[K
remote: Total 3 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (3/3), 762.51 KiB | 2.46 MiB/s, done.


In [None]:
df = spark.read.json("CSC14118/movies.json") 

In [None]:
df.show(10, truncate=False)

+-------------+--------------------+-------------------------------------------+----+
|cast         |genres              |title                                      |year|
+-------------+--------------------+-------------------------------------------+----+
|[]           |[]                  |After Dark in Central Park                 |1900|
|[]           |[]                  |Boarding School Girls' Pajama Parade       |1900|
|[]           |[]                  |Buffalo Bill's Wild West Parad             |1900|
|[]           |[]                  |Caught                                     |1900|
|[]           |[]                  |Clowns Spinning Hats                       |1900|
|[]           |[Short, Documentary]|Capture of Boer Battery by British         |1900|
|[]           |[]                  |The Enchanted Drawing                      |1900|
|[Paul Boyton]|[]                  |Feeding Sea Lions                          |1900|
|[]           |[Comedy]            |How to Make a Fat 

## 1a. Show the schema of DataFrame that stores the movies dataset.

In [None]:
df.schema

StructType([StructField('cast', ArrayType(StringType(), True), True), StructField('genres', ArrayType(StringType(), True), True), StructField('title', StringType(), True), StructField('year', LongType(), True)])

In [None]:
df.printSchema()

root
 |-- cast: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



## 1b. Show the number of distinct films in the dataset

In [None]:
df.select(f.countDistinct('*')).show(truncate=True)

+-----------------------------------------+
|count(DISTINCT cast, genres, title, year)|
+-----------------------------------------+
|                                    28789|
+-----------------------------------------+



In [None]:
df.distinct().count()

28789

## 2. Count the number of movies released during the years 2012 and 2015 (included)

In [None]:
df.filter((f.col('year')>=2012) & (f.col('year') <= 2015)).count()

1015

## 3. Show the year in which the number of movies released is highest. One highest year is enough

In [None]:
df.groupBy('year').count().orderBy("count", ascending=False).show(1, truncate=True)

+----+-----+
|year|count|
+----+-----+
|1919|  634|
+----+-----+
only showing top 1 row



## 4. Show the list of movies such that for each film, the number of actors/actresses is at least five, and the number of genres it belongs to is at most two genres.

In [None]:
df.filter((f.size(f.col('cast')) >= 5) & (f.size(f.col('genres')) <= 2)).select(f.col('title')).show(truncate=False)

+--------------------------------+
|title                           |
+--------------------------------+
|A Desperate Chance              |
|The Archeologist                |
|At the Potter's Wheel           |
|Back to the Farm                |
|The Beggar Child                |
|Billy's Rival                   |
|Break, Break, Break             |
|The Butterfly                   |
|Calamity Anne's Love Affair     |
|The Star Boarder                |
|A Story of Little Italy         |
|The Story of the Olive          |
|This Is th' Life                |
|The Ace of Hearts               |
|The Purple Highway              |
|The Thief of Bagdad             |
|Chang: A Drama of the Wilderness|
|Sorrell and Son                 |
|The Wreck of the Hesperus       |
|Anthony Adverse                 |
+--------------------------------+
only showing top 20 rows



## 5. Show the **movies** whose names are longest

In [None]:
df.withColumn("length", f.length('title')).orderBy('length', ascending=False).select('title').show(1, truncate=False)


+--------------------------------------------------------------------------------------------------------------+
|title                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
|Cornell-Columbia-University of Pennsylvania Boat Race at Ithaca, N.Y., Showing Lehigh Valley Observation Train|
+--------------------------------------------------------------------------------------------------------------+
only showing top 1 row



In [None]:
max = df.withColumn("len",f.length(df.title)).select(f.max('len')).first()[0]
df.filter(f.length(df.title) == max).show(truncate=False)

+----+------+--------------------------------------------------------------------------------------------------------------+----+
|cast|genres|title                                                                                                         |year|
+----+------+--------------------------------------------------------------------------------------------------------------+----+
|[]  |[]    |Cornell-Columbia-University of Pennsylvania Boat Race at Ithaca, N.Y., Showing Lehigh Valley Observation Train|1901|
+----+------+--------------------------------------------------------------------------------------------------------------+----+



## 6. Show the movies whose name contains the word “fighting” (case-insensitive).

In [None]:
df.filter(f.lower(f.col('title')).contains('fighting')).show(truncate=False)

+------------------------------------------+---------------+-----------------------+----+
|cast                                      |genres         |title                  |year|
+------------------------------------------+---------------+-----------------------+----+
|[Bessie Love, Anne Schaefer]              |[Comedy, Drama]|A Fighting Colleen     |1919|
|[Blanche Sweet, Russell Simpson]          |[Western]      |Fighting Cressy        |1919|
|[Harry T. Morey, Betty Blythe]            |[Drama]        |Fighting Destiny       |1919|
|[Tom Mix, Teddy Sampson]                  |[Western]      |Fighting for Gold      |1919|
|[Jack Perrin, Hoot Gibson, Josephine Hill]|[Western]      |The Fighting Heart     |1919|
|[Art Acord, Mildred Moore]                |[Western]      |The Fighting Line      |1919|
|[William Duncan, Edith Johnson]           |[Action]       |The Fighting Guide     |1922|
|[Tom Mix, Patsy Ruth Miller]              |[Western]      |The Fighting Streak    |1922|
|[Richard 

## 7. Show the list of distinct genres appearing in the dataset

1.   List item
2.   List item



In [None]:
# df.withColumn('genre', f.explode(df.genres)).groupBy('genre').count().rdd.map(lambda x: x.genre).collect()
df.withColumn('genre', f.explode(df.genres)).groupBy('genre').count().distinct().select('genre').show()

df.select(f.explode(df.genres).alias("Name")).distinct().show()

+-------------+
|        genre|
+-------------+
|        Crime|
|      Romance|
|     Thriller|
|      Slasher|
|Found Footage|
|    Adventure|
|         Teen|
| Martial Arts|
|       Sports|
|        Drama|
|          War|
|  Documentary|
|       Family|
|      Fantasy|
|       Silent|
|     Disaster|
|        Legal|
|      Mystery|
| Supernatural|
|     Suspense|
+-------------+
only showing top 20 rows

+-------------+
|         Name|
+-------------+
|        Crime|
|      Romance|
|     Thriller|
|      Slasher|
|Found Footage|
|    Adventure|
|         Teen|
| Martial Arts|
|       Sports|
|        Drama|
|          War|
|  Documentary|
|       Family|
|      Fantasy|
|       Silent|
|     Disaster|
|        Legal|
|      Mystery|
| Supernatural|
|     Suspense|
+-------------+
only showing top 20 rows



## 8. List all movies in which the actor Harrison Ford has participated.

In [None]:
df.filter(f.array_contains(df.cast, "Harrison Ford")).show(truncate=False)

+-------------------------------------------------+-----------------+-------------------------+----+
|cast                                             |genres           |title                    |year|
+-------------------------------------------------+-----------------+-------------------------+----+
|[Constance Talmadge, Harrison Ford]              |[Romance, Comedy]|Experimental Marriage    |1919|
|[Constance Talmadge, Harrison Ford]              |[Comedy]         |Happiness a la Mode      |1919|
|[Constance Talmadge, Harrison Ford]              |[Comedy]         |Romance and Arabella     |1919|
|[Vivian Martin, Harrison Ford]                   |[Comedy]         |The Third Kiss           |1919|
|[Harrison Ford, Constance Talmadge]              |[Comedy]         |The Veiled Adventure     |1919|
|[Constance Talmadge, Harrison Ford]              |[Comedy]         |Who Cares?               |1919|
|[Vivian Martin, Harrison Ford]                   |[Drama]          |You Never Saw Such a G

## 9. List all movies in which the actors/actresses whose names include the word “Lewis“ (case-insensitive) have participated.

In [None]:
df.withColumn('actor', f.explode(df.cast)).filter(f.lower(f.col('actor')).contains('lewis')).select(df.title).show(truncate=False)

+---------------------------+
|title                      |
+---------------------------+
|The Butterfly              |
|The Exploits of Elaine     |
|Mein Lieber Katrina        |
|Going Straight             |
|Gretchen the Greenhorn     |
|A Sister of Six            |
|The Bride's Silence        |
|Nine-Tenths of the Law     |
|The Faith of the Strong    |
|The Hoodlum                |
|Jacques of the Silver North|
|The Last of His People     |
|Man's Desire               |
|Yvonne from Paris          |
|Nine-Tenths of the Law     |
|813                        |
|Huckleberry Finn           |
|Salvage                    |
|The Five Dollar Baby       |
|A Fool There Was           |
+---------------------------+
only showing top 20 rows



## 10. Show top five actors/actresses that have participated in most movies.

In [None]:
df.withColumn('actor', f.explode(df.cast)).groupBy(f.col('actor')).count().sort('count', ascending=False).limit(5).show()

+----------------+-----+
|           actor|count|
+----------------+-----+
|    Harold Lloyd|  190|
|     Hoot Gibson|  142|
|      John Wayne|  136|
|Charles Starrett|  116|
|    Bebe Daniels|  103|
+----------------+-----+



# B. RDD exercises

## 1. Given a string s that include only alphabetical letters and spaces. Check whether s1 is a palindrome.

In [None]:
# palindrome là ngược giống xuôi.
s = "race car"
def isPalindrome(s):
  return s==s[::-1]

isPalindrome(s.replace(" ",""))

True

In [None]:
s = 'race car'
rdd = spark.sparkContext.parallelize(s).filter(lambda l: l!= ' ')
rdd.collect()

['r', 'a', 'c', 'e', 'c', 'a', 'r']

In [None]:
# a rdd for the orignal series of letters
index = spark.sparkContext.range(0, rdd.count())
rddForward = index.zip(rdd)
rddForward.collect()

[(0, 'r'), (1, 'a'), (2, 'c'), (3, 'e'), (4, 'c'), (5, 'a'), (6, 'r')]

In [None]:
rddBackward = rddForward.sortBy(lambda r:r[0]*-1)
rddBackward.collect()

[(6, 'r'), (5, 'a'), (4, 'c'), (3, 'e'), (2, 'c'), (1, 'a'), (0, 'r')]

In [None]:
rddCombined = rddForward.zip(rddBackward)
rddCombined.collect()

[((0, 'r'), (6, 'r')),
 ((1, 'a'), (5, 'a')),
 ((2, 'c'), (4, 'c')),
 ((3, 'e'), (3, 'e')),
 ((4, 'c'), (2, 'c')),
 ((5, 'a'), (1, 'a')),
 ((6, 'r'), (0, 'r'))]

In [None]:
rddCombined.filter(lambda r: r[0][1] != r[1][1]).count()

0

## 2. Given a string s that include only alphabetical letters and spaces. Check whether s1 is a pangram.

In [None]:
s = "The quick brown fox jumps over the lazy dog"
s1 = "The quick brown fox jumps over the dog"
def isPangram(s):

  rdd = spark.sparkContext.parallelize(s).filter(lambda l: l!= ' ')
  return rdd.distinct().count() ==26


isPangram(s.lower())

True

## 3. Given two strings, s1 and s2, that include only alphabetical letters and spaces. Check whether s1 is an anagram of s2

In [None]:
str1 = "listen" 
str2 = "silent"

def sortStr(str):
  return spark.sparkContext.parallelize(str).filter(lambda l: l!= ' ')\
         .sortBy(lambda x: x)

def isAnagram(str1, str2):
  rdd1 = sortStr(str1)
  rdd2 = sortStr(str2)
  rddCombined = rdd1.zip(rdd2)
  print(rddCombined.collect())
  return rddCombined.filter(lambda x: x[0] != x[1]).count() == 0

isAnagram(str1, str2) 

[('e', 'e'), ('i', 'i'), ('l', 'l'), ('n', 'n'), ('s', 's'), ('t', 't')]


True

# C. MLlib exercises

## Import libraries

In [None]:
# Import necessary libraries
from pyspark.sql.functions import col, column, expr
from pyspark.sql import functions as f

from pyspark.ml import Pipeline
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.clustering import KMeans

## Question 2: Consider the CSV file foodmart.csv, whose content represents a transactional dataset. Each record of the dataset is a tuple of values 1 and 0 corresponding to a designated list of items, in which 1 means bought and 0 means not bought.





In [None]:
# Load data
!git clone https://github.com/phatpham46/Spark_exercises.git

Cloning into 'Spark_exercises'...
remote: Enumerating objects: 10, done.[K
remote: Counting objects:  10% (1/10)[Kremote: Counting objects:  20% (2/10)[Kremote: Counting objects:  30% (3/10)[Kremote: Counting objects:  40% (4/10)[Kremote: Counting objects:  50% (5/10)[Kremote: Counting objects:  60% (6/10)[Kremote: Counting objects:  70% (7/10)[Kremote: Counting objects:  80% (8/10)[Kremote: Counting objects:  90% (9/10)[Kremote: Counting objects: 100% (10/10)[Kremote: Counting objects: 100% (10/10), done.[K
remote: Compressing objects: 100% (8/8), done.[K
remote: Total 10 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (10/10), 815.52 KiB | 2.83 MiB/s, done.


In [None]:
df_foodmart = spark.read.csv("Spark_exercises/data/foodmart.csv", header = True)
df_foodmart.show(10, truncate=True)

+------------+---------+-------+--------------+------+---------+----+-------+-------+------------+-----------------+------+------+-----+---------+---------------+-----+--------+------+-------------+------------------+-----------+-------+-----------+--------------+--------+----------+-----------+-----------+----+------+-----------+----------+----+-----------------+---------------+------------+-------------+----------+--------------+-----------------+---+---------+----------+--------------+--------+---------+---------+---+-----+-----+----------+----+----+---------+-------+------------+----+-------+-----------+--------+------------+-----------+-----+-------------+----------------+-----+----------------+-------+---------+------------+-------------+-------------+---------+--------+----+--------+------+------------+-------+---------+------+------------+----+----+----------+------+-------+----------------+-----+----------+----+--------------+-----+------------+----+---------+-------+----+----

### 2.1. Convert the given dataset to the following format. Note that in each list of items, consecutive items are separated by a single comma.
- ID      Items
- 1       item1, item2, item3
- 2       item3, item1,
- …       …

In [None]:
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

columns = df_foodmart.columns

def mapping(x):
  ret = []
  for col in columns:
    if(x[col] == '1'):
      ret.append(col)
  return ret

rdd = df_foodmart.rdd.map(mapping).collect()

rdd = zip(range(1, len(rdd)+1), rdd)
ret = spark.createDataFrame(rdd,['id', 'item'])
ret.show(truncate=False)

+---+---------------------------------------------------------------------+
|id |item                                                                 |
+---+---------------------------------------------------------------------+
|1  |[Acetominifen, Cheese, Home Magazines, Shampoo]                      |
|2  |[Acetominifen, Cheese, Hard Candy, Milk, Pot Scrubbers, Rice]        |
|3  |[Coffee, Deli Salads]                                                |
|4  |[Eggs, Gum, Milk, Soup]                                              |
|5  |[Cheese, Dried Fruit, Frozen Chicken, Plastic Utensils]              |
|6  |[Shampoo]                                                            |
|7  |[Milk, Paper Wipes, Waffles]                                         |
|8  |[Donuts, Dried Fruit, Frozen Chicken]                                |
|9  |[Cooking Oil, Hamburger, Maps, Popsicles]                            |
|10 |[Cheese, Cooking Oil, Dips, Preserves, TV Dinner]                    |
|11 |[Nasal 

### 2.2. Mine the set of frequent patterns and the set of association rules from the above dataset (in new format) with min support of 0.1 and min confidence of 0.9.


In [None]:
from pyspark.ml.fpm import FPGrowth

fpg = FPGrowth(
   itemsCol = 'item',
   minSupport=0.1,
   minConfidence=0.9 
).fit(ret)

patterns = fpg.freqItemsets
rules = fpg.associationRules
patterns.sort("items").show(10)
rules.sort('antecedent', 'confidence').show()

+-------------+----+
|        items|freq|
+-------------+----+
|     [Cheese]| 285|
|    [Cookies]| 238|
|[Dried Fruit]| 256|
|       [Soup]| 280|
+-------------+----+

+----------+----------+----------+----+-------+
|antecedent|consequent|confidence|lift|support|
+----------+----------+----------+----+-------+
+----------+----------+----------+----+-------+



## Question3. Consider the CSV file mushrooms.csv, whose content represents a dataset of mushroom species. There are 8124 examples, each of which is presented by 22 attributes and categorized into either “edible” (e) or “poisonous” (p) 


In [None]:
mushroom_df = spark.read.csv("Spark_exercises/data/mushrooms.csv", header = True)
mushroom_df.show(10)

+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+
|class|cap-shape|cap-surface|cap-color|bruises|odor|gill-attachment|gill-spacing|gill-size|gill-color|stalk-shape|stalk-root|stalk-surface-above-ring|stalk-surface-below-ring|stalk-color-above-ring|stalk-color-below-ring|veil-type|veil-color|ring-number|ring-type|spore-print-color|population|habitat|
+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+
|    p|        x|          s|        n|      t|   p|              f|           c|        n|   

In [None]:
from prompt_toolkit import output
def evaluate(pred, metric):
  evaluator = MulticlassClassificationEvaluator(
      predictionCol = 'prediction',
      labelCol = 'class_index',
      metricName = metric
  )
  return round(evaluator.evaluate(pred),5)

train, test = mushroom_df.randomSplit([.8,.2])

columns = mushroom_df.columns

indexed_col = [c+"_index" for c in columns]
features_col = indexed_col.copy()
features_col.remove("class_index")

indexer = StringIndexer(
    inputCols = columns,
    outputCols = indexed_col
)

vectorizer = VectorAssembler(
    inputCols = features_col, 
    outputCol = 'features'
)

model1 = DecisionTreeClassifier(
    labelCol = 'class_index',
    featuresCol = 'features'
)

model2 = RandomForestClassifier(
    labelCol = 'class_index',
    featuresCol = 'features'
)

In [None]:
for model in [model1, model2]:
  pipeline = Pipeline(stages=[indexer, vectorizer, model]).fit(train)
  prediction = pipeline.transform(test)
  print("Acc: ", evaluate(prediction, 'accuracy'))

Acc:  0.99939
Acc:  1.0


## Question 4. Consider the CSV file iris.csv, whose content represents a dataset of iris plant species. There are 150 examples, each of which is presented by 4 attributes and categorized into one of the three classes. 


In [None]:
iris_df = spark.read.csv("Spark_exercises/data/iris.csv", header = True)
iris_df.show(10)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
+---+-------------+------------+-------------+------------+-----

In [None]:
columns = [
    'SepalLengthCm',
    'SepalWidthCm',
    'PetalLengthCm',
    'PetalWidthCm'
]

indexed_cols = [c+'_index' for c in columns]

indexer = StringIndexer(
    inputCols=columns,
    outputCols=indexed_cols
)

vectorizer = VectorAssembler(
    inputCols=indexed_cols,
    outputCol='features'
)

In [None]:
def clustering(data, nCluster):
  kmeans=KMeans(
      k=nCluster
  )
  pipeline=Pipeline(stages=[indexer, vectorizer, kmeans]).fit(data)
  prediction = pipeline.transform(data)
  return prediction

predictions =[]
for nCluster in [2,3,4]:
  predictions.append(clustering(iris_df, nCluster))
  predictions[-1].select('features', 'prediction').show(3)

+------------------+----------+
|          features|prediction|
+------------------+----------+
| [1.0,9.0,1.0,0.0]|         1|
| [8.0,0.0,1.0,0.0]|         1|
|[24.0,2.0,4.0,0.0]|         1|
+------------------+----------+
only showing top 3 rows

+------------------+----------+
|          features|prediction|
+------------------+----------+
| [1.0,9.0,1.0,0.0]|         1|
| [8.0,0.0,1.0,0.0]|         2|
|[24.0,2.0,4.0,0.0]|         2|
+------------------+----------+
only showing top 3 rows

+------------------+----------+
|          features|prediction|
+------------------+----------+
| [1.0,9.0,1.0,0.0]|         1|
| [8.0,0.0,1.0,0.0]|         0|
|[24.0,2.0,4.0,0.0]|         0|
+------------------+----------+
only showing top 3 rows



In [None]:
def countClusterSamples(df):
  return df.groupBy('prediction').count().orderBy('prediction')

countClusterSamples(predictions[0]).show()

+----------+-----+
|prediction|count|
+----------+-----+
|         0|   47|
|         1|  103|
+----------+-----+



In [None]:
for p in predictions:
  countClusterSamples(p).show()

+----------+-----+
|prediction|count|
+----------+-----+
|         0|   47|
|         1|  103|
+----------+-----+

+----------+-----+
|prediction|count|
+----------+-----+
|         0|   44|
|         1|   64|
|         2|   42|
+----------+-----+

+----------+-----+
|prediction|count|
+----------+-----+
|         0|   44|
|         1|   63|
|         2|   22|
|         3|   21|
+----------+-----+

