In [None]:
# Currency whose tweets are collected. Other (name, symbol) pairs used before:
#   ("zilliqa", "ZIL"), ("nexo", "NEXO")
CURRENCY, CURRENCY_SYMBOL = "bitcoin", "BTC"

## Personal config
TWEETS_FOLDER = "data/crypto/{}".format(CURRENCY_SYMBOL)  # relative path to historical data
SEP_CHAR = "~"  # character separating the from/to dates in a filename
ENVS = ["CRYPTO", "LINE_COUNT", "MOST_RECENT_FILE", "MOST_RECENT_ID"]  # keys stored in var.csv
MAX_ROW_PER_FILE = 20_000  # maximum number of rows kept in any one data file

# Raw (as scraped) and cleaned tweet dumps for the selected currency.
tweets_raw_file = "data/twitter/{}/{}_tweets_raw.csv".format(CURRENCY_SYMBOL, CURRENCY)
tweets_clean_file = "data/twitter/{}/{}_tweets_clean.csv".format(CURRENCY_SYMBOL, CURRENCY)

# Search query matching either hashtag, e.g. "#bitcoin OR #BTC".
# TODO: put back the wider query: {CURRENCY} OR ${CURRENCY} OR ${CURRENCY_SYMBOL}
query = " OR ".join("#" + tag for tag in (CURRENCY, CURRENCY_SYMBOL))

In [None]:
import os

# Make sure the folder for the raw tweet dump exists. exist_ok=True already
# tolerates the folder appearing concurrently, so no errno inspection is needed
# (errno was never imported, so the old race-condition handler would itself
# have raised NameError). A bare filename has no folder to create.
raw_dir = os.path.dirname(tweets_raw_file)
if raw_dir:
    os.makedirs(raw_dir, exist_ok=True)

In [None]:
from twython import Twython

In [None]:
import os
from getpass import getpass

# Credentials are read from the environment (or prompted for) instead of being
# hardcoded: keys stored in a notebook leak with every copy or commit.
# NOTE(review): the key pair that used to be hardcoded here should be treated
# as compromised and regenerated in the Twitter developer portal.
APP_KEY = os.environ.get("TWITTER_APP_KEY") or getpass("Twitter app key: ")
APP_SECRET = os.environ.get("TWITTER_APP_SECRET") or getpass("Twitter app secret: ")

# OAuth 2 application-only auth: exchange key/secret for an access token,
# then build the client used by the search calls.
twitter = Twython(APP_KEY, APP_SECRET, oauth_version=2)
ACCESS_TOKEN = twitter.obtain_access_token()
twitter = Twython(APP_KEY, access_token=ACCESS_TOKEN)

# Display the remaining search quota.
twitter.get_application_rate_limit_status()['resources']['search']

In [None]:
from time import sleep
import json
import pandas as pd
import io
from tqdm import tqdm

In [None]:
NUMBER_OF_QUERIES = 450   # search calls per batch; presumably the per-window app-auth search quota
BATCH_WAIT_SECONDS = 910  # 15 minutes plus a 10 s margin between batches
RAW_COLUMNS = ('ID', 'Text', 'UserName', 'UserFollowerCount', 'RetweetCount', 'Likes', 'CreatedAt')

# Oldest tweet ID already stored in tweets_raw_file (resume point).
# Set to None to start a fresh scrape from the newest matching tweets.
next_id = "998511795781361665"


def status_to_row(status):
    """Flatten one search-API status dict into a row matching RAW_COLUMNS.

    Newlines are stripped from the tweet text so each tweet stays on one CSV line.
    """
    user = status["user"]
    return [
        status["id"],
        status["text"].replace('\n', '').replace('\r', ''),
        user["name"],
        user["followers_count"],
        status["retweet_count"],
        status["favorite_count"],
        status["created_at"],
    ]


# The search API's max_id is inclusive, so page strictly below the oldest tweet
# already saved; otherwise that tweet is fetched (and written) again every page.
max_id = int(next_id) - 1 if next_id else None

with open(tweets_raw_file, mode="a", encoding="utf-8", newline="") as f:
    if f.tell() == 0:
        # Only an empty/new file gets a header row; written via pandas so the
        # line terminator matches the data rows appended below.
        pd.DataFrame(columns=RAW_COLUMNS).to_csv(f, index=False)

    exhausted = False
    while not exhausted:
        twitter = Twython(APP_KEY, access_token=ACCESS_TOKEN)
        batch = []
        for _ in tqdm(range(NUMBER_OF_QUERIES)):
            # 'recent' on every call: max_id paging relies on results ordered by
            # ID, which 'mixed' (popular + recent) does not guarantee.
            params = dict(q=query, lang='en', result_type='recent', count="100")
            if max_id is not None:
                params["max_id"] = max_id
            statuses = twitter.search(**params)["statuses"]
            if not statuses:
                # Nothing older is available: stop instead of re-querying forever.
                exhausted = True
                break
            batch.extend(statuses)
            next_id = min(s["id"] for s in statuses)
            max_id = next_id - 1

        print('Retrieved {0}'.format(len(batch)))
        if batch:
            batch_frame = pd.DataFrame([status_to_row(s) for s in batch], columns=RAW_COLUMNS)
            batch_frame.to_csv(f, index=False, header=False)
            f.flush()  # persist this batch before the long wait

        if exhausted:
            print('No more tweets returned, stopping...')
        else:
            print('Waiting for 15 minutes until next queries')
            sleep(BATCH_WAIT_SECONDS)

print('Oldest tweet ID saved: {0}'.format(next_id))

In [None]:
import os

