In [1]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, StopWordsRemover, CountVectorizer, IDF, RegexTokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

In [2]:
tweets1 = spark.read.json("C:/Users/Serkan/OneDrive - KU Leuven/LEUVEN/KUL_STAT/Semester2/Advanced Analytics in a Big Data World/assignments/three.new/savedata-*")
tweets2 = spark.read.json("C:/Users/Serkan/OneDrive - KU Leuven/LEUVEN/KUL_STAT/Semester2/Advanced Analytics in a Big Data World/assignments/tweets/tweets-*")

In [3]:
tweets = tweets1.union(tweets2).distinct()

In [4]:
tweets = tweets.drop('tweet_id')

In [5]:
tweets.groupBy("label") \
      .count() \
      .orderBy(col("count").desc()) \
      .show()

+--------------+-----+
|         label|count|
+--------------+-----+
|      #vaccine| 3877|
|        #covid| 3678|
|        #china| 3119|
|        #biden| 2890|
|#stopasianhate| 1238|
|    #inflation|  657|
+--------------+-----+



In [6]:
(trainDF, testDF) = tweets.randomSplit((0.80, 0.20), seed=200)

In [7]:
labeler = StringIndexer(inputCol="label", outputCol="target")

regexer = RegexTokenizer(inputCol='tweet_text', outputCol="tokens", pattern="((https).+)|[^0-9a-z#+]+", minTokenLength=3)

stopworder = StopWordsRemover().setInputCol('tokens').setOutputCol('words')

vectorizer = CountVectorizer(inputCol='words', outputCol="countFeatures")

idf = IDF(inputCol='countFeatures', outputCol="features")

lr = LogisticRegression(featuresCol='features', labelCol="target", maxIter=20)

pipeline = Pipeline(stages=[
    labeler,
    regexer,
    stopworder,
    vectorizer,
    idf,
    lr])

In [8]:
'''
While applying HashingTF only needs a single pass to the data, applying IDF needs two passes: first to compute the IDF vector and second to scale the term frequencies by IDF.

from pyspark.mllib.feature import IDF

# ... continue from the previous example
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
'''

'\nWhile applying HashingTF only needs a single pass to the data, applying IDF needs two passes: first to compute the IDF vector and second to scale the term frequencies by IDF.\n\nfrom pyspark.mllib.feature import IDF\n\n# ... continue from the previous example\ntf.cache()\nidf = IDF().fit(tf)\ntfidf = idf.transform(tf)\n'

In [9]:
paramGrid = (ParamGridBuilder()
  .addGrid(lr.elasticNetParam, [0.0, 0.2, 0.5, 0.8])
  .addGrid(lr.regParam, [0.001, 0.01, 0.1])
  .build())

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1"),
                          numFolds=4)

In [10]:
cvmodel = crossval.fit(trainDF)

predictions = cvmodel.transform(testDF)

evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")

evaluator.evaluate(predictions)

0.6322648944141701

In [11]:
cvmodel.avgMetrics

[0.5741735864148919,
 0.5839552718292101,
 0.5968820516060527,
 0.5944874251793408,
 0.6228133092103406,
 0.5485467018428257,
 0.6060507405447884,
 0.6287117845933049,
 0.4266842552695711,
 0.6115089812518024,
 0.6204290134036783,
 0.32292393344671866]

In [12]:
cvmodel.bestModel.stages[3].getMinDF()

1.0

In [13]:
cvmodel.bestModel.stages[5].getMaxIter()

20

In [14]:
cvmodel.bestModel.stages[5].getRegParam()

0.01

In [15]:
cvmodel.bestModel.stages[5].getElasticNetParam()

0.5

In [16]:
cvmodel.bestModel.stages[5]

LogisticRegressionModel: uid=LogisticRegression_418448bd72c8, numClasses=6, numFeatures=26112

In [18]:
cvmodel.save('cvmodel1')

In [19]:
predictions.groupBy("label", 'target', "prediction").count().sort('target', 'prediction').show(40)

+--------------+------+----------+-----+
|         label|target|prediction|count|
+--------------+------+----------+-----+
|      #vaccine|   0.0|       0.0|  537|
|      #vaccine|   0.0|       1.0|  129|
|      #vaccine|   0.0|       2.0|   33|
|      #vaccine|   0.0|       3.0|   32|
|      #vaccine|   0.0|       5.0|    1|
|        #covid|   1.0|       0.0|  190|
|        #covid|   1.0|       1.0|  405|
|        #covid|   1.0|       2.0|   34|
|        #covid|   1.0|       3.0|   47|
|        #covid|   1.0|       4.0|    4|
|        #covid|   1.0|       5.0|    1|
|        #china|   2.0|       0.0|  103|
|        #china|   2.0|       1.0|   88|
|        #china|   2.0|       2.0|  384|
|        #china|   2.0|       3.0|   54|
|        #china|   2.0|       4.0|    5|
|        #china|   2.0|       5.0|    4|
|        #biden|   3.0|       0.0|   96|
|        #biden|   3.0|       1.0|   74|
|        #biden|   3.0|       2.0|   27|
|        #biden|   3.0|       3.0|  360|
|        #biden|

In [20]:
targetToLabel = {
    1.0: 'covid',
    2.0: 'china',
    3.0: 'biden',
    4.0: 'stopasianhate',
    5.0: 'inflation',
    0.0: 'vaccine'
}

In [None]:
# read pickled model via pipeline api
#from pyspark.ml.pipeline import PipelineModel
#persistedModel = PipelineModel.load(mPath)