## KMeans tutorial for Module 5 Lesson 9:
### clustering the twitter data at HdiSamples

### - Import the Scala libraries for KMeans clustering

In [227]:
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans

import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._

### (1) Read the input data from example data set and cache it to memory

In [228]:
// Read the sample tweets JSON file into a DataFrame and cache it, since later cells reuse it.
val dataAllDF = sqlContext.read.json("wasb:////HdiSamples/HdiSamples/TwitterTrendsSampleData/tweets.txt").cache()

// The following cells refer to this DataFrame as `dataDF`; bind that name here so the
// notebook runs top to bottom from a fresh kernel.
val dataDF = dataAllDF

dataAllDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: struct<coordinates:array<double>,type:string>, created_at: string, entities: struct<hashtags:array<struct<indices:array<bigint>,text:string>>,media:array<struct<display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,type:string,url:string>>,symbols:array<string>,trends:array<string>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mentions:array<...

In [229]:
// Print the first two rows of dataDF as a quick sanity check.
dataDF.show(2)

+------------+-----------+--------------------+--------------------+-----------------+--------------+---------+------------+----+------------------+------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+----+-----+------------------+-------------+---------+----------------+--------------------+--------------------+-------------+---------+--------------------+
|contributors|coordinates|          created_at|            entities|extended_entities|favorite_count|favorited|filter_level| geo|                id|            id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|lang|place|possibly_sensitive|retweet_count|retweeted|retweeted_status|              source|                text| timestamp_ms|truncated|                user|
+------------+-----------+--------------------+--------------------+-----------------+--------------+---------+-----

In [230]:
// List the column names of dataDF.
dataDF.columns

res197: Array[String] = Array(contributors, coordinates, created_at, entities, extended_entities, favorite_count, favorited, filter_level, geo, id, id_str, in_reply_to_screen_name, in_reply_to_status_id, in_reply_to_status_id_str, in_reply_to_user_id, in_reply_to_user_id_str, lang, place, possibly_sensitive, retweet_count, retweeted, retweeted_status, source, text, timestamp_ms, truncated, user)

### (2) Convert the text column of the data to lowercase

In [231]:
// Keep every existing column and append "lowerText", the lowercased copy of "text".
val dataLoweredDF = dataDF.withColumn("lowerText", lower(col("text")))

dataLoweredDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: struct<coordinates:array<double>,type:string>, created_at: string, entities: struct<hashtags:array<struct<indices:array<bigint>,text:string>>,media:array<struct<display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,type:string,url:string>>,symbols:array<string>,trends:array<string>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mentions:ar...

### (3) Split the lowercased text into words using a tokenizer

In [232]:
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, HashingTF, IDF, Normalizer}

// Settings used by later cells: the size of the hashed term-frequency vector (HashingTF)
// and the number of clusters (KMeans).
val numFeatures = 2000
val numClusters = 10

// Tokenizer that reads "lowerText" and writes the "words" column, configured with the
// regex pattern \W+ (runs of non-word characters).
val tokenizer = (new RegexTokenizer()
  .setPattern("\\W+")
  .setInputCol("lowerText")
  .setOutputCol("words"))

tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_69eb7c500583

In [233]:
// Apply the tokenizer to the lowercased data; the result gains the "words" column.
val dataWordsDF = tokenizer.transform(dataLoweredDF)

dataWordsDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: struct<coordinates:array<double>,type:string>, created_at: string, entities: struct<hashtags:array<struct<indices:array<bigint>,text:string>>,media:array<struct<display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,type:string,url:string>>,symbols:array<string>,trends:array<string>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mentions:arra...

In [234]:
// Print the schema of the tokenized DataFrame.
dataWordsDF.printSchema

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true

In [235]:
// Look at the "words" value of the first row to check the tokenizer output.
dataWordsDF.select("words").first

res202: org.apache.spark.sql.Row = [WrappedArray(tresorit, cloud, storage, get, an, extra, 500mb, free, webspace, when, you, sign, up, with, my, http, t, co, nln3x3o7ov, http, t, co, els1ultvg5)]

### (4) Remove the stopwords using StopWordsRemover

In [236]:
// Stop-word filter: reads the "words" column and writes the filtered tokens to "noStopWords".
val remover = (new StopWordsRemover()
  .setOutputCol("noStopWords")
  .setInputCol("words"))

remover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_d3bc369fbe04

In [237]:
// Apply the stop-word remover to the tokenized data; the result gains the "noStopWords" column.
val noStopWordsListDF = remover.transform(dataWordsDF)

noStopWordsListDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: struct<coordinates:array<double>,type:string>, created_at: string, entities: struct<hashtags:array<struct<indices:array<bigint>,text:string>>,media:array<struct<display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,type:string,url:string>>,symbols:array<string>,trends:array<string>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mention...

In [238]:
// Print the schema of the stop-word-filtered DataFrame.
noStopWordsListDF.printSchema

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true

In [239]:
// Compare the tokens before ("words") and after ("noStopWords") stop-word removal for five rows.
noStopWordsListDF.select("id", "words", "noStopWords").show(5)

+------------------+--------------------+--------------------+
|                id|               words|         noStopWords|
+------------------+--------------------+--------------------+
|537350547683434497|[tresorit, cloud,...|[tresorit, cloud,...|
|537350549692510208|[check, out, my, ...|[check, suite, ju...|
|537350568231337985|[what, s, on, the...|[s, horizon, clou...|
|537350570513014784|[onlynashy, super...|[onlynashy, super...|
|537350572576620544|[smoothhemmo, cc,...|[smoothhemmo, cc,...|
+------------------+--------------------+--------------------+
only showing top 5 rows

### (5) Calculate TF (Term-Frequency) and set number of features

In [240]:
// Term-frequency stage: hashes the "noStopWords" tokens into a vector with numFeatures
// entries, written to the "hashingTF" column.
val hashingTF = (new HashingTF()
  .setNumFeatures(numFeatures)
  .setInputCol("noStopWords")
  .setOutputCol("hashingTF"))

// Apply the stage to the stop-word-filtered data.
val featurizedDataDF = hashingTF.transform(noStopWordsListDF)

featurizedDataDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: struct<coordinates:array<double>,type:string>, created_at: string, entities: struct<hashtags:array<struct<indices:array<bigint>,text:string>>,media:array<struct<display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,type:string,url:string>>,symbols:array<string>,trends:array<string>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mentions...

In [241]:
// Print the schema of the DataFrame that now carries the term-frequency column.
featurizedDataDF.printSchema

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true

In [242]:
// Show the filtered tokens next to their hashed term-frequency vectors for three rows.
featurizedDataDF.select("id", "noStopWords", "hashingTF").show(3)

+------------------+--------------------+--------------------+
|                id|         noStopWords|           hashingTF|
+------------------+--------------------+--------------------+
|537350547683434497|[tresorit, cloud,...|(2000,[116,157,17...|
|537350549692510208|[check, suite, ju...|(2000,[116,203,39...|
|537350568231337985|[s, horizon, clou...|(2000,[71,115,116...|
+------------------+--------------------+--------------------+
only showing top 3 rows

### (6) Calculate IDF (Inverse Document Frequency) and normalize it to a common scale

In [243]:
// Normalizer stage: reads the "idf" vectors and writes the normalized result to "features",
// the column the KMeans stage consumes.
val normalizer = (new Normalizer()
  .setOutputCol("features")
  .setInputCol("idf"))

normalizer: org.apache.spark.ml.feature.Normalizer = normalizer_31c2d11095cb

In [244]:
// IDF estimator: reads the "hashingTF" term-frequency vectors and writes its output to "idf".
val idf = (new IDF()
  .setOutputCol("idf")
  .setInputCol("hashingTF"))

// Fit the IDF estimator on the featurized tweets as a standalone step. The pipeline built
// later takes `idf` itself as a stage, so it fits its own model when the pipeline is trained.
val idfModel = idf.fit(featurizedDataDF)

idfModel: org.apache.spark.ml.feature.IDFModel = idf_0c174db57c74

### (7) Set KMeans model with the features and prediction column
#### a. Build pipeline with Tokenizer, Remover, HashingTF, IDF, Normalizer, KMeans model

In [245]:
// KMeans estimator from the DataFrame-based spark.ml API. The class name is fully qualified
// because this notebook also imports org.apache.spark.mllib.clustering.KMeans; the bare name
// would otherwise depend on which of the two imports was evaluated last.
val kmeans = (new org.apache.spark.ml.clustering.KMeans()
  .setK(numClusters)
  .setSeed(0)
  .setFeaturesCol("features")
  .setPredictionCol("prediction"))

// Chain all stages in order: tokenize, drop stop words, hash to term frequencies, apply IDF,
// normalize, and cluster.
val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, normalizer, kmeans))

pipeline: org.apache.spark.ml.Pipeline = pipeline_3b57a99e587c

#### b. Train the model with dataLoweredDF dataset

In [246]:
// Fit every pipeline stage on the lowercased tweets; the result is a PipelineModel.
val model = pipeline.fit(dataLoweredDF)

model: org.apache.spark.ml.PipelineModel = pipeline_3b57a99e587c

### (8) Run the test data set: 
#### Here we train and test the model with the same data set. In practice, the test data should be a separate data set
#### so that the quality of the model can be evaluated properly.

In [247]:
// Run the fitted pipeline over the same lowercased tweets it was trained on.
val predictionsDF = model.transform(dataLoweredDF)

predictionsDF: org.apache.spark.sql.DataFrame = [contributors: string, coordinates: struct<coordinates:array<double>,type:string>, created_at: string, entities: struct<hashtags:array<struct<indices:array<bigint>,text:string>>,media:array<struct<display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,type:string,url:string>>,symbols:array<string>,trends:array<string>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mentions:ar...

#### You will see all K groups and how many tweets fall into each group.

In [248]:
// Count the rows per "prediction" value and show up to numClusters rows.
predictionsDF.groupBy("prediction").count().show(numClusters)

+----------+-----+
|prediction|count|
+----------+-----+
|         0|    7|
|         1|   11|
|         2|    6|
|         3|   15|
|         4|   11|
|         5|  103|
|         6|   40|
|         7|    1|
|         8|    4|
|         9|    2|
+----------+-----+

### (9) Run SQL with the table in memory
#### a. create a table and name it "predictionsDF"

In [249]:
// Register the predictions under the table name "predictionsDF" so the %sql cells below can query it.
predictionsDF.registerTempTable("predictionsDF")


#### b. run SQL to see what hashtags exist for group 0 (prediction = 0, which has cloud, Blogs, ....)

In [250]:
%sql
SELECT
  entities.hashtags[0].text,
  entities.hashtags[1].text,
  entities.hashtags[2].text,
  entities.hashtags[3].text,
  prediction
FROM predictionsDF
WHERE prediction = 0
  AND entities.hashtags[0].text IS NOT NULL
  AND entities.hashtags[1].text IS NOT NULL
  AND entities.hashtags[2].text IS NOT NULL
  AND entities.hashtags[3].text IS NOT NULL
LIMIT 30

#### c. run SQL to see what hashtags exist for group 5 (prediction = 5, which has more general words such as social, big data, Cloud, ....)

In [251]:
%sql
SELECT
  entities.hashtags[0].text,
  entities.hashtags[1].text,
  entities.hashtags[2].text,
  entities.hashtags[3].text,
  prediction
FROM predictionsDF
WHERE prediction = 5
  AND entities.hashtags[0].text IS NOT NULL
  AND entities.hashtags[1].text IS NOT NULL
  AND entities.hashtags[2].text IS NOT NULL
  AND entities.hashtags[3].text IS NOT NULL
LIMIT 30

#### d. run SQL to see what hashtags exist for group 3 (prediction = 3, which has cloud, Big Data, IoT, ....)

In [252]:
%sql
SELECT
  entities.hashtags[0].text,
  entities.hashtags[1].text,
  entities.hashtags[2].text,
  entities.hashtags[3].text,
  prediction
FROM predictionsDF
WHERE prediction = 3
  AND entities.hashtags[0].text IS NOT NULL
  AND entities.hashtags[1].text IS NOT NULL
  AND entities.hashtags[2].text IS NOT NULL
  AND entities.hashtags[3].text IS NOT NULL
LIMIT 30

#### e. run SQL to see what hashtags exist for group 4 (prediction = 4, which has autos, Photography, ....)

In [253]:
%sql
SELECT
  entities.hashtags[0].text,
  entities.hashtags[1].text,
  entities.hashtags[2].text,
  entities.hashtags[3].text,
  prediction
FROM predictionsDF
WHERE prediction = 4
  AND entities.hashtags[0].text IS NOT NULL
  AND entities.hashtags[1].text IS NOT NULL
  AND entities.hashtags[2].text IS NOT NULL
  AND entities.hashtags[3].text IS NOT NULL
LIMIT 30

#### f. run SQL to see what hashtags exist for group 6 (prediction = 6, which does not have any hashtags)

In [254]:
%sql
SELECT
  entities.hashtags[0].text,
  entities.hashtags[1].text,
  entities.hashtags[2].text,
  entities.hashtags[3].text,
  prediction
FROM predictionsDF
WHERE prediction = 6
  AND entities.hashtags[0].text IS NOT NULL
  AND entities.hashtags[1].text IS NOT NULL
  AND entities.hashtags[2].text IS NOT NULL
  AND entities.hashtags[3].text IS NOT NULL
LIMIT 30