# Make sure the folder for the cleaned tweet file exists. exist_ok=True already
# tolerates the folder appearing concurrently, so no errno inspection is needed
# (errno was never imported, so the old handler would have raised NameError).
# A bare filename has no folder to create.
clean_dir = os.path.dirname(tweets_clean_file)
if clean_dir:
    os.makedirs(clean_dir, exist_ok=True)

In [None]:
import re # regular expressions
from tqdm import tnrange, tqdm_notebook, tqdm

def clean_tweet_text(text):
    """Strip '#' markers, links and @-mentions from a Series of tweet texts.

    Parameters
    ----------
    text : pandas.Series of str
        Raw tweet texts; missing values stay missing.

    Returns
    -------
    pandas.Series
        Cleaned texts with the same index as ``text``.
    """
    return (
        text.str.replace("#", "", regex=False)               # keep the hashtag word, drop '#'
            .str.replace(r"https?://\S+", "", regex=True)    # the whole URL, including its path
            .str.replace(r"@\w+ *", "", regex=True)          # mention plus trailing spaces
    )


tweets_raw = pd.read_csv(tweets_raw_file)
tweets_clean = tweets_raw.assign(Text=clean_tweet_text(tweets_raw["Text"]))

# Overwrite rather than append: the clean file is fully derived from the raw
# dump, and appending on a re-run would duplicate every row and add a second
# header. Passing the path lets pandas open *and close* the file, so it is
# completely flushed before Spark reads it.
tweets_clean.to_csv(tweets_clean_file, header=True, encoding="utf-8", index=False)

d = tweets_clean  # former name of the cleaned frame, kept for later cells

In [None]:
# importing required libraries
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

# initializing spark session
# getOrCreate() reuses an already-running session/context, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.appName("PySparkShell").getOrCreate()
sc = spark.sparkContext

In [None]:
# Schema of the cleaned tweet CSV, in the column order the cleaning step writes.
my_schema = tp.StructType([
    tp.StructField(name='ID',                dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='Text',              dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='UserName',          dataType=tp.StringType(),  nullable=True),
    tp.StructField(name='UserFollowerCount', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='RetweetCount',      dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Likes',             dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='CreatedAt',         dataType=tp.StringType(),  nullable=True),
])

# pandas.to_csv escapes a double quote inside a quoted field by doubling it
# (""), while Spark's CSV reader defaults to backslash escaping. escape='"'
# makes Spark read such tweets back as one field instead of misaligning columns.
my_data = spark.read.csv(tweets_clean_file,
                         schema=my_schema,
                         header=True,
                         escape='"')

# view the data
my_data.show(5)

# print the schema of the file
my_data.printSchema()

In [None]:
# define stage 1: split the tweet text into tokens on non-word characters
stage_1 = RegexTokenizer(inputCol='Text', outputCol='tokens', pattern=r'\W')
# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
# define stage 3: create a word vector of size 100; a fixed seed makes the
# embeddings (and so the fitted model) reproducible across runs
stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100, seed=42)
# define stage 4: Logistic Regression Model
# NOTE(review): the tweet schema has no 'label' column, so fitting this
# pipeline fails until a labelled sentiment column is added to the data.
model = LogisticRegression(featuresCol='vector', labelCol='label')

In [None]:
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, model])

# RegexTokenizer fails on null input, and tweets that were only links or
# mentions are empty after cleaning (Spark reads empty CSV fields as null).
training_data = my_data.na.drop(subset=["Text"])

# Fail early with an actionable message instead of a Spark error inside fit().
label_col = model.getLabelCol()
if label_col not in training_data.columns:
    raise ValueError(
        f"Training data has no {label_col!r} column; add a labelled sentiment "
        "column before fitting the pipeline."
    )

# fit the pipeline model with the training data
pipelineFit = pipeline.fit(training_data)

In [None]:
# Socket the tweet producer writes to. sys.argv was used before, but sys was
# never imported here, and in a Jupyter kernel sys.argv holds the kernel's own
# launch arguments rather than a host/port.
STREAM_HOST = "localhost"
STREAM_PORT = 9991


def get_prediction(tweet_text):
    """Print the fitted pipeline's prediction for each non-empty tweet in a micro-batch.

    Parameters
    ----------
    tweet_text : pyspark.RDD of str
        One streaming micro-batch, as passed by ``DStream.foreachRDD``.
    """
    try:
        # filter the tweets whose length is greater than 0
        tweets = tweet_text.filter(lambda x: len(x) > 0)
        if tweets.isEmpty():
            print('No data')
            return
        # The fitted pipeline reads its input from the 'Text' column (stage_1's
        # inputCol), so streamed tweets must use that column name.
        rows = tweets.map(lambda w: Row(Text=w))
        tweets_df = spark.createDataFrame(rows)
        # transform the data using the pipeline and show the predicted sentiment
        pipelineFit.transform(tweets_df).select('Text', 'prediction').show()
    except Exception as exc:
        # Keep the stream alive on a bad batch, but report the actual failure
        # instead of masking every error as "No data".
        print(f'Prediction failed for this batch: {exc!r}')


# initialize the streaming context with 3-second micro-batches
ssc = StreamingContext(sc, batchDuration=3)

# Create a DStream that will connect to STREAM_HOST:STREAM_PORT
lines = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)

# split the text by the keyword 'TWEET_APP' so each element is a single tweet
words = lines.flatMap(lambda line: line.split('TWEET_APP'))

# get the predicted sentiments for the tweets received
words.foreachRDD(get_prediction)

# Start the computation
ssc.start()

# Wait for the computation to terminate (blocks this cell until the stream stops)
ssc.awaitTermination()