
# ECE 795 - Big Data
## Assignment #3 - Twitter Sentiment Analysis and Word Count using PySpark Distributed Computing on Dataproc

### Provide your credentials to the runtime

In [0]:
# Authenticate your student profile

from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


### Set the Project ID and Enable APIs

In [0]:
project_id = 'ece795'

#### GCP offers many different services: Compute Engine, Cloud Storage, BigQuery, Cloud SQL and Cloud Dataproc, to name a few. To use any of these services in your project, you first have to enable them.

![alt text](https://cdn-images-1.medium.com/max/1600/1*rYZZH8w9iScxIXG27qG-ww.png)

#### Hover over “APIs & Services” in the left-side menu, then click “Library”. For this project we will enable three APIs: Cloud Dataproc, Compute Engine and Cloud Storage.

![alt text](https://cdn-images-1.medium.com/max/1600/1*qH5u_JSH2JLZW_SQTcetSQ.png)
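If you prefer the command line to the console, the same three APIs can be enabled with `gcloud services enable`. The sketch below only assumes that the gcloud CLI is installed and authenticated in your runtime; the helper names are illustrative, not part of the assignment code.

```python
import shlex
import subprocess

# Service names for the three APIs used in this assignment.
REQUIRED_APIS = [
    "dataproc.googleapis.com",  # Cloud Dataproc
    "compute.googleapis.com",   # Compute Engine
    "storage.googleapis.com",   # Cloud Storage
]

def enable_apis_command(project_id):
    """Build the gcloud command that enables REQUIRED_APIS for project_id."""
    return ["gcloud", "services", "enable", *REQUIRED_APIS, "--project", project_id]

def enable_apis(project_id):
    """Run the command (requires an installed, authenticated gcloud CLI)."""
    cmd = enable_apis_command(project_id)
    print("Running:", shlex.join(cmd))
    subprocess.check_call(cmd)
```

Calling `enable_apis(project_id)` in a cell after setting `project_id` above has the same effect as clicking “Enable” for each API in the Library page.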

### Running Example 1: Word Count

#### This word count example uses the Shakespeare sample dataset in BigQuery. The only difference from the previous tutorial is that, instead of Hadoop MapReduce, it uses PySpark, the Python API for Spark.

#### Step 1: create a dataset named "wordcount_dataset"
- Select your project or create a new one (remember to enable billing)
- Go to BigQuery
- Create a dataset and name it wordcount_dataset

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assign3_img11.png?raw=true)
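The dataset can also be created with the bq command-line tool, whose `mk --dataset PROJECT:DATASET` form creates an empty dataset. This sketch assumes bq is installed and authenticated; the function name is a hypothetical helper.

```python
import subprocess

def make_dataset_command(project_id, dataset="wordcount_dataset"):
    """Build `bq mk --dataset PROJECT:DATASET`, which creates an empty BigQuery dataset."""
    return ["bq", "mk", "--dataset", "{}:{}".format(project_id, dataset)]

def make_dataset(project_id, dataset="wordcount_dataset"):
    """Run the command; raises CalledProcessError if bq fails (e.g. the dataset exists)."""
    subprocess.check_call(make_dataset_command(project_id, dataset))
```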

#### Step 2: create a cluster in Dataproc. Go to Dataproc and create a cluster similar to the one in our previous tutorial; Dataproc also creates a Cloud Storage staging bucket for the cluster. It should look like this:

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img2.png?raw=true)
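As a command-line alternative to the console form, `gcloud dataproc clusters create` accepts the cluster name, region and worker count as flags. The cluster name, region and worker count used below are hypothetical placeholders; match them to the settings in the screenshot and in the previous tutorial.

```python
import subprocess

def create_cluster_command(cluster_name, region, project_id, num_workers=2):
    """Build a `gcloud dataproc clusters create` command (other settings use Dataproc defaults)."""
    return ["gcloud", "dataproc", "clusters", "create", cluster_name,
            "--region", region,
            "--num-workers", str(num_workers),
            "--project", project_id]

def create_cluster(cluster_name, region, project_id, num_workers=2):
    """Run the command; requires an installed, authenticated gcloud CLI."""
    subprocess.check_call(create_cluster_command(cluster_name, region, project_id, num_workers))
```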

Now click the link in the “Cloud Storage staging bucket” column; it takes you to that bucket in Cloud Storage. This is where we will upload our Python code, and the uploaded file's gs:// link is what we use to submit a job to the cluster.

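#### Step 3: modify the code and upload it to Cloud Storage. Copy the code below into a Python file (for example sparkwc.py).

Check the input and output directory strings. In the code, each {} placeholder is filled in by .format(bucket), where bucket is read from the cluster's fs.gs.system.bucket setting; if you prefer fixed paths, replace the {} symbols with your Cloud Storage bucket name.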

input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'

Upload the Python file to your Cloud Storage bucket. We will use its gs:// link as the input to our PySpark job.
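The upload can also be done with `gsutil cp`, which prints nothing you need to copy by hand if you build the gs:// link yourself. This sketch assumes gsutil is installed and authenticated and that the file sits at the top level of the bucket; the helper name and bucket name are illustrative.

```python
import os
import subprocess

def upload_command(local_path, bucket):
    """Return (gsutil command, resulting gs:// link) for copying local_path to the bucket root."""
    link = "gs://{}/{}".format(bucket, os.path.basename(local_path))
    return ["gsutil", "cp", local_path, link], link

def upload(local_path, bucket):
    """Copy the file and return its gs:// link for the job submission form."""
    cmd, link = upload_command(local_path, bucket)
    subprocess.check_call(cmd)
    return link
```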

In [0]:
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda record: json.loads(record[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'

sql_context = SQLContext(sc)
(word_counts
 .toDF(['word', 'word_count'])
 .write.format('json').save(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)
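The core of the job is the map/reduceByKey chain: each BigQuery row arrives as a JSON string, is turned into a (lowercased word, count) pair, and the counts of equal words are summed. The plain-Python sketch below mimics that logic on a few made-up rows so you can check it without a cluster; the sample records are hypothetical, and in the Spark job each record is a (key, JSON string) pair rather than a bare string.

```python
import json
from collections import defaultdict

def word_count(records):
    """Local equivalent of the map(json.loads) -> map((word, count)) -> reduceByKey(+) chain."""
    counts = defaultdict(int)
    for line in records:
        row = json.loads(line)                                  # parse one JSON row
        counts[row["word"].lower()] += int(row["word_count"])  # sum counts per lowercased word
    return dict(counts)

# Hypothetical rows shaped like the public shakespeare table (word, word_count, corpus).
sample = [
    '{"word": "The", "word_count": "3", "corpus": "hamlet"}',
    '{"word": "the", "word_count": "5", "corpus": "kinglear"}',
    '{"word": "Ghost", "word_count": "2", "corpus": "hamlet"}',
]
print(word_count(sample))  # {'the': 8, 'ghost': 2}
```

Lowercasing before summing is why "The" and "the" end up in a single entry, exactly as in the Spark version.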

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img3.png?raw=true)

For example, here I uploaded sparkwc.py, and my link would be gs://dataproc-e4225d08-23a8-481faff9-5205024119a5-us/sparkwc.py

#### Step 4: submit the job, as in our Hadoop example, using the settings below. Remember to use your own gs:// link, not mine.

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img12.png?raw=true)
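The form above corresponds to a `gcloud dataproc jobs submit pyspark` call. The sketch assumes an authenticated gcloud CLI; the cluster name and region are hypothetical and must match the cluster you created in Step 2. It also assumes the BigQuery connector classes used by the script are available on the cluster image, as they are for the settings in the screenshot.

```python
import subprocess

def submit_wordcount_command(script_uri, cluster_name, region):
    """Build the gcloud command that submits the uploaded word count script as a PySpark job."""
    return ["gcloud", "dataproc", "jobs", "submit", "pyspark", script_uri,
            "--cluster", cluster_name,
            "--region", region]

def submit_wordcount(script_uri, cluster_name, region):
    """Submit the job and wait for it; the driver output is streamed to this cell."""
    subprocess.check_call(submit_wordcount_command(script_uri, cluster_name, region))
```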

#### Step 5: browse the result. Once the job is done (it should take around 2 to 10 minutes), go back to BigQuery and explore the result.

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img5.png?raw=true)
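Instead of browsing in the console, you can query the output table. This sketch builds a standard-SQL query for the most frequent words and, optionally, runs it with the google-cloud-bigquery client library (assumed to be installed and authenticated; the helper names are illustrative).

```python
def top_words_sql(dataset="wordcount_dataset", table="wordcount_output", limit=10):
    """Standard SQL for the `limit` most frequent words in the job's output table."""
    return ("SELECT word, word_count FROM `{}.{}` "
            "ORDER BY word_count DESC LIMIT {}").format(dataset, table, int(limit))

def run_query(project_id, sql):
    """Run the query and return the rows as dicts (requires google-cloud-bigquery)."""
    from google.cloud import bigquery  # imported lazily so the SQL helper works without it
    client = bigquery.Client(project=project_id)
    return [dict(row.items()) for row in client.query(sql).result()]
```

For example, `run_query(project_id, top_words_sql())` should list the ten most common words, mirroring what the console shows.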

#### Reference
https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
https://piazza.com/class_profile/get_resource/j1318wb5md21wb/j2jthscnpkb5zu

### Running Example 2: Twitter Sentiment

#### Click “Storage” in the left-side menu; you’ll see a page like the one below. Click “Create bucket”.

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img7.png?raw=true)
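A bucket can also be created with `gsutil mb`, where `-p` selects the project and `-l` the location. The bucket name and location below are hypothetical; bucket names must be globally unique, and this sketch assumes an authenticated gsutil.

```python
import subprocess

def make_bucket_command(bucket_name, location="us-central1", project_id=None):
    """Build `gsutil mb [-p PROJECT] -l LOCATION gs://BUCKET`."""
    cmd = ["gsutil", "mb"]
    if project_id:
        cmd += ["-p", project_id]
    cmd += ["-l", location, "gs://{}".format(bucket_name)]
    return cmd

def make_bucket(bucket_name, location="us-central1", project_id=None):
    """Create the bucket; fails if the name is already taken."""
    subprocess.check_call(make_bucket_command(bucket_name, location, project_id))
```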

#### Go to Dataproc in the left-side menu (scroll down a bit; it is under the Big Data section) and click “Clusters”, then click “Create cluster”.

#### Get the Data

In [0]:
!wget http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip
!unzip -o trainingandtestdata.zip
!rm trainingandtestdata.zip test*.csv


URL transformed to HTTPS due to an HSTS policy
--2019-03-07 17:55:57--  https://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip
Resolving cs.stanford.edu (cs.stanford.edu)... 171.64.64.64
Connecting to cs.stanford.edu (cs.stanford.edu)|171.64.64.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 81363704 (78M) [application/zip]
Saving to: ‘trainingandtestdata.zip’


2019-03-07 17:56:03 (13.4 MB/s) - ‘trainingandtestdata.zip’ saved [81363704/81363704]

Archive:  trainingandtestdata.zip
  inflating: testdata.manual.2009.06.14.csv  
replace training.1600000.processed.noemoticon.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [0]:
import pandas as pd
import numpy as np

# set the names for each column
cols = ['sentiment','id','date','query_string','user','text']
def main():
	# read training data with ISO-8859-1 encoding and column names set above
	df = pd.read_csv('training.1600000.processed.noemoticon.csv', encoding = 'ISO-8859-1',names=cols)
	# set the random seed before shuffling so that both the shuffle and the split are reproducible
	np.random.seed(777)
	# shuffle the data
	df = df.sample(frac=1).reset_index(drop=True)
	# split train and test with a 99 to 1 ratio
	msk = np.random.rand(len(df)) < 0.99
	train = df[msk].reset_index(drop=True)
	test = df[~msk].reset_index(drop=True)
	# save both train and test as CSV files
	train.to_csv('pyspark_sa_train_data.csv')
	test.to_csv('pyspark_sa_test_data.csv')

In [0]:
main()
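The shuffle-and-split logic can be checked on a small toy DataFrame before running it on the 1.6 million tweets. This sketch uses one seeded numpy RandomState for both the shuffle and the mask, so repeated calls give the same split; the function name and toy data are hypothetical.

```python
import numpy as np
import pandas as pd

def shuffle_and_split(df, train_frac=0.99, seed=777):
    """Shuffle df, then split it roughly train_frac : (1 - train_frac) with a random mask."""
    rng = np.random.RandomState(seed)  # single seeded generator for shuffle and split
    shuffled = df.sample(frac=1, random_state=rng).reset_index(drop=True)
    msk = rng.rand(len(shuffled)) < train_frac
    train = shuffled[msk].reset_index(drop=True)
    test = shuffled[~msk].reset_index(drop=True)
    return train, test

# Toy data shaped like the tweet table (sentiment 0 = negative, 4 = positive).
toy = pd.DataFrame({"sentiment": [0, 4] * 500,
                    "text": ["tweet {}".format(i) for i in range(1000)]})
train, test = shuffle_and_split(toy)
print(len(train) + len(test))  # 1000
```

Because the mask is random, the test set is only approximately 1% of the rows; every row lands in exactly one of the two sets.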

#### Upload pyspark_sa_train_data.csv and pyspark_sa_test_data.csv to your bucket and verify the upload.

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img6.png?raw=true)
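The upload and the check can also be scripted with the google-cloud-storage client library (assumed installed and authenticated). The pyspark_nlp/data/ prefix is an assumption chosen to sit inside the pyspark_nlp folder used later; whatever prefix you choose must match the input directory argument given to the Spark job.

```python
import os

def destination_names(local_files, prefix="pyspark_nlp/data/"):
    """Object names the local files will get inside the bucket."""
    return [prefix + os.path.basename(path) for path in local_files]

def upload_and_verify(bucket_name, local_files, prefix="pyspark_nlp/data/"):
    """Upload the files, then list everything stored under prefix (requires google-cloud-storage)."""
    from google.cloud import storage  # imported lazily so destination_names works without it
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    for local_path, blob_name in zip(local_files, destination_names(local_files, prefix)):
        bucket.blob(blob_name).upload_from_filename(local_path)
    return sorted(blob.name for blob in client.list_blobs(bucket_name, prefix=prefix))
```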

#### Submitting the Spark job. Finally, we are ready to run the training on Google Cloud Dataproc. Save the code below as pyspark_sa.py, upload it to your bucket, and submit it to your cluster as a PySpark job.

In [0]:
#import libraries
import sys
import pyspark as ps
import warnings
import re
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, NGram, CountVectorizer, IDF, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import PipelineModel

# retrieve command line arguments and store them as variables
inputdir = sys.argv[1]
outputfile = sys.argv[2]
modeldir = sys.argv[3]

#define regex pattern for preprocessing
pat1 = r'@[A-Za-z0-9_]+'
pat2 = r'https?://[^ ]+'
combined_pat = r'|'.join((pat1,pat2))
www_pat = r'www\.[^ ]+'
negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')

# preprocessing for
# first_process: to remove Twitter handle and URL
# second_process: to remove URL pattern starting with www.
# third_process: to lower characters
# fourth_process: to replace contracted negation with proper forms
# result: remove numbers and special characters
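def pre_processing(column):
    # guard against empty (null) tweets, which would make re.sub raise a TypeError
    if column is None:
        return ''
    first_process = re.sub(combined_pat, '', column)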
    second_process = re.sub(www_pat, '', first_process)
    third_process = second_process.lower()
    fourth_process = neg_pattern.sub(lambda x: negations_dic[x.group()], third_process)
    result = re.sub(r'[^A-Za-z ]','',fourth_process)
    return result.strip()

# build a pipeline in the following order
# tokenizer: split a tweet into individual words
# ngrams: create n-gram representations of the words; here unigrams, bigrams and trigrams (n = 1 to 3)
# cv: turn each n-gram representation into a sparse vector of token counts, with a vocabulary size of 5,460
# idf: calculate inverse document frequency from the result of the previous step
#      to diminish the weight of terms that occur very frequently in the document set
#      and increase the weight of terms that occur rarely.
# assembler: combine the results of the previous step into a single feature vector
# label_stringIdx: encode target labels to a column of label indices.
#                  The indices are ordered by label frequencies, so the most frequent label gets index 0.
# lr: fit logistic regression with 'features' and 'label'
def build_pipeline():
	tokenizer = [Tokenizer(inputCol='tweet',outputCol='words')]
	ngrams = [NGram(n=i, inputCol='words', outputCol='{0}_grams'.format(i)) for i in range(1,4)]
	cv = [CountVectorizer(vocabSize=5460, inputCol='{0}_grams'.format(i), outputCol='{0}_tf'.format(i)) for i in range(1,4)]
	idf = [IDF(inputCol='{0}_tf'.format(i), outputCol='{0}_tfidf'.format(i), minDocFreq=5) for i in range(1,4)]
	assembler = [VectorAssembler(inputCols=['{0}_tfidf'.format(i) for i in range(1,4)], outputCol='features')]
	label_stringIdx = [StringIndexer(inputCol='sentiment', outputCol='label')]
	lr = [LogisticRegression(maxIter=100)]
	pipeline = Pipeline(stages=tokenizer+ngrams+cv+idf+assembler+label_stringIdx+lr)
	return pipeline

# the main function below can be used either for the first training run or for getting predictions with a loaded model
# first retrieve the data
# apply pre-processing by wrapping the pre_processing function defined above in a user-defined function (UDF)
# either build the pipeline with the build_pipeline function above and train it, or use a loaded pipeline model
# make predictions on the test set
# return the pipeline model, the Spark DataFrame of predictions, and the prediction accuracy on the test set
def main(sqlc,input_dir,loaded_model=None):
	print('retrieving data from {}'.format(input_dir))
	# read through the sqlc argument; the file names match the CSV files created in the data-preparation step
	if not loaded_model:
		train_set = sqlc.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(input_dir+'pyspark_sa_train_data.csv')
	test_set = sqlc.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(input_dir+'pyspark_sa_test_data.csv')
	print('preprocessing data...')
	reg_replaceUdf = f.udf(pre_processing, t.StringType())
	if not loaded_model:
		train_set = train_set.withColumn('tweet', reg_replaceUdf(f.col('text')))
	test_set = test_set.withColumn('tweet', reg_replaceUdf(f.col('text')))
	if not loaded_model:
		pipeline = build_pipeline()
		print('training...')
		model = pipeline.fit(train_set)
	else:
		model = loaded_model
	print('making predictions on test data...')
	predictions = model.transform(test_set)
	accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test_set.count())
	return model, predictions, accuracy


if __name__=="__main__":
	# reuse the active SparkContext if one already exists, otherwise create a new one
	sc = ps.SparkContext.getOrCreate()
	sc.setLogLevel("ERROR")
	sqlContext = ps.sql.SQLContext(sc)
	print('Using SparkContext (created or reused)')
	# build pipeline, fit the model and retrieve the outputs by running main() function
	pipelineFit, predictions, accuracy = main(sqlContext,inputdir)
	print('predictions finished!')
	print('accuracy on test data is {}'.format(accuracy))
	# select the original target label 'sentiment', 'text', the 'label' created by label_stringIdx in the pipeline,
	# and the model predictions. Save them as a single CSV file to the destination given by the second command line argument
	print('saving predictions to {}'.format(outputfile))
	predictions.select(predictions['sentiment'],predictions['text'],predictions['label'],predictions['prediction']).coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv(outputfile)
	# save the trained model to destination specified by the third command line argument
	print('saving model to {}'.format(modeldir))
	pipelineFit.save(modeldir)
	# load the saved model and make predictions on the same test set again
	# to check that the model was saved properly
	loadedModel = PipelineModel.load(modeldir)
	_, _, loaded_accuracy = main(sqlContext,inputdir,loadedModel)
	print('accuracy with saved model on test data is {}'.format(loaded_accuracy))
	sc.stop()
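To see what the pre_processing UDF does to a single tweet, you can run a local copy of it in plain Python. The sketch below reproduces the script's patterns (with the dot in www_pat escaped and the null guard); the sample tweet is made up.

```python
import re

# Same preprocessing patterns as in pyspark_sa.py.
pat1 = r'@[A-Za-z0-9_]+'          # Twitter handles
pat2 = r'https?://[^ ]+'          # http(s) URLs
combined_pat = r'|'.join((pat1, pat2))
www_pat = r'www\.[^ ]+'           # URLs starting with www.
negations_dic = {"isn't": "is not", "aren't": "are not", "wasn't": "was not", "weren't": "were not",
                 "haven't": "have not", "hasn't": "has not", "hadn't": "had not", "won't": "will not",
                 "wouldn't": "would not", "don't": "do not", "doesn't": "does not", "didn't": "did not",
                 "can't": "can not", "couldn't": "could not", "shouldn't": "should not",
                 "mightn't": "might not", "mustn't": "must not"}
neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')

def pre_processing(column):
    if column is None:                                     # null tweet -> empty string
        return ''
    first_process = re.sub(combined_pat, '', column)       # drop handles and http(s) URLs
    second_process = re.sub(www_pat, '', first_process)    # drop www. URLs
    third_process = second_process.lower()                 # lowercase
    fourth_process = neg_pattern.sub(lambda x: negations_dic[x.group()], third_process)  # expand negations
    result = re.sub(r'[^A-Za-z ]', '', fourth_process)     # drop digits and punctuation
    return result.strip()

sample = "@user_1 I can't wait for www.example.com 2 days!! http://t.co/abc"
print(pre_processing(sample).split())  # ['i', 'can', 'not', 'wait', 'for', 'days']
```

Note that removing tokens can leave runs of spaces inside the string; they are harmless here because the words are split on whitespace afterwards.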

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img8.png?raw=true)

In the screenshot above, replace the blurred parts of the text with your own project ID, then click “Submit” at the bottom. You can inspect the job's output by clicking into the job. The job takes about 15 minutes to finish.

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img9.png?raw=true)
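The script reads three positional arguments (sys.argv[1] to sys.argv[3]: input directory, output location, model directory), which gcloud passes on after a bare `--`. In this sketch the bucket, cluster and region are hypothetical, and the assumed layout puts the script and data under pyspark_nlp/ so that the “result” and “model” directories appear there. The input directory must end with "/" because the script appends the CSV file names to it directly.

```python
import subprocess

def submit_sentiment_command(bucket, cluster_name, region):
    """Build the gcloud command for pyspark_sa.py with its three command line arguments."""
    base = "gs://{}/pyspark_nlp".format(bucket)
    return ["gcloud", "dataproc", "jobs", "submit", "pyspark", base + "/pyspark_sa.py",
            "--cluster", cluster_name,
            "--region", region,
            "--",                # everything after this is passed to the script as sys.argv[1:]
            base + "/data/",     # sys.argv[1]: input directory (trailing slash required)
            base + "/result",    # sys.argv[2]: where the predictions CSV is written
            base + "/model"]     # sys.argv[3]: where the fitted PipelineModel is saved

def submit_sentiment(bucket, cluster_name, region):
    """Submit the job and wait for it to finish."""
    subprocess.check_call(submit_sentiment_command(bucket, cluster_name, region))
```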

#### Checking the Results. Go to your bucket, then open the pyspark_nlp folder. You will see that the results of the Spark job above have been saved into the “result” directory (the prediction data frame) and the “model” directory (the fitted pipeline model).

![alt text](https://github.com/msaadsadiq/BigDataCourse/blob/master/Assgn3_img10.png?raw=true)
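Once the CSV part file from the “result” directory has been downloaded (for example with gsutil cp), its label and prediction columns are enough to recompute the accuracy and see where the model goes wrong. This pandas sketch uses a tiny made-up table in place of the real file; the function name is hypothetical.

```python
import pandas as pd

def summarize_predictions(df):
    """Accuracy and confusion matrix from a DataFrame with 'label' and 'prediction' columns."""
    accuracy = (df["label"] == df["prediction"]).mean()
    confusion = pd.crosstab(df["label"], df["prediction"],
                            rownames=["label"], colnames=["prediction"])
    return accuracy, confusion

# Made-up stand-in for pd.read_csv("<downloaded part file>.csv").
toy = pd.DataFrame({"label": [0.0, 0.0, 1.0, 1.0],
                    "prediction": [0.0, 1.0, 1.0, 1.0]})
acc, cm = summarize_predictions(toy)
print(acc)  # 0.75
```

The accuracy computed this way should agree with the "accuracy on test data" line printed in the job output.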

#### Reference
https://towardsdatascience.com/step-by-step-tutorial-pyspark-sentiment-analysis-on-google-dataproc-fef9bef46468

### Question 1. 

1. Download 10,000 tweets about any topic you like (in English; it must be a divisive topic, i.e. one on which people hold more than one view)
2. Save them as CSV and upload the tweets to your storage bucket
3. Using PySpark, run the sentiment analysis on those tweets with the model you trained earlier

A script for downloading tweets to a CSV file: https://gist.github.com/yanofsky/5436496

##### Twitter Datasets

https://data.world/datasets/twitter

https://www.kaggle.com/crowdflower/twitter-airline-sentiment

https://www.kaggle.com/crowdflower/first-gop-debate-twitter-sentiment

https://www.kaggle.com/shashank1558/preprocessed-twitter-tweets

https://www.kaggle.com/kazanova/sentiment140

### Question 2. 

1. Perform word count on your tweets
2. Perform a word count on the different stances