In [None]:
#pip install nltk

In [10]:
# Import functions
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.sql import SparkSession 

from pyspark.sql.functions import col,udf,max as max_,min as min_,expr,to_date,year,month
from pyspark.sql.types import DecimalType

import pandas as pd
import numpy as np
import seaborn as sns

import string
from string import punctuation
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')


import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer

[nltk_data] Downloading package stopwords to /home/hduser/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [11]:
#Read the csv file from hadoop and then create a temp view using spark sql

elontweetspath = "/user1/Elonmusktweets.csv"

# Obtain Tweets dataset
elontweets = spark.read.csv(elontweetspath, header='true')
elontweets.createOrReplaceTempView("elontweets")

# Cache the Tweets dataset 
elontweets.cache()
elontweets.first()

2023-05-14 14:52:55,330 WARN execution.CacheManager: Asked to cache already cached data.


Row(id='1282939902531790000', conversation_id='1282933079431150000', created_at='1594711683000', date='14/07/2020', time='07:28:03', timezone='UTC', user_id='44196397', username='elonmusk', name='Elon Musk', place=None, tweet='Cute', mentions="['teslarati']", urls='[]', photos='[]', replies_count='222', retweets_count='176', likes_count='7883', hashtags='[]', cashtags='[]', link='https://twitter.com/elonmusk/status/1282939902531796993', retweet='FALSE', quote_url=None, video='0', near=None, geo=None, source=None, user_rt_id=None, user_rt=None, retweet_id=None, reply_to="[{'user_id': '44196397', 'username': 'elonmusk'}, {'user_id': '1308211178', 'username': 'Teslarati'}]", retweet_date=None, translate=None, trans_src=None, trans_dest=None)

In [12]:
#Fetch the records and store in spark dataframe
df=spark.sql("select * from elontweets")
df


DataFrame[id: string, conversation_id: string, created_at: string, date: string, time: string, timezone: string, user_id: string, username: string, name: string, place: string, tweet: string, mentions: string, urls: string, photos: string, replies_count: string, retweets_count: string, likes_count: string, hashtags: string, cashtags: string, link: string, retweet: string, quote_url: string, video: string, near: string, geo: string, source: string, user_rt_id: string, user_rt: string, retweet_id: string, reply_to: string, retweet_date: string, translate: string, trans_src: string, trans_dest: string]

In [13]:
#Fetch the relevant columsn and store in spark dataframe

df_tweet=spark.sql("select  id,date,tweet from elontweets")
df_tweet.dtypes

[('id', 'string'), ('date', 'string'), ('tweet', 'string')]

In [14]:
#Define a function to remove the stop words from the tweet

def get_text_processing(text):
    stpword = stopwords.words('english')
    no_punctuation = [char for char in text if char not in string.punctuation]
    no_punctuation = ''.join(no_punctuation)
    return ' '.join([word for word in no_punctuation.split() if word.lower() not in stpword])

In [15]:
#Call the function to remove the stop words and text processing
get_text_processing_udf=udf(lambda x:get_text_processing(x))

In [17]:
#Select non null tweets
clean_tweet=df_tweet.select (col("id"),col('date'),get_text_processing_udf("tweet").alias("clean_tweet")).filter("tweet is not null")


In [18]:
clean_tweet.show()

[Stage 4:>                                                          (0 + 1) / 1]

+-------------------+----------+--------------------+
|                 id|      date|         clean_tweet|
+-------------------+----------+--------------------+
|1282939902531790000|14/07/2020|                Cute|
|1282844872571900000|14/07/2020|                 Wow|
|1282805559834490000|13/07/2020|Reusability essen...|
|1282800187308570000|13/07/2020|          Wild times|
|1282800078000800000|13/07/2020|We’re extra paran...|
|1282783693170300000|13/07/2020|     Welcome anytime|
|1282757235186130000|13/07/2020|Well care much su...|
|1282501634166120000|13/07/2020|Yes plan Supercha...|
|1282500394619300000|13/07/2020|                  👀|
|1282500077995520000|13/07/2020|               think|
|1282499968331210000|13/07/2020|range testing Num...|
|1282499399562580000|13/07/2020|reduced pricing M...|
|1282498919025390000|13/07/2020|                 Yes|
|1282497172982780000|13/07/2020|may able reach 25...|
|1282496094908240000|13/07/2020|range would unacc...|
|1282495504350230000|13/07/20

                                                                                

In [53]:
clean_tweet=clean_tweet.withColumn('tweet_year',year(to_date(clean_tweet.date,'dd/MM/yyyy')))
clean_tweet.show()

