In [2]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("jupyter_Spark").setMaster("yarn-client")
sc = SparkContext(conf=conf)
sc

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import desc
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
spark = SparkSession.builder.appName('twitter').getOrCreate()


In [5]:
# Remember to add your s3 bucket name where you have uploaded the dataset
df = spark.read \
	.format("csv") \
 	.option("header","true") \
 	.option("mode", "DROPMALFORMED") \
 	.option("inferSchema", "true") \
 	.load("s3a://..your bucket name../twitter.csv")

In [6]:
# count after drop malformed records
print("Rows count : {0}".format(df.count()))

Rows count : 17426


In [7]:
df.show()

+---------+-------+-----------+------------------+-----------------+-------+-----------------+----------+---------------------+--------------+--------------------+----------+-----------+----------+---------------+---------------+--------------------+-------------+-------------+--------------------+-----------+-----------+--------------+----------+--------------------+--------------------+
| _unit_id|_golden|_unit_state|_trusted_judgments|_last_judgment_at| gender|gender:confidence|profile_yn|profile_yn:confidence|       created|         description|fav_number|gender_gold|link_color|           name|profile_yn_gold|        profileimage|retweet_count|sidebar_color|                text|tweet_coord|tweet_count| tweet_created|  tweet_id|      tweet_location|       user_timezone|
+---------+-------+-----------+------------------+-----------------+-------+-----------------+----------+---------------------+--------------+--------------------+----------+-----------+----------+---------------+---

In [8]:
df = df.filter(col("gender").isin(['male','female','brand'])).select("_unit_id","gender","description")


In [9]:
df.show()

+---------+------+--------------------+
| _unit_id|gender|         description|
+---------+------+--------------------+
|815719226|  male|i sing my own rhy...|
|815719227|  male|I'm the author of...|
|815719228|  male|louis whining and...|
|815719229|  male|Mobile guy.  49er...|
|815719230|female|Ricky Wilson The ...|
|815719231|female|  you don't know me.|
|815719232| brand|A global marketpl...|
|815719233|  male|The secret of get...|
|815719234|female|Pll Fan // Crazy ...|
|815719235|female|Renaissance art h...|
|815719236| brand|Clean food that t...|
|815719237| brand|highly extraordin...|
|815719238|female|Senior '16 . XI-X...|
|815719239| brand|Come join the fas...|
|815719240|female|im just here for ...|
|815719241|female|                null|
|815719242|female|           JMKM�_ҕ��|
|815719243|  male|Over enthusiastic...|
|815719244|  male|                null|
|815719246|female|Artisan specializ...|
+---------+------+--------------------+
only showing top 20 rows



In [10]:
# count after drop malformed records
print("Rows count after filtering unknown gender: {0}".format(df.count()))

Rows count after filtering unknown gender: 16412


In [11]:
print("Rows count per gender")
df.groupby("gender").count().show()

Rows count per gender
+------+-----+
|gender|count|
+------+-----+
|female| 5867|
| brand| 5175|
|  male| 5370|
+------+-----+



In [12]:
final_df = df.filter(col("description").isNotNull())
print("**** RAW dataframe ****")
final_df.show()

**** RAW dataframe ****
+---------+------+--------------------+
| _unit_id|gender|         description|
+---------+------+--------------------+
|815719226|  male|i sing my own rhy...|
|815719227|  male|I'm the author of...|
|815719228|  male|louis whining and...|
|815719229|  male|Mobile guy.  49er...|
|815719230|female|Ricky Wilson The ...|
|815719231|female|  you don't know me.|
|815719232| brand|A global marketpl...|
|815719233|  male|The secret of get...|
|815719234|female|Pll Fan // Crazy ...|
|815719235|female|Renaissance art h...|
|815719236| brand|Clean food that t...|
|815719237| brand|highly extraordin...|
|815719238|female|Senior '16 . XI-X...|
|815719239| brand|Come join the fas...|
|815719240|female|im just here for ...|
|815719242|female|           JMKM�_ҕ��|
|815719243|  male|Over enthusiastic...|
|815719246|female|Artisan specializ...|
|815719247|female|He bled and died ...|
|815719248|female|        union j xxxx|
+---------+------+--------------------+
only showing top

In [13]:
# count after drop malformed records
print("Rows count after filtering invalid description: {0}".format(final_df.count()))

Rows count after filtering invalid description: 13431


In [14]:
indexer = StringIndexer(inputCol="gender", outputCol="labels").fit(final_df)

In [15]:
indexed = indexer.transform(final_df)

