In [1]:
%pylab inline

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql import Window
import mysql.connector

import warnings
warnings.filterwarnings("ignore")

Populating the interactive namespace from numpy and matplotlib


In [2]:
sc

In [2]:
# Define appName and master inputs
appName = "Tweet dataset pre-processing - Output"
master = "local"

In [4]:
# Initiate spark session and connect to MySQL
spark = SparkSession.builder.master('local').config("spark.driver.extraClassPath", "/path/to/mysql-connector-java.jar").appName(appName).getOrCreate()

#sc = SparkContext()
#import re
#from operator import add
#spark = SparkSession(sc)

In [5]:
# Read tweets_input table stored in MySQL Tweets_DB database

Tweets_in_df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/Tweets_DB") \
    .option("dbtable", "tweets_input") \
    .option("user", "root") \
    .option("password", "password") \
    .load()


In [7]:
# Print table schema
Tweets_in_df.printSchema()

root
 |-- #: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [8]:
# show 5 first results from table
Tweets_in_df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+---+----------+--------------------+--------+---------------+--------------------+
|  #|        id|                date|    flag|           user|                text|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



                                                                                

In [12]:
# set spark.sql.legacy.timeParserPolicy to LEGACY to fix issue
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [10]:
# Create new column 'timestamp' from column date and transform it into datestamp structure type
Tweets_in_df = Tweets_in_df.withColumn('timestamp', to_timestamp(Tweets_in_df['date'], 'EEE MMM dd HH:mm:ss zzz yyyy'))

In [14]:
# Print schema with new column added
Tweets_in_df.printSchema()

root
 |-- #: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [13]:
# show  first result from table
Tweets_in_df.show(5)

[Stage 2:>                                                          (0 + 1) / 1]

+---+----------+--------------------+--------+---------------+--------------------+-------------------+
|  #|        id|                date|    flag|           user|                text|          timestamp|
+---+----------+--------------------+--------+---------------+--------------------+-------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|2009-04-07 06:19:45|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|2009-04-07 06:19:49|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|2009-04-07 06:19:53|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|2009-04-07 06:19:57|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|2009-04-07 06:19:57|
+---+----------+--------------------+--------+---------------+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [15]:
# Explore flag column values
Tweets_in_df.groupBy('flag').count().show()

                                                                                

+--------+-------+
|    flag|  count|
+--------+-------+
|NO_QUERY|1600000|
+--------+-------+



In [16]:
# Drop unecessary columns from table
tweets_df = Tweets_in_df.drop("#","id","date","flag","user")

In [17]:
# Print results
tweets_df.show(3)

[Stage 13:>                                                         (0 + 1) / 1]

+--------------------+-------------------+
|                text|          timestamp|
+--------------------+-------------------+
|@switchfoot http:...|2009-04-07 06:19:45|
|is upset that he ...|2009-04-07 06:19:49|
|@Kenichan I dived...|2009-04-07 06:19:53|
+--------------------+-------------------+
only showing top 3 rows



                                                                                

## Sentiment Classification

