In [1]:
#Import libraries

import pyspark 
from pyspark.sql import SparkSession 
from textblob import TextBlob 
import re 
import string 

In [4]:
#Create SparkSession 
spark = SparkSession.builder.appName('ProjectADE').getOrCreate()


In [5]:
#Create RDD from a text file

data = spark.sparkContext.textFile("bitcoin.csv")
data.take(2)

['Tue May 01 01:27:46 +0000 2018,leffler_iris,"<a href=""http://twitter.com"" rel=""nofollow"">Twitter Web Client</a>",,0,57,en,b\'RT @justinsuntron: Livestream with Taiwan legislator and #blockchain advocate Jason Hsu @augama will take place tonight at 9pm PST. Link co\'',
 ' ,,,,,,,']

In [6]:
data.count()

4076

##### **Text Preprocessing** is an important step for **Natural Language Processing** (NLP) tasks.

The preprocessing steps taken are:

1. Replacing Emoticons: Replace emoticons by using a  pre-defined dictionary containing emojis along with their meaning. (eg: ":))" to "very happy")

2. Replacing English abbreviations: Replace English abbreviation to correct spelling. (eg: "u" to "you")

3. Removing features: Remove RT,hyperlinks, @mentions and hashtag and numerical digits
  
4. Replacing Malay abbreviations: Replace Malay abbreviaton to correct spelling. (eg. "sy" to "saya"

5. Lower casing: Each text is converted to lowercase. 

In [7]:
# Definining dictionary containing all emojis with their meaning.
def resolve_emoticon(line):
   emoticon = {
    	':-)' : 'smile',
        ':)'  : 'sad',
    	':))' : 'very happy',
    	':)'  : 'happy',
    	':((' : 'very sad',
    	':('  : 'sad',
    	':-P' : 'tongue',
    	':-o' : 'gasp',
    	'>:-)':'angry'
   }   
   for key in emoticon:
      line = line.replace(key, emoticon[key])
   return line

In [8]:
# Defining dictionary containng English abbreviaton to correct spelling.

def abb_en(line):
   abbreviation_en = {
    'u': 'you',
    'thr': 'there',
    'asap': 'as soon as possible',
    'lv' : 'love',    
    'c' : 'see'
   } 
   abbrev = ' '.join (abbreviation_en.get(word, word) for word in line.split())
   return (resolve_emoticon(abbrev))  

In [9]:
# Defining set containing features in the tweets such RT, hyperlinks, @mentions and hash and numerical digit. 

def remove_features(data_str): 
    
   url_re= re.compile(r'https?://(\S+)') 
   num_re= re.compile(r'(\d+)') 
   mention_re= re.compile(r'(@|#)(\w+)') 
   RT_re= re.compile(r'RT(\s+)') 
    
   data_str= str(data_str) 
   data_str= RT_re.sub(' ', data_str) # remove RT 
   data_str= url_re.sub(' ', data_str) # remove hyperlinks \
   data_str= mention_re.sub(' ', data_str) # remove @mentions and hash 
   data_str= num_re.sub(' ', data_str) # remove numerical digit 
     
   return data_str

In [10]:
# Create function to obtain sentiment category
def categorize_polarity(x):
    if x > 0:
        return 'Positive'
    elif x < 0:
        return 'Negative'
    else:
        return 'Neutral'
 

In [11]:
sentiment_polarity = spark.sparkContext.textFile("bitcoin.csv") \
.map(lambda x:x.split(',')) \
.filter(lambda x:len(x) == 8) \
.filter(lambda x:len(x[0]) > 1) \
.map(lambda x:x[7])\
.map(lambda x:str(x).lower()) \
.map(lambda x:x.replace("'", "")) \
.map(lambda x:x.replace('"', '')) \
.map(lambda x:resolve_emoticon(x))\
.map(lambda x:remove_features(x))\
.map(lambda x:abb_en(x))\
.map(lambda x:TextBlob(x).sentiment.polarity)\
.map(lambda x:categorize_polarity(x))


sentiment_polarity.take(2)

                                                                                

['Neutral', 'Positive']

In [12]:
sentiment_polarity.count()

                                                                                

1180

In [13]:
text = spark.sparkContext.textFile('bitcoin.csv')\
.map(lambda x:x.split(','))\
.filter(lambda x:len(x)==8)\
.filter(lambda x:len(x[0])>1)\
.map(lambda x:x[7])\
.map(lambda x:str(x).lower())\
.map(lambda x:x.replace("'", ""))\
.map(lambda x:x.replace('"', ''))\
.map(lambda x:x.replace(r')', ''))\
.map(lambda x:x.replace(r'(', ''))\
.map(lambda x:x.replace(r'@', ''))\
.map(lambda x:x.replace(r'#', ''))

 

text.take(3)

['brt justinsuntron: livestream with taiwan legislator and blockchain advocate jason hsu augama will take place tonight at 9pm pst. link co',
 'brt murthaburke: get healthy with some coconut based oils! \\nhttps://t.co/aov8ndtxys \\nblockchain cryptocurrency crypto ethereum trapadr',
 'brt silentwhstlblwr: breaking: $link succesfully executes smartcontract live ropsten etherscan testnet. chainlink use cases are infinite']

In [14]:
created_at = spark.sparkContext.textFile('bitcoin.csv')\
.map(lambda x:x.split(','))\
.filter(lambda x:len(x)==8)\
.filter(lambda x:len(x[0])>1)\
.map(lambda x:x[0]) \
.map(lambda x:str(x).replace("'", ""))\
.map(lambda x:str(x).replace('"', ''))
 
created_at.take(2)

['Tue May 01 01:27:46 +0000 2018', 'Tue May 01 01:27:46 +0000 2018']

In [15]:
#I want 'created_at' to be index [0] and text to be index [1]

combine = created_at.zip(text)\
.map(lambda x:str(x).replace("'", ""))\
.map(lambda x:str(x).replace('"', ''))\
.map(lambda x:str(x).replace(r')', ''))\
.map(lambda x:str(x).replace(r'(', ''))\
.map(lambda x:str(x).replace('\\\\', ''))

                              
combine.take(3)

['Tue May 01 01:27:46 +0000 2018, brt justinsuntron: livestream with taiwan legislator and blockchain advocate jason hsu augama will take place tonight at 9pm pst. link co',
 'Tue May 01 01:27:46 +0000 2018, brt murthaburke: get healthy with some coconut based oils! nhttps://t.co/aov8ndtxys nblockchain cryptocurrency crypto ethereum trapadr',
 'Tue May 01 01:27:46 +0000 2018, brt silentwhstlblwr: breaking: $link succesfully executes smartcontract live ropsten etherscan testnet. chainlink use cases are infinite']

In [16]:
theData = spark.sparkContext.textFile('bitcoin.csv')\
.map(lambda x:x.split(','))\
.filter(lambda x:len(x)==8)\
.filter(lambda x:len(x[0])>1)\
.map(lambda x:x[1:7])\
.map(lambda x:str(x).lower())\
.map(lambda x:x.replace("'", ""))\
.map(lambda x:x.replace('"', ''))\
.map(lambda x:x.replace(r'[', ''))\
.map(lambda x:x.replace(r']', ''))


theData.take(3)

['leffler_iris, <a href=http://twitter.com rel=nofollow>twitter web client</a>, , 0, 57, en',
 'bmqxrxperymoht1, <a href=http://twitter.com rel=nofollow>twitter web client</a>, , 0, 60, en',
 'joshuahinson9, <a href=http://twitter.com/download/iphone rel=nofollow>twitter for iphone</a>, , 97, 449, en']

In [17]:
#I want the sentiment_polarity to be at the end. So zip it with the rest of the data. 

bitcoinRDD = combine.zip(theData).zip(sentiment_polarity)\
.map(lambda x:str(x).replace("'", ""))\
.map(lambda x:str(x).replace('"', ''))\
.map(lambda x:str(x).replace(r')', ''))\
.map(lambda x:str(x).replace(r'(', ''))
 
bitcoinRDD.take(1)

                                                                                

['Tue May 01 01:27:46 +0000 2018, brt justinsuntron: livestream with taiwan legislator and blockchain advocate jason hsu augama will take place tonight at 9pm pst. link co, leffler_iris, <a href=http://twitter.com rel=nofollow>twitter web client</a>, , 0, 57, en, Neutral']

In [18]:
#Check the partitions

bitcoinRDD.getNumPartitions()

2

In [19]:
# I decided to change the partition to 1. The data is isn't that big. 

bitcoinRDD2 = bitcoinRDD.repartition(1)

In [20]:
bitcoinRDD2.getNumPartitions()

1

In [21]:
#Save the files in text file 

bitcoinRDD2.saveAsTextFile("ProjectADE")
print(bitcoinRDD2.count())

[Stage 10:>                                                         (0 + 2) / 2]

1180


                                                                                