In [16]:
print("**** RAW dataframe with String indexer ****")
indexed.show()

**** RAW dataframe with String indexer ****
+---------+------+--------------------+------+
| _unit_id|gender|         description|labels|
+---------+------+--------------------+------+
|815719226|  male|i sing my own rhy...|   1.0|
|815719227|  male|I'm the author of...|   1.0|
|815719228|  male|louis whining and...|   1.0|
|815719229|  male|Mobile guy.  49er...|   1.0|
|815719230|female|Ricky Wilson The ...|   0.0|
|815719231|female|  you don't know me.|   0.0|
|815719232| brand|A global marketpl...|   2.0|
|815719233|  male|The secret of get...|   1.0|
|815719234|female|Pll Fan // Crazy ...|   0.0|
|815719235|female|Renaissance art h...|   0.0|
|815719236| brand|Clean food that t...|   2.0|
|815719237| brand|highly extraordin...|   2.0|
|815719238|female|Senior '16 . XI-X...|   0.0|
|815719239| brand|Come join the fas...|   2.0|
|815719240|female|im just here for ...|   0.0|
|815719242|female|           JMKM�_ҕ��|   0.0|
|815719243|  male|Over enthusiastic...|   1.0|
|815719246|femal

In [17]:
TOTAL = indexed.count()
TRAIN = 0.75
TEST = 0.25

In [18]:
train = indexed.limit(int(TOTAL*TRAIN))
test = indexed.orderBy(desc("_unit_id")).limit(int(TOTAL*TEST))

In [19]:
print("Total data count {0}".format(TOTAL))
print("Training data count {0}".format(train.count()))
print("Test data count {0}".format(test.count()))

Total data count 13431
Training data count 10073
Test data count 3357


In [20]:
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W")


In [21]:
regexTokenized = regexTokenizer.transform(train)

In [22]:
print("**** RAW dataframe with tokenized ****")
regexTokenized.show()