+-------------------+----------+--------------------+----------+
|                 id|      date|         clean_tweet|tweet_year|
+-------------------+----------+--------------------+----------+
|1282939902531790000|14/07/2020|                Cute|      2020|
|1282844872571900000|14/07/2020|                 Wow|      2020|
|1282805559834490000|13/07/2020|Reusability essen...|      2020|
|1282800187308570000|13/07/2020|          Wild times|      2020|
|1282800078000800000|13/07/2020|We’re extra paran...|      2020|
|1282783693170300000|13/07/2020|     Welcome anytime|      2020|
|1282757235186130000|13/07/2020|Well care much su...|      2020|
|1282501634166120000|13/07/2020|Yes plan Supercha...|      2020|
|1282500394619300000|13/07/2020|                  👀|      2020|
|1282500077995520000|13/07/2020|               think|      2020|
|1282499968331210000|13/07/2020|range testing Num...|      2020|
|1282499399562580000|13/07/2020|reduced pricing M...|      2020|
|1282498919025390000|13/07

[Stage 20:>                                                         (0 + 1) / 1]                                                                                

In [54]:
clean_tweet=clean_tweet.withColumn('tweet_month',month(to_date(df1.date,'dd/MM/yyyy')))
clean_tweet.show()

+-------------------+----------+--------------------+----------+-----------+
|                 id|      date|         clean_tweet|tweet_year|tweet_month|
+-------------------+----------+--------------------+----------+-----------+
|1282939902531790000|14/07/2020|                Cute|      2020|          7|
|1282844872571900000|14/07/2020|                 Wow|      2020|          7|
|1282805559834490000|13/07/2020|Reusability essen...|      2020|          7|
|1282800187308570000|13/07/2020|          Wild times|      2020|          7|
|1282800078000800000|13/07/2020|We’re extra paran...|      2020|          7|
|1282783693170300000|13/07/2020|     Welcome anytime|      2020|          7|
|1282757235186130000|13/07/2020|Well care much su...|      2020|          7|
|1282501634166120000|13/07/2020|Yes plan Supercha...|      2020|          7|
|1282500394619300000|13/07/2020|                  👀|      2020|          7|
|1282500077995520000|13/07/2020|               think|      2020|          7|


[Stage 21:>                                                         (0 + 1) / 1]                                                                                

In [55]:
clean_tweet=clean_tweet.withColumn('tweet_weekday',functions.date_format(to_date(df1.date,'dd/MM/yyyy'),'EEEE'))
clean_tweet.show()

+-------------------+----------+--------------------+----------+-----------+-------------+
|                 id|      date|         clean_tweet|tweet_year|tweet_month|tweet_weekday|
+-------------------+----------+--------------------+----------+-----------+-------------+
|1282939902531790000|14/07/2020|                Cute|      2020|          7|      Tuesday|
|1282844872571900000|14/07/2020|                 Wow|      2020|          7|      Tuesday|
|1282805559834490000|13/07/2020|Reusability essen...|      2020|          7|       Monday|
|1282800187308570000|13/07/2020|          Wild times|      2020|          7|       Monday|
|1282800078000800000|13/07/2020|We’re extra paran...|      2020|          7|       Monday|
|1282783693170300000|13/07/2020|     Welcome anytime|      2020|          7|       Monday|
|1282757235186130000|13/07/2020|Well care much su...|      2020|          7|       Monday|
|1282501634166120000|13/07/2020|Yes plan Supercha...|      2020|          7|       Monday|

[Stage 22:>                                                         (0 + 1) / 1]                                                                                

In [22]:
clean_tweet.select (clean_tweet.id.cast(StringType())).show()

+-------------------+
|                 id|
+-------------------+
|1282939902531790000|
|1282844872571900000|
|1282805559834490000|
|1282800187308570000|
|1282800078000800000|
|1282783693170300000|
|1282757235186130000|
|1282501634166120000|
|1282500394619300000|
|1282500077995520000|
|1282499968331210000|
|1282499399562580000|
|1282498919025390000|
|1282497172982780000|
|1282496094908240000|
|1282495504350230000|
|1282492573383180000|
|1282450019413420000|
|1282449465324920000|
|1282185119734700000|
+-------------------+
only showing top 20 rows



In [23]:
clean_tweet_date=clean_tweet.select(to_date(clean_tweet.date,'dd/MM/yyyy').alias('dt'))

In [27]:
clean_tweet_date.select (max_('dt')).show()

+----------+
|   max(dt)|
+----------+
|2020-07-14|
+----------+



In [25]:
clean_tweet_date.select (min_('dt')).show()

+----------+
|   min(dt)|
+----------+
|2015-01-30|
+----------+



