# Reddit Comment Clustering Using TF-IDF
<img src="https://raw.githubusercontent.com/rosswlewis/RedditComments/master/reddit_log-100011890-large.jpg" align="left" width="35%">

# Let's add a postgres driver and import all of the spark libraries we need

In [1]:
%Addjar -f https://jdbc.postgresql.org/download/postgresql-9.4.1207.jre7.jar

Starting download from https://jdbc.postgresql.org/download/postgresql-9.4.1207.jre7.jar
Finished download of postgresql-9.4.1207.jre7.jar


In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, HashingTF, IDF, Normalizer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.PipelineModel

val sqlContext = new SQLContext(sc)

# Here, we read in the data from 

In [5]:
val uname = "readonly"
val pword = "watson"
val dbUrl = 
"jdbc:postgresql://haproxy635.sl-us-dal-9-portal.2.dblayer.com:10635/compose?user="+uname+"&password="+pword
val table = "reddit"
val comments = sqlContext.read.format("jdbc").options(
  Map("url" -> dbUrl,
  "dbtable" -> table)).load()

# To clean the data, we only use comments that haven't been deleted, are over 100 characters long, and we remove punctuation

In [6]:
val commentLower = comments.filter("length(body) >= 100").
    select(lower(comments("body")).alias("lowerText")).distinct()

In [5]:
val tokenizer = new RegexTokenizer().
    setInputCol("lowerText").
    setOutputCol("words").
    setPattern("\\W+")

# There are a lot of words that we don't want to have an affect on clusters.  Here, we add a stop words remover to our pipeline. 

In [6]:
val additionalWords = Array("b","c","d","e","f","g","h","j","k","l","m","n","o","p",
                            "q","r","s","t","u","v","w","x","y","z",
                            "0","1","2","3","4","5","6","7","8","9","10",
                            "oh","wow","stuff","thank","isn","don","didn","people",
                            "thing","ve","time","know","think")
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("noStopWords")
val currentStopWords = remover.getStopWords
val allStopWords = currentStopWords ++ additionalWords
remover.setStopWords(allStopWords)

stopWords_f68dff9a0d7c

# Add a column which hashes each individual word to an integer, and mark the frequency of that word in the comment

In [7]:
val hashingTF = new HashingTF().setInputCol("noStopWords").setOutputCol("hashingTF").setNumFeatures(25000)

# Add a column which looks at all words in all comments and ranks the importance

In [8]:
val idf = new IDF().setInputCol("hashingTF").setOutputCol("idf")

# Normalize the output

In [9]:
val normalizer = new Normalizer().
    setInputCol("idf").
    setOutputCol("features")

# Based on the important words of different comments, create a pipeline to cluster the comments by topic

In [10]:
val kmeans = new KMeans().
    setFeaturesCol("features").
    setPredictionCol("prediction").
    setK(250)
val pipeline = new Pipeline().
    setStages(Array(tokenizer, remover, hashingTF, idf, normalizer, kmeans))
val model = pipeline.fit(commentLower)

In [7]:
val predictionsDF = model.transform(commentLower)

In [1]:
println(predictionsDF.count())

1546943


<img src="https://raw.githubusercontent.com/rosswlewis/RedditComments/master/TFIDF-FIG-01.JPG" align="left">

# Save the model and labeled data for future use

In [None]:
var credentials = scala.collection.mutable.HashMap[String, String](
  "auth_url"->"https://identity.open.softlayer.com",
  "project"->"XXXXXXXX",
  "project_id"->"XXXXXXXX",
  "region"->"dallas",
  "user_id"->"XXXXXXXX",
  "domain_id"->"XXXXXXXX",
  "domain_name"->"XXXXXXXX",
  "username"->"XXXXXXXX",
  "password"->"""XXXXXXXX""",
  "container"->"XXXXXXXX",
  "tenantId"->"XXXXXXXX",
  "name"->"keystone"
)
def setHadoopConfig(name: String, tenant: String, url: String, 
                    username: String, password: String, region: String) = {
    sc.hadoopConfiguration.set(f"fs.swift.service.$name.auth.url",url+"/v3/auth/tokens")
    sc.hadoopConfiguration.set(f"fs.swift.service.$name.auth.endpoint.prefix","endpoints")
    sc.hadoopConfiguration.set(f"fs.swift.service.$name.tenant",tenant)
    sc.hadoopConfiguration.set(f"fs.swift.service.$name.username",username)
    sc.hadoopConfiguration.set(f"fs.swift.service.$name.password",password)
    sc.hadoopConfiguration.setInt(f"fs.swift.service.$name.http.port",8080)
    sc.hadoopConfiguration.set(f"fs.swift.service.$name.region",region)
    sc.hadoopConfiguration.setBoolean(f"fs.swift.service.$name.public",true)
}
setHadoopConfig(credentials("name"),credentials("project_id"),credentials("auth_url"),
                          credentials("user_id"),credentials("password"),credentials("region"))
//sc.parallelize(Seq(model), 1).saveAsObjectFile("swift://XXXXXXXX.keystone/model")
//predictionsDF.select("noStopWords","lowerText","prediction").write.parquet("swift://XXXXXXXX.keystone/commentClusters")

In [3]:
val model = sc.objectFile[PipelineModel]("swift://XXXXXXXX.keystone/model").first()

In [4]:
model

pipeline_8ac1f844e0a4

# Create a new comment and assign a cluster to it

In [4]:
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import sqlContext.implicits._

val userInput = "With my free time, I like to play video games, drum, and do big data analytics."
case class comment(lowerText: String)
val commentDataFrame = sc.parallelize(Seq(comment(userInput))).toDF()

val curCom = model.transform(commentDataFrame)

In [5]:
curCom.select("lowerText","prediction").collect()

Array([With my free time, I like to play video games, drum, and do big data analytics.,185])