In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import pandas as pd

In [2]:
conf = SparkConf() \
    .setAppName("telegram-prediction") \
    .setMaster("local[*]") \
    .set('spark.jars', 'spark-cassandra-connector-assembly_2.12-3.0.1.jar')

In [3]:
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [4]:
data = sqlContext.read.format("org.apache.spark.sql.cassandra") \
    .option("spark.cassandra.auth.username", "cassandra") \
    .option("spark.cassandra.auth.password", "cassandra") \
    .options(table="hashtags_context2", keyspace="telegram_data").load()
data=data.distinct()

In [5]:
# data = data.where(F.col('hashtag')!=["خبرهای_داغ_مهم","لوتوس_نیوز"])
# array = ["خبرهای_داغ_مهم","لوتوس_نیوز","خبرهای_فوری_مهم","پارسینه","خبر_ويژه"]
# data=data.filter(data.hashtag.isin(array) == False)

In [6]:
from pyspark.sql.functions import col
data.groupBy("hashtag") \
    .count() \
    .orderBy(col("count").desc()) \
    .toPandas()

Unnamed: 0,hashtag,count
0,خبرهای_داغ_مهم,13
1,خبرهای_فوری_مهم,9
2,فاکس_نیوز,7
3,khabar_news,6
4,کانال_اخبار_کرج,5
...,...,...
123,ماچین,1
124,کیایی,1
125,لوئیزا_می_الکات,1
126,کانال_هواشناسی_کشور,1


In [7]:
data=data.select(["hashtag","context"])
data.printSchema()

root
 |-- hashtag: string (nullable = false)
 |-- context: string (nullable = true)



In [8]:
# import re
# import pyspark.sql.functions as F
# from pyspark.sql.types import *
# def remove_emoji(string):
#     emoji_pattern = re.compile("["
#                            u"\U0001F600-\U0001F64F"  # emoticons
#                            u"\U0001F300-\U0001F5FF"  # symbols & pictographs
#                            u"\U0001F680-\U0001F6FF"  # transport & map symbols
#                            u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
#                            u"\U00002702-\U000027B0"
#                            u"\U000024C2-\U0001F251"
#                            "]+", flags=re.UNICODE)
#     return emoji_pattern.sub(r'', string)

# udfsomefunc = F.udf(remove_emoji, StringType())
# data = data.withColumn("context",udfsomefunc("context"))
# data.toPandas()

In [9]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from hazm import *
udfsomefunc = F.udf(word_tokenize, ArrayType(StringType()))
data = data.withColumn("words",udfsomefunc("context"))
# data.toPandas()

In [10]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from hazm import *
# regular expression tokenizer
stopwords=[]
add_stopwords=[]
with open("stop-words.txt") as stop_word_file:
    stopwords = stop_word_file.readlines()