In [28]:
import nltk


In [29]:
nltk.download([
            "names",
            "stopwords",
            "averaged_perceptron_tagger",
            "vader_lexicon",
            "punkt",
        ])

[nltk_data] Downloading package names to /home/hduser/nltk_data...
[nltk_data]   Package names is already up-to-date!
[nltk_data] Downloading package stopwords to /home/hduser/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/hduser/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/hduser/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
[nltk_data] Downloading package punkt to /home/hduser/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [30]:
#Use NLTK library for Sentimentintensity analyser
from nltk.sentiment import SentimentIntensityAnalyzer
sia = SentimentIntensityAnalyzer()


In [31]:
from random import shuffle

#Define a function to generate sentiments
def is_positive(tweet: str) -> bool:
    """True if tweet has positive compound sentiment, False otherwise."""
    if sia.polarity_scores(tweet)["compound"] > 0:
        return 1
    elif sia.polarity_scores(tweet)["compound"] < 0:
        return 0
    else: return 2
    



In [56]:
#Create a RDD mapping for tweets and its sentiments
rdd=clean_tweet.rdd.map(lambda x: (x["id"],x["date"],x["tweet_year"],x["tweet_month"],x["tweet_weekday"],
                                   x["clean_tweet"],is_positive(x["clean_tweet"])))

In [57]:
#Next step is to collect the rdd mapping and assign to a data variable
data=rdd.collect()

                                                                                

In [58]:
print(data)



In [59]:
df_clean_tweet=pd.DataFrame(data)

In [60]:
df_clean_tweet

Unnamed: 0,0,1,2,3,4,5,6
0,1282939902531790000,14/07/2020,2020.0,7.0,Tuesday,Cute,1
1,1282844872571900000,14/07/2020,2020.0,7.0,Tuesday,Wow,1
2,1282805559834490000,13/07/2020,2020.0,7.0,Monday,Reusability essential rocket single use absurd...,0
3,1282800187308570000,13/07/2020,2020.0,7.0,Monday,Wild times,2
4,1282800078000800000,13/07/2020,2020.0,7.0,Monday,We’re extra paranoid Maximizing probability su...,1
...,...,...,...,...,...,...,...
9374,564563818022313000,08/02/2015,2015.0,2.0,Sunday,Prob good though give us time replace 1st stag...,1
9375,564562686478807000,08/02/2015,2015.0,2.0,Sunday,Air Force tracking radar went Launch postponed...,0
9376,564509965612634000,08/02/2015,2015.0,2.0,Sunday,Rocket reentry much tougher time around due de...,1
9377,564493608351313000,08/02/2015,2015.0,2.0,Sunday,Launching 1st deep space mission today Headed ...,2


In [61]:
df_clean_tweet.rename(columns= {0:'tweetid' , 1:'Date',2:'Year',3:'Month',4:'Day',5:'Tweet',6:'sentiment'} ,inplace=True)

In [62]:
df_clean_tweet.reset_index(drop=True,inplace=True)

In [63]:
#pip show pandas

In [64]:
#pip install pandas==1.2.5 (2.0.0)

In [65]:
df_clean_tweet.head()

Unnamed: 0,tweetid,Date,Year,Month,Day,Tweet,sentiment
0,1282939902531790000,14/07/2020,2020.0,7.0,Tuesday,Cute,1
1,1282844872571900000,14/07/2020,2020.0,7.0,Tuesday,Wow,1
2,1282805559834490000,13/07/2020,2020.0,7.0,Monday,Reusability essential rocket single use absurd...,0
3,1282800187308570000,13/07/2020,2020.0,7.0,Monday,Wild times,2
4,1282800078000800000,13/07/2020,2020.0,7.0,Monday,We’re extra paranoid Maximizing probability su...,1


In [66]:
#Export the file to hadoop filesystem
data.write.mode("append").csv("/user1")

AttributeError: 'DataFrame' object has no attribute 'write'

In [72]:
df_clean_tweet.to_hdf('data.h5',key='df_clean_tweet',mode='w')

your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block2_values] [items->Index(['tweetid', 'Date', 'Day', 'Tweet'], dtype='object')]

  pytables.to_hdf(


In [73]:
#pip install tables

In [84]:
#df_clean_tweet.reset_index()

In [74]:
df_clean_tweet.to_csv('Clean_Tweet_Ca2.csv')

Unnamed: 0,tweetid
0,1282939902531790000
1,1282844872571900000
2,1282805559834490000
3,1282800187308570000
4,1282800078000800000
...,...
9374,564563818022313000
9375,564562686478807000
9376,564509965612634000
9377,564493608351313000
