In [1]:
import findspark
findspark.init()
import pyspark ############ only run after findspark.init() ########
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline


import matplotlib.pyplot as plt
import pandas as pd
import sqlite3
import pandas as pd
import string
from tqdm import tqdm


import spacy
import spacy
from spacy.lang.en.stop_words import STOP_WORDS
from spacy.lang.en import English

%matplotlib inline

In [2]:
# Get (or reuse) the active SparkSession. NOTE: despite the name, `sc` is a
# SparkSession here, not a SparkContext.
sc = SparkSession.builder.getOrCreate()
# Print the Spark version this session is running on.
print(sc.version)

2.3.2


In [3]:
# Load the three tweet exports. The header row of each CSV supplies the column
# names; no schema is passed or inferred, so every column is read as a string.
bitcoin_df_spark = sc.read.csv('../../data/bitcoinoct17tooct18/all.csv', header=True)
eth_df_spark = sc.read.csv('../../data/eth_1aug15_1aug18/0.csv', header=True)
facebook_df_spark = sc.read.csv('../../data/facebook3March18to1Sep18/0.csv', header=True)


In [4]:
# Preview the first five rows of the Bitcoin tweets to check how the CSV was parsed.
bitcoin_df_spark.show(5)

+-------------------+-------------------+------------------+---------------+----------+----------------+--------+--------------------+-------+--------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+---------+-------------+---------------+-------------------+-------------------+------------------+
|            TweetID|     ConversationID|          AuthorId|     AuthorName|isVerified|        DateTime|Language|           TweetText|Replies|Retweets|Favorites|            Mentions|            Hashtags|           Permalink|                URLs|isPartOfConversation|isReply|isRetweet|ReplyToUserID|ReplyToUserName|      QuotedTweetID|QuotedTweetUserName| QuotedTweetUserID|
+-------------------+-------------------+------------------+---------------+----------+----------------+--------+--------------------+-------+--------+---------+--------------------+--------------------+--------------------+--------------------+-------

In [5]:
# Preview the first five rows of the Ethereum tweets to check how the CSV was parsed.
eth_df_spark.show(5)

+-------------------+-------------------+------------------+-------------+----------+----------------+--------------------+-------+--------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+---------+-------------+---------------+-------------------+-------------------+------------------+
|            TweetID|     ConversationID|          AuthorId|   AuthorName|isVerified|        DateTime|           TweetText|Replies|Retweets|Favorites|            Mentions|            Hashtags|           Permalink|                URLs|isPartOfConversation|isReply|isRetweet|ReplyToUserID|ReplyToUserName|      QuotedTweetID|QuotedTweetUserName| QuotedTweetUserID|
+-------------------+-------------------+------------------+-------------+----------+----------------+--------------------+-------+--------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+---------+-

In [6]:
# Preview the first five rows of the Facebook tweets to check how the CSV was parsed.
facebook_df_spark.show(5)

+-------------------+-------------------+-------------------+---------------+----------+----------------+--------------------+-------+--------+---------+---------+--------------------+--------------------+--------------------+--------------------+-------+---------+-------------+---------------+-------------------+-------------------+-----------------+
|            TweetID|     ConversationID|           AuthorId|    Author Name|isVerified|        DateTime|           TweetText|Replies|Retweets|Favorites| Mentions|            Hashtags|           Permalink|                URLs|isPartOfConversation|isReply|isRetweet|ReplyToUserID|ReplyToUserName|      QuotedTweetID|QuotedTweetUserName|QuotedTweetUserID|
+-------------------+-------------------+-------------------+---------------+----------+----------------+--------------------+-------+--------+---------+---------+--------------------+--------------------+--------------------+--------------------+-------+---------+-------------+-------------

In [7]:
# Print the column names and types of the Facebook dataset.
facebook_df_spark.printSchema()