**** RAW dataframe with tokenized ****
+---------+------+--------------------+------+--------------------+
| _unit_id|gender|         description|labels|               words|
+---------+------+--------------------+------+--------------------+
|815734717|  male|28 | Murabb��-e S...|   1.0|[28, murabb, e, s...|
|815734719| brand|Serving Chapel Hi...|   2.0|[serving, chapel,...|
|815734722|female|Life Motto: There...|   0.0|[life, motto, the...|
|815734724| brand|           HOT DEALS|   2.0|        [hot, deals]|
|815734726|female|Child carny turne...|   0.0|[child, carny, tu...|
|815734728|female|            #ripnate|   0.0|           [ripnate]|
|815734729|  male|20, Florida Inter...|   1.0|[20, florida, int...|
|815734732|  male|Ohio University s...|   1.0|[ohio, university...|
|815734733|female|hey im alexis , i...|   0.0|[hey, im, alexis,...|
|815734734|  male|       never give up|   1.0|   [never, give, up]|
|815734735|female|It ain't always s...|   0.0|[it, ain, t, alwa...|
|81573473

In [23]:
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

In [24]:
removed = stop_words_remover.transform(regexTokenized)

In [25]:
print("**** RAW dataframe after emoving stop words ****")
removed.show()

**** RAW dataframe after emoving stop words ****
+---------+------+--------------------+------+--------------------+--------------------+
| _unit_id|gender|         description|labels|               words|      filtered_words|
+---------+------+--------------------+------+--------------------+--------------------+
|815734717|  male|28 | Murabb��-e S...|   1.0|[28, murabb, e, s...|[28, murabb, e, s...|
|815734719| brand|Serving Chapel Hi...|   2.0|[serving, chapel,...|[serving, chapel,...|
|815734722|female|Life Motto: There...|   0.0|[life, motto, the...|[life, motto, thi...|
|815734724| brand|           HOT DEALS|   2.0|        [hot, deals]|        [hot, deals]|
|815734726|female|Child carny turne...|   0.0|[child, carny, tu...|[child, carny, tu...|
|815734728|female|            #ripnate|   0.0|           [ripnate]|           [ripnate]|
|815734729|  male|20, Florida Inter...|   1.0|[20, florida, int...|[20, florida, int...|
|815734732|  male|Ohio University s...|   1.0|[ohio, universi

In [26]:
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features").setNumFeatures(11000)

In [27]:
ht_df = hashing_tf.transform(removed)
ht_df.show()

+---------+------+--------------------+------+--------------------+--------------------+--------------------+
| _unit_id|gender|         description|labels|               words|      filtered_words|        raw_features|
+---------+------+--------------------+------+--------------------+--------------------+--------------------+
|815734717|  male|28 | Murabb��-e S...|   1.0|[28, murabb, e, s...|[28, murabb, e, s...|(11000,[191,1405,...|
|815734719| brand|Serving Chapel Hi...|   2.0|[serving, chapel,...|[serving, chapel,...|(11000,[2027,2490...|
|815734722|female|Life Motto: There...|   0.0|[life, motto, the...|[life, motto, thi...|(11000,[1086,4675...|
|815734724| brand|           HOT DEALS|   2.0|        [hot, deals]|        [hot, deals]|(11000,[4231,8937...|
|815734726|female|Child carny turne...|   0.0|[child, carny, tu...|[child, carny, tu...|(11000,[361,1735,...|
|815734728|female|            #ripnate|   0.0|           [ripnate]|           [ripnate]|(11000,[3149],[1.0])|
|815734729

In [28]:
idf = IDF(inputCol="raw_features", outputCol="features")

In [29]:
idfmodel = idf.fit(ht_df)
idf_df = idfmodel.transform(ht_df)

In [30]:
print("**** RAW dataframe after adding idf and hashingtf ****")
idf_df.show()

**** RAW dataframe after adding idf and hashingtf ****
+---------+------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
| _unit_id|gender|         description|labels|               words|      filtered_words|        raw_features|            features|
+---------+------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|815719226|  male|i sing my own rhy...|   1.0|[i, sing, my, own...|      [sing, rhythm]|(11000,[8809,9451...|(11000,[8809,9451...|
|815719227|  male|I'm the author of...|   1.0|[i, m, the, autho...|[m, author, novel...|(11000,[873,978,2...|(11000,[873,978,2...|
|815719228|  male|louis whining and...|   1.0|[louis, whining, ...|[louis, whining, ...|(11000,[1000,2314...|(11000,[1000,2314...|
|815719229|  male|Mobile guy.  49er...|   1.0|[mobile, guy, 49e...|[mobile, guy, 49e...|(11000,[464,929,9...|(11000,[464,929,9...|
|815719230|female|Ricky Wils

In [31]:
rf = LogisticRegression(
		labelCol="labels", 
		featuresCol="features",
		maxIter=30, 
		regParam=0.4, 
		elasticNetParam=0.8)

In [32]:
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=indexer.labels)

In [33]:
pipeline = Pipeline(stages=[regexTokenizer, stop_words_remover, hashing_tf, idf, rf, labelConverter])

In [34]:
# Train model.  This also runs the indexers.
model = pipeline.fit(train)


In [35]:
# Make predictions.
predictions = model.transform(test)
print("**** Prediction on test data with LogisticRegression Model ****")
predictions.select("_unit_id","gender","description","prediction","predictedLabel").show()

**** Prediction on test data with LogisticRegression Model ****
+---------+------+--------------------+----------+--------------+
| _unit_id|gender|         description|prediction|predictedLabel|
+---------+------+--------------------+----------+--------------+
|815757985|female|Teamwork makes th...|       0.0|        female|
|815757921|female|Anti-statist; I h...|       0.0|        female|
|815757830|  male|#TeamBarcelona .....|       0.0|        female|
|815757681|  male|Whatever you like...|       0.0|        female|
|815757572|female|                (rp)|       0.0|        female|
|815756767|female|I Love Me...Mysel...|       0.0|        female|
|815756700|  male|Head Chef, Chez B...|       0.0|        female|
|815756642| brand|Reviews of delect...|       0.0|        female|
|815756542| brand|When families go ...|       0.0|        female|
|815756417|  male|Houston Chronicle...|       0.0|        female|
|815756332|female|You can find me w...|       0.0|        female|
|815756269| 

In [36]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="labels", predictionCol="prediction")
accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
f1_score = evaluator.setMetricName("f1").evaluate(predictions)
weighted_precesion = evaluator.setMetricName("weightedPrecision").evaluate(predictions)
weighted_recall = evaluator.setMetricName("weightedRecall").evaluate(predictions)

In [37]:
print("**** Accuracy Metrics ****")
print("Accuracy: {0}".format(accuracy))
print("f1_score: {0}".format(f1_score))
print("weighted_precesion: {0}".format(weighted_precesion))
print("weighted_recall: {0}".format(weighted_recall))

**** Accuracy Metrics ****
Accuracy: 0.422996723265
f1_score: 0.251478060304
weighted_precesion: 0.178926227893
weighted_recall: 0.422996723265
