In [0]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('Twitter Sentiment Analysis') \
        .getOrCreate()
print('Session created')

sc = spark.sparkContext

Session created


In [0]:
tweets = spark.read.option('header',False).csv('/mnt/my_bucket/weclouddata/thanksgiving_ozge.csv')
# cache the dataframe for faster iteration
tweets.cache() 

# run the count action to materialize the cache
tweets.count()

Out[2]: 4459365

In [0]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('id', StringType(), True),
    StructField('name', StringType(), True),
    StructField('username', StringType(), True),
    StructField('tweet', StringType(), True),
    StructField('followers_count', StringType(), True),
    StructField('location', StringType(), True), 
    StructField('geo', StringType(), True),
    StructField('created_at', StringType(), True)
])

In [0]:
file = "/mnt/my_bucket/weclouddata/thanksgiving_ozge.csv"
tweets_2 = (spark.read                   # The DataFrameReader
  .option('header', 'false')   # Ignore line #1 - it's a header
  .schema(schema)          # Use the specified schema
  .csv(file)               # Creates a DataFrame from CSV after reading in the file
)

In [0]:
tweets_2.cache()

Out[5]: DataFrame[id: string, name: string, username: string, tweet: string, followers_count: string, location: string, geo: string, created_at: string]

In [0]:
import pyspark.sql.functions as F
tweets_clean = tweets_2.withColumn('tweet', F.regexp_replace('tweet', r"http\S+", "")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-z]", " ")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " ")) \
                    .withColumn('tweet', F.lower('tweet')) \
                    .withColumn('tweet', F.trim('tweet')) 
display(tweets_clean)

id,name,username,tweet,followers_count,location,geo,created_at
1595849188629446657,Truth,enchantedtruth,repmtg i ve been vaccinated x s i ve gotten a flu vax also i ve never been sick some side effects from the sho,188.0,,,Thu Nov 24 18:37:50 +0000 2022
1595849188595552257,Purple Monkey in a Submarine a.k.a. Ray Ray,aduron,ajtourville elonmusk karaswisher can t you do something more productive on thanksgiving besides mocking people,442.0,San Jose CA,,Thu Nov 24 18:37:50 +0000 2022
1595849188763308032,porn,Porndaddy1827,rt hungjockgavin happy thanksgiving sluts,132.0,,,Thu Nov 24 18:37:50 +0000 2022
1595849188847198208,Soy Martin Martinez 🇲🇽🇯🇴🎸🇨🇳🇨🇺🇷🇺🇸🇾,Chano19521,rt paulsorrentino yo aoc kids in cages wish you a happy thanksgiving,1726.0,"Playas de Rosarito, Baja Calif",,Thu Nov 24 18:37:50 +0000 2022
1595849188859944960,Enemy Of The State,TheRealBLee30,rt nba_newyork happy thanksgiving be thankful your knicks have improved,790.0,"Universe City, 212..Wheelz Up",,Thu Nov 24 18:37:50 +0000 2022
1595849188226506755,Mike H2O..,MiguelH05358319,dbacks happy thanksgiving,603.0,"Phoenix, AZ",,Thu Nov 24 18:37:50 +0000 2022
1595849189002461186,moshi's_BlackBeard,MoshiBlackbeard,rt playcodnews happy thanksgiving,255.0,,,Thu Nov 24 18:37:50 +0000 2022
1595849189032103936,Kathy Dove,movie_gal_10,rt mikef titletownusa happy thanksgiving james and all the cool music kids,5025.0,"Las Vegas, NV",,Thu Nov 24 18:37:50 +0000 2022
1595849189010849795,William mcsher,McsheeWilliam,rt kingsleycortes remember the native americans this thanksgiving,356.0,,,Thu Nov 24 18:37:50 +0000 2022
1595849188411064323,LordofScarlett,Scarlettismine,letstalkvampi may i please offer you my man letstalkvampi some cake this thanksgiving,39.0,,,Thu Nov 24 18:37:50 +0000 2022


In [0]:
from pyspark.sql.functions import col

tweets_clean.filter(col('tweet').isNull()).show(5)

+--------------------+----+--------------------+-----+---------------+--------+----+----------+
|                  id|name|            username|tweet|followers_count|location| geo|created_at|
+--------------------+----+--------------------+-----+---------------+--------+----+----------+
|white tme perisex 15|null|                null| null|           null|    null|null|      null|
|transneutral lesbian|null|                null| null|           null|    null|null|      null|
|adhdtistic did sy...|None|Thu Nov 24 18:37:...| null|           null|    null|null|      null|
|🚫ANTI-ZETA. ANTI...|null|                null| null|           null|    null|null|      null|
|⚠️FICTION AFFECTS...|null|                null| null|           null|    null|null|      null|
+--------------------+----+--------------------+-----+---------------+--------+----+----------+
only showing top 5 rows



In [0]:
tweets_notnull = tweets_clean.filter(col('tweet').isNotNull())

In [0]:
from pyspark.sql.functions import udf 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob

In [0]:
def get_sentiment(text):
    sentiment = TextBlob(text).sentiment.polarity
    if sentiment > 0:
        return 'positive'
    elif sentiment < 0:
        return 'negative'
    else:
        return 'neutral'

In [0]:
getsentiment = F.udf(lambda x: get_sentiment(x))

tweets_sentiment = tweets_notnull.withColumn('sentiment', getsentiment('tweet'))
tweets_notnull['tweet']

Out[11]: Column<'tweet'>

In [0]:
from pyspark.ml.feature import VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Use 90% cases for training, 10% cases for testing
train, test = tweets_sentiment.randomSplit([0.9, 0.1], seed=20200819)

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])

pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.9844
ROC-AUC: 0.9844


In [0]:
predictions_with_year = predictions_clean.withColumn('year', split(predictions_clean['created_at'], ' ').getItem(5))

In [0]:
from pyspark.sql.functions import col
filterDF =  predictions.select(col('filtered'), col('sentiment'))

In [0]:
from pyspark.sql.functions import explode
df2 = filterDF.select(filterDF.sentiment, explode(filterDF.filtered))
df2.printSchema()
df2.show()

root
 |-- sentiment: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------------+
|sentiment|         col|
+---------+------------+
| positive|        ohhh|
| positive|        yeah|
| positive|         got|
| positive|       happy|
| positive|thanksgiving|
| positive|    favorite|
| positive|        beer|
| positive|       maker|
|  neutral|      please|
|  neutral|      please|
|  neutral|  pleaseeeee|
|  neutral|      future|
|  neutral|     husband|
|  neutral|        make|
|  neutral|         run|
|  neutral|           k|
|  neutral|thanksgiving|
|  neutral|     morning|
|  neutral|         beg|
| positive|          rt|
+---------+------------+
only showing top 20 rows



In [0]:
predictions_write =  predictions_with_year.select(col('tweet'),col('followers_count'), col('location'), col('created_at'), col('sentiment'), col('label'), col('prediction'), col('year'))

In [0]:
(predictions_write.write                       # Our DataFrameWriter
  .option("header", "false")
  .option("delimiter", "|")
  .csv('/mnt/my_bucket/weclouddata/tweets.csv')               # Write DataFrame to csv files
)

In [0]:
(df2.write                       # Our DataFrameWriter
  .option("header", "false")
  .option("delimiter", "|")
  .csv('/mnt/my_bucket/weclouddata/words.csv')               # Write DataFrame to csv files
)