In [18]:
# define function to extract features and clean text
my_punctuation = '!"$%&\'()*+,-./:;<=>?[\\]^_`{|}~•@â' 
def pre_process(tweet):
    tweet = re.sub(r'http\S+', '', tweet)  
    tweet = re.sub(r'bit.ly/\S+', '', tweet) 
    tweet = tweet.strip('[link]')  
    tweet = re.sub('(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    tweet = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    tweet = re.sub('['+my_punctuation + ']+', ' ', tweet) 
    tweet = re.sub('([0-9]+)', '', tweet) 
    tweet = re.sub('(#[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    tweet = re.sub('(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|(#[A-Za-z0-9]+)', '', tweet)
    tweet = re.sub(r'\s+',' ',tweet)
    
    return tweet

In [19]:
# Apply function to table
clean_data = udf(pre_process)

In [20]:
# Update table with clean text data
preproc_tweets_df = tweets_df.withColumn("text", clean_data(tweets_df['text']))
preproc_tweets_df.show(5)

[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+-------------------+
|                text|          timestamp|
+--------------------+-------------------+
| Awww that s a bu...|2009-04-07 06:19:45|
|s upset that he c...|2009-04-07 06:19:49|
| I dived many tim...|2009-04-07 06:19:53|
|my whole body fee...|2009-04-07 06:19:57|
| no it s not beha...|2009-04-07 06:19:57|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [21]:
# Get year, month and day values from timestamp column
preproc_tweets_df = preproc_tweets_df.withColumn("year", year(col("timestamp")).cast(StringType()))
preproc_tweets_df = preproc_tweets_df.withColumn("month", month(col("timestamp")).cast(StringType()))
preproc_tweets_df = preproc_tweets_df.withColumn("day", dayofmonth(col("timestamp")).cast(StringType()))

In [22]:
# Show data by month
preproc_tweets_df.groupBy('month').count().show()

                                                                                

+-----+------+
|month| count|
+-----+------+
|    5|554060|
|    6|945915|
|    4|100025|
+-----+------+



In [25]:
# Show first 5 rows
preproc_tweets_df.show(5)

[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+-------------------+----+-----+---+
|                text|          timestamp|year|month|day|
+--------------------+-------------------+----+-----+---+
| Awww that s a bu...|2009-04-07 06:19:45|2009|    4|  7|
|s upset that he c...|2009-04-07 06:19:49|2009|    4|  7|
| I dived many tim...|2009-04-07 06:19:53|2009|    4|  7|
|my whole body fee...|2009-04-07 06:19:57|2009|    4|  7|
| no it s not beha...|2009-04-07 06:19:57|2009|    4|  7|
+--------------------+-------------------+----+-----+---+
only showing top 5 rows



                                                                                

In [23]:
# Create a function to get the subjectivity, polarity and sentiment values

import textblob
from textblob import TextBlob

def getSubjectivity(tweet: str) -> float:
    return TextBlob(tweet).sentiment.subjectivity

def getPolarity(tweet: str) -> float:
    return TextBlob(tweet).sentiment.polarity

def getSentiment(polarityValue: int) -> str:
    if polarityValue < 0:
        return 'Negative'
    elif polarityValue == 0:
        return 'Neutral'
    else:
        return 'Positive'
    
    tweet = TextBlob(text)
    if tweet.sentiment.polarity < 0:
        sentiment = "negative"
    elif tweet.sentiment.polarity == 0:
        sentiment = "neutral"
    else:
        sentiment = "positive"
    return sentiment

In [24]:
# Apply function to data and define structure type
subjectivity = F.udf(getSubjectivity, FloatType())
polarity = F.udf(getPolarity, FloatType())
sentiment = F.udf(getSentiment, StringType())

In [26]:
# Update table to include subjectivity, polarity and sentiment values
preproc_tweets_df = preproc_tweets_df.withColumn('subjectivity', subjectivity(col("text")))
preproc_tweets_df = preproc_tweets_df.withColumn("polarity", polarity(col("text")))
preproc_tweets_df = preproc_tweets_df.withColumn("sentiment", sentiment(preproc_tweets_df['polarity']))

In [27]:
# Show first 5 rows
preproc_tweets_df.show(5)

[Stage 26:>                                                         (0 + 1) / 1]

+--------------------+-------------------+----+-----+---+------------+--------+---------+
|                text|          timestamp|year|month|day|subjectivity|polarity|sentiment|
+--------------------+-------------------+----+-----+---+------------+--------+---------+
| Awww that s a bu...|2009-04-07 06:19:45|2009|    4|  7|        0.45|     0.2| Positive|
|s upset that he c...|2009-04-07 06:19:49|2009|    4|  7|         0.0|     0.0|  Neutral|
| I dived many tim...|2009-04-07 06:19:53|2009|    4|  7|         0.5|     0.5| Positive|
|my whole body fee...|2009-04-07 06:19:57|2009|    4|  7|         0.4|     0.2| Positive|
| no it s not beha...|2009-04-07 06:19:57|2009|    4|  7|         1.0|  -0.625| Negative|
+--------------------+-------------------+----+-----+---+------------+--------+---------+
only showing top 5 rows



                                                                                

In [39]:
 # Define output table columns order
out_cols = ['timestamp', 'year', 'month', 'day', 'text', 'sentiment']

In [41]:
# Apply columns order to table
tweets_out_df = preproc_tweets_df.select(out_cols)

In [42]:
# Show first 5 rows
tweets_out_df.show(5)

[Stage 27:>                                                         (0 + 1) / 1]

+-------------------+----+-----+---+--------------------+---------+
|          timestamp|year|month|day|                text|sentiment|
+-------------------+----+-----+---+--------------------+---------+
|2009-04-07 06:19:45|2009|    4|  7| Awww that s a bu...| Positive|
|2009-04-07 06:19:49|2009|    4|  7|s upset that he c...|  Neutral|
|2009-04-07 06:19:53|2009|    4|  7| I dived many tim...| Positive|
|2009-04-07 06:19:57|2009|    4|  7|my whole body fee...| Positive|
|2009-04-07 06:19:57|2009|    4|  7| no it s not beha...| Negative|
+-------------------+----+-----+---+--------------------+---------+
only showing top 5 rows



                                                                                

In [43]:
# Show output table schema
tweets_out_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- text: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [44]:
# Write processed data to table tweets_output into MySQL database Tweets_DB 
tweets_out_df.write.format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/Tweets_DB") \
  .option("dbtable", "tweets_output") \
  .option("user", "root") \
  .option("password", "password") \
  .mode("overwrite") \
  .save()

                                                                                