# Prerequisites

 1. Install and set up the classification model:
 > https://github.com/cardiffnlp/tweeteval
 2. Clone the notebook repo.
 

# Code #1: Sentiment prediction on tweets using an open-source pretrained model
### (+ Autoscale naive approach)

## Preliminaries

We define a function to normalize a tweet to the format we used for TweetEval. Note that preprocessing is minimal (replacing user names by `@user` and links by `http`).

In [1]:
def preprocess(text):
    new_text = []
    for t in text.split(" "):
        t = '@user' if t.startswith('@') and len(t) > 1 else t
        t = 'http' if t.startswith('http') else t
        new_text.append(t)
    return " ".join(new_text)
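A quick check of what `preprocess` does to a typical tweet. The function body is copied from the cell above so the snippet runs on its own; the sample tweet is made up for illustration.

```python
def preprocess(text: str) -> str:
    # same normalisation as above: mask user handles and links
    new_text = []
    for t in text.split(" "):
        t = '@user' if t.startswith('@') and len(t) > 1 else t
        t = 'http' if t.startswith('http') else t
        new_text.append(t)
    return " ".join(new_text)

sample = "@elonmusk launch at 9 https://example.com @"
print(preprocess(sample))
# -> @user launch at 9 http @
```

Because it splits on single spaces only, a handle glued to punctuation (for example `(@someone`) is left untouched, and a lone `@` is kept as-is.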

## Use TweetEval Classifiers

TweetEval currently provides the following fine-tuned models for different tweet classification tasks:

- emoji prediction (`emoji`)
- emotion detection (`emotion`)
- hate speech detection (`hate`)
- irony detection (`irony`)
- offensive language identification (`offensive`)
- sentiment analysis (`sentiment`)
- _(coming soon)_ stance detection (`stance`) with 5 targets (`abortion`, `atheism`, `climate`, `feminist`, `hillary`), for example: `stance-abortion`
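The TweetEval checkpoints on the Hugging Face Hub follow the naming pattern `cardiffnlp/twitter-roberta-base-{task}`. Below is a small sketch of a hypothetical helper (`tweeteval_model_name` is not part of `transformers` or TweetEval) that builds the model id for one of the available tasks above; it only builds the string and does not check that the checkpoint exists.

```python
# hypothetical helper, not part of transformers or TweetEval
AVAILABLE_TASKS = ("emoji", "emotion", "hate", "irony", "offensive", "sentiment")

def tweeteval_model_name(task: str) -> str:
    """Build the Hugging Face model id for a TweetEval task."""
    if task not in AVAILABLE_TASKS:
        raise ValueError(f"unknown TweetEval task: {task!r}")
    return f"cardiffnlp/twitter-roberta-base-{task}"

print(tweeteval_model_name("emotion"))
# -> cardiffnlp/twitter-roberta-base-emotion
```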

In [2]:
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
import numpy as np
from scipy.special import softmax

# TweetEval checkpoints follow the pattern cardiffnlp/twitter-roberta-base-{task}
task = 'emotion'
MODEL = f"cardiffnlp/twitter-roberta-base-{task}"

tokenizer = AutoTokenizer.from_pretrained(MODEL)

# label mapping
labels = ['anger', 'joy', 'optimism', 'sadness']

model = AutoModelForSequenceClassification.from_pretrained(MODEL)

text = "Bad night "
text = preprocess(text)
encoded_input = tokenizer(text, return_tensors='pt')
output = model(**encoded_input)
scores = output[0][0].detach().numpy()
scores = softmax(scores)

In [3]:
ranking = np.argsort(scores)
ranking = ranking[::-1]
for i in range(scores.shape[0]):
    l = labels[ranking[i]]
    s = scores[ranking[i]]
    print(f"{i+1}) {l} {np.round(float(s), 4)}")

1) sadness 0.946
2) anger 0.028
3) joy 0.0186
4) optimism 0.0074
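The ranking step itself does not depend on the model: it turns the raw logits into probabilities with softmax and sorts the labels by probability. A sketch with made-up logits (so no model download is needed), computing softmax directly with NumPy instead of `scipy.special.softmax`:

```python
import numpy as np

labels = ['anger', 'joy', 'optimism', 'sadness']
logits = np.array([1.0, 2.0, 0.5, 3.0])  # made-up values for illustration

# numerically stable softmax: subtract the max before exponentiating
exp = np.exp(logits - logits.max())
scores = exp / exp.sum()

# label indices from most to least probable
ranking = np.argsort(scores)[::-1]
for i, idx in enumerate(ranking):
    print(f"{i+1}) {labels[idx]} {np.round(float(scores[idx]), 4)}")
```

Subtracting the maximum logit leaves the softmax result unchanged but avoids overflow in `np.exp` for large logits.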


In [4]:
## Use pandas

from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
import numpy as np
from scipy.special import softmax

import pandas as pd

task = 'emotion'
MODEL = f"cardiffnlp/twitter-roberta-base-{task}"

tokenizer = AutoTokenizer.from_pretrained(MODEL)

# label mapping
labels = ['anger', 'joy', 'optimism', 'sadness']

model = AutoModelForSequenceClassification.from_pretrained(MODEL)

In [5]:
def sentimentPredict(text: str) -> np.ndarray:
    # returns softmax probabilities in the order of `labels`
    text = preprocess(text)
    encoded_input = tokenizer(text, return_tensors='pt')
    output = model(**encoded_input)
    scores = output[0][0].detach().numpy()
    scores = softmax(scores)
    return scores

res = sentimentPredict("hi my love")

print(res)

[0.01796203 0.8760045  0.04245262 0.06358095]


In [7]:
# benchmark lazy map

# read the Elon Musk tweets CSV
elonTweets = pd.read_csv("./dataset/elonmusk.csv")

In [18]:
# python prediction 
%timeit elonTweets["tweet"][0:10].map(sentimentPredict)
%timeit elonTweets["tweet"][0:100].map(sentimentPredict)
%timeit elonTweets["tweet"][0:1000].map(sentimentPredict)
%timeit elonTweets["tweet"][0:3000].map(sentimentPredict)

739 ms ± 22.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
7.39 s ± 260 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1min 16s ± 354 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
4min 17s ± 36.5 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
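The timings grow roughly linearly with the number of tweets, which is what a one-tweet-at-a-time loop should do. Dividing each mean by the number of tweets gives the per-tweet cost (the values below are the `%timeit` means above, converted to seconds), roughly 74 to 86 ms per tweet. Extrapolating to the 9,379 non-empty tweets counted in Code #3 is only a rough estimate under a linear-cost assumption, not a measurement.

```python
# mean %timeit results from the cell above, in seconds
timings = {10: 0.739, 100: 7.39, 1000: 76.0, 3000: 257.0}

per_tweet = {n: t / n for n, t in timings.items()}
for n, s in per_tweet.items():
    print(f"{n:>5} tweets: {s * 1000:.1f} ms per tweet")

# rough extrapolation to the whole dataset (assumes the cost stays ~linear)
TOTAL_TWEETS = 9379
estimate_s = TOTAL_TWEETS * per_tweet[1000]
print(f"estimated full run: {estimate_s / 60:.1f} min")
```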


-------------------

# Code #2: Optimise using pandas 

In [9]:
# pandas prediction

def sentimentPredictPd(s: pd.Series) -> pd.Series:
    # same per-row model call as Code #1, wrapped in a function that takes a Series
    return s.map(sentimentPredict)

%timeit sentimentPredictPd(elonTweets["tweet"][0:10])
%timeit sentimentPredictPd(elonTweets["tweet"][0:100])
%timeit sentimentPredictPd(elonTweets["tweet"][0:1000])
%timeit sentimentPredictPd(elonTweets["tweet"][0:3000])


701 ms ± 24.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
7.41 s ± 143 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1min 15s ± 1.14 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
4min 1s ± 13.1 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
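Wrapping `Series.map` in a function does not change how the model is called (still once per tweet), so the timings should match Code #1. Dividing the two sets of means (in seconds) bears this out: every ratio is within about 6% of 1, which is inside the reported run-to-run spread (for example ±36.5 s on the 3,000-tweet run).

```python
# mean %timeit results in seconds: Code #1 (plain map) vs Code #2 (wrapped map)
naive = {10: 0.739, 100: 7.39, 1000: 76.0, 3000: 257.0}
wrapped = {10: 0.701, 100: 7.41, 1000: 75.0, 3000: 241.0}

ratios = {n: wrapped[n] / naive[n] for n in naive}
for n, r in ratios.items():
    print(f"{n:>5} tweets: wrapped/naive = {r:.3f}")
```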


-------------------

# Code #3: Optimise using columnar memory format

In [1]:
# init model
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
import numpy as np
from scipy.special import softmax

import pandas as pd

task = 'emotion'
MODEL = f"cardiffnlp/twitter-roberta-base-{task}"

tokenizer = AutoTokenizer.from_pretrained(MODEL)

# label mapping
labels = ['anger', 'joy', 'optimism', 'sadness']

model = AutoModelForSequenceClassification.from_pretrained(MODEL)

def preprocess(text):
    new_text = []
    for t in text.split(" "):
        t = '@user' if t.startswith('@') and len(t) > 1 else t
        t = 'http' if t.startswith('http') else t
        new_text.append(t)
    return " ".join(new_text)


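### Without Arrow

Note: `mapInPandas` always exchanges data with the Python workers as Arrow record batches, regardless of `spark.sql.execution.arrow.pyspark.enabled`; that flag only controls Arrow use in `toPandas()` and in `createDataFrame()` from a pandas DataFrame. The setting that does change this job is `spark.sql.execution.arrow.maxRecordsPerBatch` (default 10,000 rows per batch), which the "With Arrow" cell lowers to 900.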

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, FloatType, StructType, StructField
import pandas as pd

# spark options
NUMBER_OF_PARTITIONS = 4

spark = SparkSession.builder.appName("My-Application").config("spark.driver.memory", "5g").getOrCreate()

# same CSV as in Code #1
elonmuskTweetsDf = spark.read.option("header", True).csv("./dataset/elonmusk.csv")

def sentimentPredict(text: str) -> np.ndarray:
    # returns softmax probabilities in the order of `labels`
    text = preprocess(text)
    encoded_input = tokenizer(text, return_tensors='pt')
    output = model(**encoded_input)
    scores = output[0][0].detach().numpy()
    scores = softmax(scores)
    return scores

def predict_panda(itdf):
    # mapInPandas passes an iterator of pandas DataFrames (one per Arrow batch)
    for df in itdf:
        df["score"] = df["tweet"].map(sentimentPredict)
        # return only the columns declared in `schema`
        yield df[["tweet", "score"]]

schema = StructType([
        StructField("tweet", StringType()),
        StructField("score", ArrayType(FloatType()))
    ])

elonmuskTweetsDf = elonmuskTweetsDf.repartition(NUMBER_OF_PARTITIONS).filter("tweet != ''")

print("arrow.pyspark.enabled: ", spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))
print("arrow.enabled: ", spark.conf.get("spark.sql.execution.arrow.enabled"))

print("Tweets count: ", elonmuskTweetsDf.count())
print("Partition count: ", elonmuskTweetsDf.rdd.getNumPartitions())

resAll = elonmuskTweetsDf.mapInPandas(predict_panda, schema=schema)
# show() only materialises enough rows to print the first 20, so this does not time the full dataset
%time resAll.show()


arrow.pyspark.enabled:  false
arrow.enabled:  false
Tweets count:  9379
Partition count:  4
+--------------------+--------------------+
|               tweet|               score|
+--------------------+--------------------+
|!! Will take action.|[0.84155434, 0.01...|
|           Nice 🤣🤣|[0.024141183, 0.9...|
|                  Ok|[0.3830172, 0.187...|
|Customized horn &...|[0.29382166, 0.52...|
|We need to do one...|[0.062386807, 0.5...|
|"Congrats to @Tal...|[0.0120146675, 0....|
|Maybe end of next...|[0.079631224, 0.4...|
|Whether Z-pak wor...|[0.054457296, 0.0...|
|               False|[0.7806611, 0.032...|
|We fought the Bal...|[0.26407075, 0.33...|
|Rolling out a new...|[0.052287556, 0.7...|
|Yes, about a mont...|[0.041002773, 0.7...|
|@_Only_Hawk @MICH...|[0.08871256, 0.73...|
|           Haha true|[0.016787158, 0.9...|
|Yes, but still a ...|[0.08220884, 0.04...|
|                Haha|[0.018273905, 0.9...|
|Agreed, top prior...|[0.2593304, 0.031...|
|Next landing atte...|[0.6325
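`mapInPandas` calls `predict_panda` with an iterator of pandas DataFrames (one per Arrow batch of a partition) and expects an iterator of DataFrames matching `schema` back. That contract can be exercised without Spark. In the sketch below, `fake_sentiment_predict` is a hypothetical stand-in for `sentimentPredict` that returns a uniform distribution, so no model is loaded.

```python
from typing import Iterator

import numpy as np
import pandas as pd

LABELS = ["anger", "joy", "optimism", "sadness"]

def fake_sentiment_predict(text: str) -> np.ndarray:
    # hypothetical stand-in for sentimentPredict: uniform probabilities over the labels
    return np.full(len(LABELS), 1.0 / len(LABELS), dtype=np.float32)

def predict_panda(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # one output DataFrame per input batch, with the columns named in the Spark schema
    for df in batches:
        out = df[["tweet"]].copy()
        out["score"] = out["tweet"].map(fake_sentiment_predict)
        yield out

# two small batches, as Spark might deliver them for one partition
batches = [pd.DataFrame({"tweet": ["Ok", "Haha"]}), pd.DataFrame({"tweet": ["Nice"]})]
results = list(predict_panda(iter(batches)))
for r in results:
    print(r)
```

Because the function is a generator, each batch is scored and handed back before the next one is read, so a worker only holds one batch of tweets at a time.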

### With Arrow

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, FloatType, StructType, StructField
import pandas as pd
import pyarrow  # fails early if pyarrow is not installed

# spark options
NUMBER_OF_PARTITIONS = 4
MAX_RECORDS_PER_BATCH = 900

spark = SparkSession.builder.appName("My-Application2").config("spark.driver.memory", "5g").getOrCreate()

# Arrow / batch configuration
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")  # deprecated alias of the key above since Spark 3.0
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", MAX_RECORDS_PER_BATCH)  # max rows per pandas batch passed to predict_panda

# same CSV as in Code #1
elonmuskTweetsDf = spark.read.option("header", True).csv("./dataset/elonmusk.csv")

def sentimentPredict(text: str) -> np.ndarray:
    # returns softmax probabilities in the order of `labels`
    text = preprocess(text)
    encoded_input = tokenizer(text, return_tensors='pt')
    output = model(**encoded_input)
    scores = output[0][0].detach().numpy()
    scores = softmax(scores)
    return scores

def predict_panda(itdf):
    # mapInPandas passes an iterator of pandas DataFrames (one per Arrow batch)
    for df in itdf:
        df["score"] = df["tweet"].map(sentimentPredict)
        # return only the columns declared in `schema`
        yield df[["tweet", "score"]]

schema = StructType([
        StructField("tweet", StringType()),
        StructField("score", ArrayType(FloatType()))
    ])

elonmuskTweetsDf = elonmuskTweetsDf.repartition(NUMBER_OF_PARTITIONS).filter("tweet != ''")

print("arrow.pyspark.enabled: ", spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))
print("arrow.enabled: ", spark.conf.get("spark.sql.execution.arrow.enabled"))

print("Tweets count: ", elonmuskTweetsDf.count())
print("Partition count: ", elonmuskTweetsDf.rdd.getNumPartitions())

resAll = elonmuskTweetsDf.mapInPandas(predict_panda, schema=schema)
# show() only materialises enough rows to print the first 20, so this does not time the full dataset
%time resAll.show()


arrow.pyspark.enabled:  true
arrow.enabled:  true
Tweets count:  9379
Partition count:  4
+--------------------+--------------------+
|               tweet|               score|
+--------------------+--------------------+
|!! Will take action.|[0.84155434, 0.01...|
|           Nice 🤣🤣|[0.024141183, 0.9...|
|                  Ok|[0.3830172, 0.187...|
|Customized horn &...|[0.29382166, 0.52...|
|We need to do one...|[0.062386807, 0.5...|
|"Congrats to @Tal...|[0.0120146675, 0....|
|Maybe end of next...|[0.079631224, 0.4...|
|Whether Z-pak wor...|[0.054457296, 0.0...|
|               False|[0.7806611, 0.032...|
|We fought the Bal...|[0.26407075, 0.33...|
|Rolling out a new...|[0.052287556, 0.7...|
|Yes, about a mont...|[0.041002773, 0.7...|
|@_Only_Hawk @MICH...|[0.08871256, 0.73...|
|           Haha true|[0.016787158, 0.9...|
|Yes, but still a ...|[0.08220884, 0.04...|
|                Haha|[0.018273905, 0.9...|
|Agreed, top prior...|[0.2593304, 0.031...|
|Next landing atte...|[0.632553
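With `maxRecordsPerBatch = 900`, each partition reaches `predict_panda` in chunks of at most 900 rows. Assuming `repartition(4)` spreads the 9,379 tweets evenly (round-robin repartitioning gives roughly equal partitions, but the exact split here is an assumption), the batch layout works out as below; `split_into_batches` is a hypothetical helper for the arithmetic only.

```python
from typing import List

def split_into_batches(n_rows: int, max_records: int) -> List[int]:
    """Sizes of the pandas batches a partition of n_rows is cut into."""
    full, rest = divmod(n_rows, max_records)
    return [max_records] * full + ([rest] if rest else [])

TOTAL_TWEETS = 9379
NUMBER_OF_PARTITIONS = 4
MAX_RECORDS_PER_BATCH = 900

# assumed even split: the first (TOTAL % P) partitions get one extra row
base, extra = divmod(TOTAL_TWEETS, NUMBER_OF_PARTITIONS)
partition_sizes = [base + 1 if i < extra else base for i in range(NUMBER_OF_PARTITIONS)]

for i, size in enumerate(partition_sizes):
    print(f"partition {i}: {size} rows -> batches {split_into_batches(size, MAX_RECORDS_PER_BATCH)}")
```

Under the default of 10,000 rows per batch, each ~2,345-row partition would arrive as a single batch. The smaller setting mainly bounds the memory held per batch; the model is still called once per tweet inside each batch.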

## Elon Musk's emotions?

In [3]:
# split the score array into one column per label (same order as `labels`)
sentimentSpred = (resAll
    .withColumn("anger", resAll["score"].getItem(0))
    .withColumn("joy", resAll["score"].getItem(1))
    .withColumn("optimism", resAll["score"].getItem(2))
    .withColumn("sadness", resAll["score"].getItem(3)))

sentimentSpred.printSchema()



root
 |-- tweet: string (nullable = true)
 |-- score: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- anger: float (nullable = true)
 |-- joy: float (nullable = true)
 |-- optimism: float (nullable = true)
 |-- sadness: float (nullable = true)



In [6]:
sentimentSpred.agg({"anger": "avg",
                    "joy": "avg",
                    "optimism": "avg",
                    "sadness": "avg"}).show()

+-------------------+-------------------+-------------------+-------------------+
|           avg(joy)|      avg(optimism)|       avg(sadness)|         avg(anger)|
+-------------------+-------------------+-------------------+-------------------+
|0.37002909468911505|0.24679182631977264|0.16801638677369704|0.21516268802772973|
+-------------------+-------------------+-------------------+-------------------+
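Because each `score` row is a softmax distribution over the four labels, the four column averages should themselves sum to 1 (the mean of probability vectors is again a probability vector). A quick check on the reported values:

```python
# averages reported by the aggregation above
averages = {
    "joy": 0.37002909468911505,
    "optimism": 0.24679182631977264,
    "sadness": 0.16801638677369704,
    "anger": 0.21516268802772973,
}

total = sum(averages.values())
print(f"sum of averages: {total:.6f}")  # tiny deviation from 1 comes from float32 scores

dominant = max(averages, key=averages.get)
print("dominant emotion on average:", dominant)
```

On average the model assigns the most probability to `joy`, followed by `optimism`, `anger` and `sadness`.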

