In [18]:
from __future__ import print_function
from pyspark.sql.functions import count, countDistinct
from pyspark.sql.functions import col
from pyspark.sql.functions import explode, expr
from pyspark.sql.functions import split
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark import SparkContext
from pyspark.sql import SQLContext
import os

# Obtain the notebook's SparkSession: getOrCreate() returns the already-active session
# if there is one, otherwise it builds a new one (no builder options are set here).
spark = SparkSession.builder.getOrCreate()



In [2]:
# Load the tweets JSON file into a Spark DataFrame.
# The location can be overridden with the TWEETS_JSON_PATH environment variable so the
# notebook is not tied to a single machine; the default is the original local path.
tweetsPath = os.environ.get(
    "TWEETS_JSON_PATH", "file:///Users/Laith/Downloads/features_tweets.json"
)
tweetsDF = spark.read.json(tweetsPath)

In [3]:
# Print the schema Spark inferred from the JSON file (column names, types, nullability).
tweetsDF.printSchema()

root
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- followers_count: long (nullable = true)
 |-- friends_count: long (nullable = true)
 |-- location: string (nullable = true)
 |-- reply_to: long (nullable = true)
 |-- userId: long (nullable = true)



In [4]:
from pyspark.ml.feature import Word2Vec

In [9]:
# Configure a Word2Vec estimator that reads the token array in "filtered" and writes a
# 5-dimensional vector per row into "features". minCount=0 means no token is dropped
# from the vocabulary for being too rare.
word2Vec = (
    Word2Vec()
    .setVectorSize(5)
    .setMinCount(0)
    .setInputCol("filtered")
    .setOutputCol("features")
)

# Learn the embedding from the tweets themselves.
model = word2Vec.fit(tweetsDF)

# Append the "features" vector column to every tweet and preview the first 20 rows.
result = model.transform(tweetsDF)
result.select("userId", "features").show(20)

