In [1]:
sc


<pyspark.context.SparkContext at 0x1113eb358>

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [3]:
df = sqlContext.read.parquet("tweets.consolidated.parquet")

In [4]:
df.show()


+----------+------------------+--------------------+--------------------+--------+
|      user|                id|                text|            location|hasMedia|
+----------+------------------+--------------------+--------------------+--------+
| 429803867|668129332066459648|e0b40f2381c430f6d...|[27.166142,73.852...|   false|
|2575662781|668129436932415488|:) https://t.co/r...|[19.5371016,-96.9...|    true|
|2558754024|668128681945092096|برد 😊 (@ miral -...|[29.10425394,48.1...|   false|
| 175196235|668128627406610432|christmas market:...|[43.6506691,-79.3...|   false|
| 737480838|668128627394019328|يا عزيزي يالمدريد...|[26.21390031,50.4...|   false|
|  22921151|668129030068166657|#noelgeek #ghostb...|[45.50757496,-73....|   false|
|  93448793|668129332041265152|Soooooo these #ne...|[38.72750195,-90....|   false|
| 959736212|668128937801682945|Green Turtle in W...|[39.5640488,-76.9...|   false|
|  59972446|668129025890455552|#Retail #Job in #...|[41.4517093,-82.0...|   false|
|3234

In [6]:
df.printSchema()

root
 |-- user: long (nullable = true)
 |-- id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- hasMedia: boolean (nullable = true)



In [8]:
df = df.cache()

In [9]:
df.groupBy("hasMedia").count().show()

+--------+-----+
|hasMedia|count|
+--------+-----+
|    true|  118|
|   false| 1967|
+--------+-----+



In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils
from pyspark.sql import Row
from pyspark.ml.feature import Tokenizer

In [13]:
def boolToInt(val):
    if val:
        return 1.0
    else:
        return 0.0

ml_df = sqlContext.createDataFrame(df.map(lambda r : Row(id=r.id, text=r.text, label=boolToInt(r.hasMedia))).collect())



In [14]:
ml_df.show()

+------------------+-----+--------------------+
|                id|label|                text|
+------------------+-----+--------------------+
|668129332066459648|  0.0|e0b40f2381c430f6d...|
|668129436932415488|  1.0|:) https://t.co/r...|
|668128681945092096|  0.0|برد 😊 (@ miral -...|
|668128627406610432|  0.0|christmas market:...|
|668128627394019328|  0.0|يا عزيزي يالمدريد...|
|668129030068166657|  0.0|#noelgeek #ghostb...|
|668129332041265152|  0.0|Soooooo these #ne...|
|668128937801682945|  0.0|Green Turtle in W...|
|668129025890455552|  0.0|#Retail #Job in #...|
|668129269160222720|  0.0|#StaracArabia
الن...|
|668128677763379201|  0.0|Açlık oyunları al...|
|668128749032902656|  0.0|#beaurivagegolf #...|
|668129231386333185|  0.0|@bm0406 @ionacrv ...|
|668129369819357184|  0.0|#Bilinmezlik @ İz...|
|668128673560592384|  0.0|Razón tenía aquel...|
|668129055238062081|  0.0|349.336 personas ...|
|668128820365512704|  0.0|🎉🎉🎉 @ Quilmes,...|
|668129436898885633|  0.0|Risottinho de moq.

In [15]:
ml_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- label: double (nullable = true)
 |-- text: string (nullable = true)



In [27]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
ml_with_words_df = tokenizer.transform(ml_df).drop("text")
ml_with_word_df = ml_with_words_df.flatMap(lambda r: [Row(id=r.id,label=r.label,word=w) for w in r.words]).toDF()

In [28]:
ml_with_word_df.show()

+------------------+-----+--------------------+
|                id|label|                word|
+------------------+-----+--------------------+
|668129332066459648|  0.0|e0b40f2381c430f6d...|
|668129436932415488|  1.0|                  :)|
|668129436932415488|  1.0|https://t.co/riry...|
|668128681945092096|  0.0|                 برد|
|668128681945092096|  0.0|                  😊|
|668128681945092096|  0.0|                  (@|
|668128681945092096|  0.0|               miral|
|668128681945092096|  0.0|                   -|
|668128681945092096|  0.0|               ميرال|
|668128681945092096|  0.0|                  in|
|668128681945092096|  0.0|             kuwait)|
|668128681945092096|  0.0|https://t.co/yfno...|
|668128627406610432|  0.0|           christmas|
|668128627406610432|  0.0|             market:|
|668128627406610432|  0.0|                that|
|668128627406610432|  0.0|                time|
|668128627406610432|  0.0|                  of|
|668128627406610432|  0.0|               

In [29]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(ml_with_word_df)
wordIndexer = StringIndexer(inputCol="word", outputCol="indexedWord").fit(ml_with_word_df)



In [31]:

training, test = ml_with_word_df.randomSplit((0.7, 0.3), seed = 1)

In [32]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedWord")


In [33]:
pipeline = Pipeline(stages=[labelIndexer, wordIndexer, dt])


In [35]:
model = pipeline.fit(ml_with_word_df)

IllegalArgumentException: requirement failed: Column indexedWord must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually DoubleType.

In [90]:
training.show()

+------------------+-----+--------------------+
|                id|label|                text|
+------------------+-----+--------------------+
|668129332066459648|  0.0|e0b40f2381c430f6d...|
|668128681945092096|  0.0|برد 😊 (@ miral -...|
|668128627406610432|  0.0|christmas market:...|
|668129030068166657|  0.0|#noelgeek #ghostb...|
|668128937801682945|  0.0|Green Turtle in W...|
|668129025890455552|  0.0|#Retail #Job in #...|
|668129269160222720|  0.0|#StaracArabia
الن...|
|668128749032902656|  0.0|#beaurivagegolf #...|
|668129231386333185|  0.0|@bm0406 @ionacrv ...|
|668129369819357184|  0.0|#Bilinmezlik @ İz...|
|668128673560592384|  0.0|Razón tenía aquel...|
|668129055238062081|  0.0|349.336 personas ...|
|668128820365512704|  0.0|🎉🎉🎉 @ Quilmes,...|
|668129436898885633|  0.0|Risottinho de moq...|
|668128317044826113|  0.0|fish bowl fridays...|
|668128518354698242|  0.0|Viendo el partido...|
|667706185517240320|  0.0|Catch The Sooo Se...|
|667705011103621120|  0.0|We're #hiring! Cl.

In [91]:
test.show()

+------------------+-----+--------------------+
|                id|label|                text|
+------------------+-----+--------------------+
|668129436932415488|  1.0|:) https://t.co/r...|
|668128627394019328|  0.0|يا عزيزي يالمدريد...|
|668129332041265152|  0.0|Soooooo these #ne...|
|668128677763379201|  0.0|Açlık oyunları al...|
|667705069832306688|  0.0|@PenyukaAnisa wau...|
|668130883946188800|  1.0|See a virtual tou...|
|668131290781081600|  1.0|HAPPY BIRTHDAY BB...|
|668131672475492352|  0.0|I'm at CineRitz f...|
|668131018172317697|  0.0|TRAFFIC STOP at S...|
|668131919943458817|  0.0|SLS AMG///  😍 @ ...|
|668126962271911936|  0.0|Seng ulang tahunn...|
|668127545284497408|  0.0|@lndsm101 kkkkk n...|
|668380373722660864|  0.0|Mari makan 😋 (at...|
|668380608616382464|  0.0|I'm at happy trai...|
|668380835104489472|  0.0|Hehe..thx for tod...|
|668130045106475010|  0.0|Ver la @premierle...|
|668130430961479680|  0.0|• TIME ♡ FLIES •
...|
|668052060399603713|  0.0|Хорошие зонтики,

In [92]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [93]:
model = pipeline.fit(training)

In [94]:
prediction = model.transform(test)
selected = prediction.select("id", "text", "label", "prediction")

In [95]:
selected.show()

+------------------+--------------------+-----+----------+
|                id|                text|label|prediction|
+------------------+--------------------+-----+----------+
|668129436932415488|:) https://t.co/r...|  1.0|       0.0|
|668128627394019328|يا عزيزي يالمدريد...|  0.0|       0.0|
|668129332041265152|Soooooo these #ne...|  0.0|       0.0|
|668128677763379201|Açlık oyunları al...|  0.0|       0.0|
|667705069832306688|@PenyukaAnisa wau...|  0.0|       0.0|
|668130883946188800|See a virtual tou...|  1.0|       0.0|
|668131290781081600|HAPPY BIRTHDAY BB...|  1.0|       0.0|
|668131672475492352|I'm at CineRitz f...|  0.0|       0.0|
|668131018172317697|TRAFFIC STOP at S...|  0.0|       0.0|
|668131919943458817|SLS AMG///  😍 @ ...|  0.0|       0.0|
|668126962271911936|Seng ulang tahunn...|  0.0|       0.0|
|668127545284497408|@lndsm101 kkkkk n...|  0.0|       0.0|
|668380373722660864|Mari makan 😋 (at...|  0.0|       0.0|
|668380608616382464|I'm at happy trai...|  0.0|       0.0|