root
 |-- TweetID: string (nullable = true)
 |-- ConversationID: string (nullable = true)
 |-- AuthorId: string (nullable = true)
 |-- Author Name: string (nullable = true)
 |-- isVerified: string (nullable = true)
 |-- DateTime: string (nullable = true)
 |-- TweetText: string (nullable = true)
 |-- Replies: string (nullable = true)
 |-- Retweets: string (nullable = true)
 |-- Favorites: string (nullable = true)
 |-- Mentions: string (nullable = true)
 |-- Hashtags: string (nullable = true)
 |-- Permalink: string (nullable = true)
 |-- URLs: string (nullable = true)
 |-- isPartOfConversation: string (nullable = true)
 |-- isReply: string (nullable = true)
 |-- isRetweet: string (nullable = true)
 |-- ReplyToUserID: string (nullable = true)
 |-- ReplyToUserName: string (nullable = true)
 |-- QuotedTweetID: string (nullable = true)
 |-- QuotedTweetUserName: string (nullable = true)
 |-- QuotedTweetUserID: string (nullable = true)



In [8]:
# Count the rows in the Bitcoin dataset (this runs a Spark job over the CSV).
bitcoin_df_spark.count()

3039052

In [9]:
# SQLContext expects a SparkContext as its first argument, but `sc` is a
# SparkSession. Pass the session's underlying context and bind the SQLContext
# to this same session explicitly instead of relying on duck typing.
sqlContext = SQLContext(sc.sparkContext, sparkSession=sc)
# Full English spaCy model; the `en_core_web_lg` package must be installed.
# It is not referenced by spacy_tokenizer, which uses the lighter `parser`.
nlp = spacy.load('en_core_web_lg')
# Bare English pipeline used by spacy_tokenizer to split text into tokens.
parser = English()
# ASCII punctuation characters, kept as a single string.
punctuations = string.punctuation
# spaCy's English stop words as a list.
stopwords = list(STOP_WORDS)
def spacy_tokenizer(sentence):
    """Tokenize a piece of text into cleaned, lower-cased lemmas.

    The text is tokenized with the module-level ``parser``. Each token is
    replaced by its lower-cased, stripped lemma, except that tokens whose
    lemma is the ``"-PRON-"`` placeholder keep their lower-cased surface
    form. Stop words and punctuation are then removed.

    Args:
        sentence: The text to tokenize, or ``None`` (for example a null
            value in a Spark column).

    Returns:
        A list of token strings. ``None`` and empty text give ``[]``.
    """
    # Null or empty text has no tokens; returning early also keeps the UDF
    # from failing on null rows.
    if not sentence:
        return []
    # Cap the amount of text handed to the parser. The slice starts at 0 so
    # the first character of the text is kept.
    doc = parser(sentence[:100000])
    lemmas = [
        token.lemma_.lower().strip() if token.lemma_ != "-PRON-" else token.lower_
        for token in doc
    ]
    # `punctuations` is a string in the setup cell, so the second check is a
    # substring test; it therefore also discards empty tokens.
    return [
        lemma
        for lemma in lemmas
        if lemma not in stopwords and lemma not in punctuations
    ]

In [10]:
# Wrap the tokenizer as a Spark UDF whose result type is array<string>.
spacy_tokenizer_udf = udf(spacy_tokenizer, ArrayType(StringType()))

In [11]:
# Add a `processed_text` column to each dataset by applying the tokenizer UDF
# to its TweetText column.
bitcoin_df_spark = bitcoin_df_spark.withColumn(
    "processed_text", spacy_tokenizer_udf(bitcoin_df_spark.TweetText)
)
eth_df_spark = eth_df_spark.withColumn(
    "processed_text", spacy_tokenizer_udf(eth_df_spark.TweetText)
)
facebook_df_spark = facebook_df_spark.withColumn(
    "processed_text", spacy_tokenizer_udf(facebook_df_spark.TweetText)
)

In [13]:
# Preview five rows again; the frame now carries the added `processed_text` column.
bitcoin_df_spark.show(5)

+-------------------+-------------------+------------------+---------------+----------+----------------+--------+--------------------+-------+--------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+---------+-------------+---------------+-------------------+-------------------+------------------+--------------------+
|            TweetID|     ConversationID|          AuthorId|     AuthorName|isVerified|        DateTime|Language|           TweetText|Replies|Retweets|Favorites|            Mentions|            Hashtags|           Permalink|                URLs|isPartOfConversation|isReply|isRetweet|ReplyToUserID|ReplyToUserName|      QuotedTweetID|QuotedTweetUserName| QuotedTweetUserID|      processed_text|
+-------------------+-------------------+------------------+---------------+----------+----------------+--------+--------------------+-------+--------+---------+--------------------+--------------------+-------