+-------------------+-----------------------------------------------------------------------------------------------------------+
|userId             |features                                                                                                   |
+-------------------+-----------------------------------------------------------------------------------------------------------+
|351883595          |[-0.03009021282196045,0.06259085237979889,-0.013886237516999245,0.009944629855453968,-0.0935233011841774]  |
|744537718043086849 |[-0.0074743957569201784,-0.24438818792502084,-0.05768536062290271,0.12713951617479324,0.15674133703578264] |
|112500296          |[0.2038060774405797,-0.6891342749198277,-0.3379166626061002,0.37206344678997993,0.38107080198824406]       |
|1185604978964127756|[-0.08905501441600232,-0.07964820357469413,0.05289482769484704,-0.05341297388076782,-0.0340033447674404]   |
|57025876           |[-0.18352871189514797,0.07425685014265279,0.3889609184116125,0.060544

In [16]:
# Split the embedded tweets 70/30 into training and held-out test sets.
# A fixed seed is passed so the split is repeatable; without it every run produces a
# different partition, and therefore different clusters and counts further down.
trainingData, testData = result.randomSplit([0.7, 0.3], seed=1)

root
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- followers_count: long (nullable = true)
 |-- friends_count: long (nullable = true)
 |-- location: string (nullable = true)
 |-- reply_to: long (nullable = true)
 |-- userId: long (nullable = true)
 |-- features: vector (nullable = true)



In [19]:
# Cluster the training tweets with k-means.
# No featuresCol is set, so the estimator uses its default column name ("features").
# The seed pins the centroid initialisation.
k = 20
kmeans = KMeans(k=k, seed=1)

# cache() marks the training DataFrame for reuse across the iterative fit.
model = kmeans.fit(trainingData.cache())

In [29]:
# Clustering cost of the fitted k-means model on the training data (sum of squared
# distances from each point to its nearest centroid).
# KMeansModel.computeCost was deprecated in Spark 2.4 and removed in Spark 3.0, so it is
# used only when the installed version still provides it; otherwise the cost recorded in
# the model's training summary is read instead.
if hasattr(model, "computeCost"):
    cerror = model.computeCost(trainingData.cache())
else:
    cerror = model.summary.trainingCost

In [30]:
# Display the k-means cost stored in `cerror`.
cerror

131.2054841304752

In [31]:
# Fetch the fitted centroids and print them one per line, in the order returned.
centers = model.clusterCenters()
print("\n".join(str(center) for center in centers))

[-0.07150275 -0.17194918  0.04659408  0.05770173 -0.00396389]
[0.00559446 0.77511828 1.38770373 1.20914758 0.09485598]
[-0.04245581 -0.22914754 -0.07511988  0.08067213  0.12272565]
[ 1.60483269  0.27585686  0.67852375 -0.99484299 -0.1510577 ]
[ 0.25763614 -0.54231242 -0.40360131  0.98155428 -0.14858594]
[-0.09830807 -0.0661948   0.01673579 -0.02976313 -0.0135028 ]
[ 0.10707676  0.17490742  0.57197941 -0.11332628 -0.5557917 ]
[-0.17061013  0.03550776  0.28362998 -0.07652544 -0.58674303]
[ 0.20589342 -0.68116779 -0.33991836  0.37013597  0.37473367]
[-0.1231133   0.04425379  0.61154179  0.25686867 -0.22792611]
[ 1.17929411 -0.38807388  1.05750075 -0.21984535  0.45583078]
[-0.42193859  0.26182228  0.02756653  0.29015718 -0.39753369]
[-0.16014238 -0.07813271  0.14791121  0.0197737  -0.13889105]
[ 0.05955398 -0.3565019  -0.13853317  0.38813906 -0.09313745]
[ 0.07652997 -0.0716283   0.23370191 -0.14173356 -0.09366686]
[ 0.57687105 -0.36235036 -0.05804192  0.43765619  0.04512348]
[-0.23319692 

In [32]:
# Assign each held-out tweet to a cluster; transform() adds the "prediction" column.
predictions = model.transform(testData.cache())

# Number of test tweets per cluster, largest cluster first.
clusterSizes = predictions.groupBy("prediction").count()
clusterSizes.orderBy("count", ascending=False).show()



+----------+-----+
|prediction|count|
+----------+-----+
|         0|  479|
|         5|  382|
|        12|  349|
|         2|  274|
|        14|  101|
|         3|   51|
|         9|   49|
|        13|   38|
|         7|   31|
|        16|   29|
|        10|   29|
|         8|   23|
|         4|   23|
|         1|   22|
|        18|   22|
|         6|   18|
|        15|   18|
|        19|   17|
|        11|   10|
|        17|    7|
+----------+-----+



In [34]:
from wordcloud import WordCloud, STOPWORDS 
import matplotlib.pyplot as plt 
import pandas as pd 

In [36]:
# Keep only the tweets assigned to clusters 0, 5, 12 and 2 (all columns retained).
# NOTE(review): these ids look like the four largest clusters in the count table above;
# they are hard-coded, so confirm they still apply after any re-fit.
df = predictions.where(col("prediction").isin(0, 5, 12, 2))
df.printSchema()

root
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- followers_count: long (nullable = true)
 |-- friends_count: long (nullable = true)
 |-- location: string (nullable = true)
 |-- reply_to: long (nullable = true)
 |-- userId: long (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



In [59]:
# Write the selected clusters as JSON; coalesce(1) collapses the data to one partition
# so a single part file is produced (presumably consumed by the word-cloud step).
# The destination can be overridden with the WORDCLOUD_JSON_PATH environment variable.
# mode("overwrite") replaces output left by a previous run: the default save mode raises
# an error when the path already exists, which made this cell fail on re-execution.
wordcloudPath = os.environ.get(
    "WORDCLOUD_JSON_PATH", "file:///Users/Laith/Downloads/wordcloud.json"
)
df.coalesce(1).write.mode("overwrite").json(wordcloudPath)

In [55]:
from pyspark.ml.feature import HashingTF, IDF
# Hash each tweet's tokens ("filtered") into a 20-bucket term-frequency vector stored
# in "rawFeatures".
hashingTF = (
    HashingTF()
    .setInputCol("filtered")
    .setOutputCol("rawFeatures")
    .setNumFeatures(20)
)
featurizedData = hashingTF.transform(df)
  



In [56]:

# Print the schema to confirm the hashed term-frequency column ("rawFeatures") was added.
featurizedData.printSchema()

root
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- followers_count: long (nullable = true)
 |-- friends_count: long (nullable = true)
 |-- location: string (nullable = true)
 |-- reply_to: long (nullable = true)
 |-- userId: long (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)
 |-- rawFeatures: vector (nullable = true)



In [57]:
# Preview the first 20 hashed term-frequency vectors without truncating the column text.
rawFeaturePreview = featurizedData.select("rawFeatures")
rawFeaturePreview.show(n=20, truncate=False)

+--------------------------------------------------------------------------------------+
|rawFeatures                                                                           |
+--------------------------------------------------------------------------------------+
|(20,[],[])                                                                            |
|(20,[],[])                                                                            |
|(20,[],[])                                                                            |
|(20,[],[])                                                                            |
|(20,[],[])                                                                            |
|(20,[3,4,10,12,15,16,18,19],[3.0,1.0,2.0,1.0,3.0,1.0,2.0,2.0])                        |
|(20,[2,6,7,8,13,14,15,16,17,18,19],[2.0,1.0,2.0,1.0,2.0,1.0,2.0,1.0,2.0,2.0,1.0])     |
|(20,[4,11,12,13,14,18,19],[1.0,1.0,2.0,1.0,1.0,3.0,2.0])                              |
|(20,[4,5,6,19],[2.0,