for i in stopwords: add_stopwords.append(i.replace("\n",""))
# stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# # bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features_TFIDF", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "hashtag", outputCol = "label")
pipeline = Pipeline(stages=[stopwordsRemover, hashingTF, idf, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.toPandas()

Unnamed: 0,hashtag,context,words,filtered,rawFeatures,features_TFIDF,features,label
0,سیاسی,🔻اژه‌ای: باید قوه قضاییه در تراز گام دوم انقلا...,"[🔻اژه‌ای, :, باید, قوه, قضاییه, در, تراز, گام,...","[🔻اژه‌ای, :, قوه, قضاییه, تراز, گام, انقلاب, ▫...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(4.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",10.0
1,سیاسی,🔻 انهدام باند قاچاق سلاح جنگی در غرب کشور\n\n🔹...,"[🔻, انهدام, باند, قاچاق, سلاح, جنگی, در, غرب, ...","[🔻, انهدام, باند, قاچاق, سلاح, جنگی, غرب, کشور...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...",10.0
2,سیاسی,🔻دستور جهانگیری برای رسیدگی به مشکل خوزستان\n\...,"[🔻دستور, جهانگیری, برای, رسیدگی, به, مشکل, خوز...","[🔻دستور, جهانگیری, رسیدگی, مشکل, خوزستان, 🔹معا...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.0, 6.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",10.0
3,زنان,✍یه تیکه کتاب :\n\nساعت‌های منظمی برای کار و خ...,"[✍یه, تیکه, کتاب, :, ساعت‌های, منظمی, برای, کا...","[✍یه, تیکه, کتاب, :, ساعت‌های, منظمی, کار, خوش...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.0, 2.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",83.0
4,خلاصه_بازی,#خلاصه_بازی استقلال 0⃣(۴)-(۳)0⃣ پیروزی در تاری...,"[#خلاصه_بازی, استقلال, 0⃣, (, ۴, ), -, (, ۳, )...","[#خلاصه_بازی, استقلال, 0⃣, (, ۴, ), -, (, ۳, )...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",74.0
...,...,...,...,...,...,...,...,...
196,ورودی_۹۴_به_قبل,#چارت\n#ورودی_۹۴_به_قبل\n#مهندسی_مواد_سرامیک\n...,"[#چارت, #ورودی_۹۴_به_قبل, #مهندسی_مواد_سرامیک,...","[#چارت, #ورودی_۹۴_به_قبل, #مهندسی_مواد_سرامیک,...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",29.0
197,مولانا,.\nجانا تویی کلیم و منم چون عصای تو\nگه تکیه گ...,"[., جانا, تویی, کلیم, و, منم, چون, عصای, تو, گ...","[., جانا, تویی, کلیم, منم, عصای, گه, تکیه, خلق...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",112.0
198,اخبار_کرونا_ویروس,🔴 #اخبار_کرونا_ویروس \n\n🔶 آخرین آمار کرونا در...,"[🔴, #اخبار_کرونا_ویروس, 🔶, آخرین, آمار, کرونا,...","[🔴, #اخبار_کرونا_ویروس, 🔶, آمار, کرونا, ایران,...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 6.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",37.0
199,این_راه_پر_امید_ادامه_دارد,📋 برنامه پزشکان کلینیک تخصصی و فوق تخصصی شماره...,"[📋, برنامه, پزشکان, کلینیک, تخصصی, و, فوق, تخص...","[📋, برنامه, پزشکان, کلینیک, تخصصی, تخصصی, شمار...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...",18.0


In [12]:
# set seed for reproducibility
print((dataset.count()))
(train, test) = dataset.randomSplit([0.8, 0.2], seed = 100)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

201
Training Dataset Count: 160
Test Dataset Count: 45


In [13]:
lr = LogisticRegression(featuresCol = 'features_TFIDF',maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(train)

In [14]:
predictions = lrModel.transform(test)

In [15]:
predictions.select("context","hashtag","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .toPandas()

Unnamed: 0,context,hashtag,probability,label,prediction
0,🔴الخلیج آنلاین: گام بلند عمان برای آشتی دادن ا...,خبرهای_داغ_مهم,"[0.8435687330419707, 0.0025867390916262134, 0....",0.0,0.0
1,🔴 فوت ۷۷ کودک براثر کرونا فقط در یک بیمارستان\...,خبرهای_داغ_مهم,"[0.7939037465759257, 0.002311444250366713, 0.0...",0.0,0.0
2,💠 #حدیث_روز \n\nپيامبر اکرم (صلّي الله عليه و ...,خبرهای_داغ_مهم,"[0.4370969567325774, 0.003564240843062007, 0.0...",0.0,0.0
3,💠 #حدیث_روز \n\nپيامبر اکرم (صلّي الله عليه و ...,حدیث_روز,"[0.4370969567325774, 0.003564240843062007, 0.0...",22.0,0.0
4,🔺ماجرای جالب بازیگر مرد که همسرش سر به سرش می‌...,خبرهای_داغ_مهم,"[0.25006200963056624, 0.01191034627024749, 0.0...",0.0,0.0
5,📊 #اینفوگرافی| خطر وقوع سیل و طوفان در کدام اس...,اینفوگرافی,"[0.014491868640361734, 0.013280069667171468, 0...",19.0,7.0
6,☑️ چه بسا سگ‌هایی منتظر مرگ شیر ماندند!\n\n🖍و ...,Lover_saipa,"[0.013695369241624265, 0.01608518508042878, 0....",33.0,85.0
7,‌\nای رفته كم‌كم از دل و جان، ناگهان بيا\nمثل ...,فاضل_نظری,"[0.013678418108633128, 0.014257064735530194, 0...",92.0,11.0
8,#مبلغ_غدیر_باشیم \n@TehranNews_ir,مبلغ_غدیر_باشیم,"[0.012988227437713993, 0.015090123630850192, 0...",105.0,11.0
9,#کاریکاتور\nاولین قهرمانی لیونل مسی با تیم ملی...,کاریکاتور,"[0.012575890046145633, 0.016603600500793913, 0...",122.0,11.0


In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="label",metricName="f1")
evaluator.evaluate(predictions)

0.5918699186991871

In [17]:
lr = LogisticRegression(featuresCol = 'features',maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(train)

In [18]:
predictions = lrModel.transform(test)

In [19]:
# predictions.select("context","hashtag","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .toPandas()

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="label",metricName="f1")
evaluator.evaluate(predictions)

0.5021367521367521

In [21]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(train)
predictions = model.transform(test)


In [22]:
# predictions.select("context","hashtag","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .toPandas()

In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="label",metricName="f1")
evaluator.evaluate(predictions)

0.27272727272727276