In [None]:
!pip install --upgrade watson-developer-cloud

In [None]:
import json
import time

## Load data from DB2 Warehouse on the Cloud to Spark Dataframe
Specify the credentials for your DB2 Warehouse on the Cloud instance and read the table data into a Spark DataFrame. To do so:

- Click the Data icon (top right)
- Choose the Connections tab
- Select "Insert SparkSession DataFrame"
   - Select the correct schema
   - Choose Table DSX_CLOUDANT_SINGER_TWEETS

This should copy the required code into the active notebook cell to access your DB2 Warehouse on the Cloud instance and read the table DSX_CLOUDANT_SINGER_TWEETS into a Spark DataFrame.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
# The code was removed by DSX for sharing.

## Data Exploration and Curation
Run some analysis and exploration of the data to verify that it looks as expected.

In [None]:
# copy data into brandTweetsDF dataframe for processing
brandTweetsDF = data_df_1

In [None]:
# Return top 2 rows of Spark DataFrame
brandTweetsDF.limit(2).toPandas()

In [None]:
# Print the schema of the loaded data
brandTweetsDF.printSchema()

In [None]:
## Drop unneeded columns
brandTweetsDF = brandTweetsDF.drop('_ID','_REV')

In [None]:
import datetime
from datetime import date
from dateutil import parser

def getDay(date):
    """Return the calendar day of a timestamp value.

    The value is converted to a string, parsed with dateutil's parser, and
    reduced to its date part (hour/minute/second are dropped).

    Parameters
    ----------
    date : any
        Value to parse; it is passed through str() before parsing.

    Returns
    -------
    datetime.date or None
        The parsed day, or None when the input is None.
    """
    # A missing value would otherwise be parsed as the literal string 'None'
    # and raise, so map it to a null day instead.
    if date is None:
        return None
    # No per-row print here: this function is registered as a UDF and is
    # called once for every record.
    return parser.parse(str(date)).date()

# Add a field for the day the tweet was created (ignoring hour/minute/second)
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

udfGetDay = udf(getDay, DateType())

brandTweetsDF = brandTweetsDF.withColumn('DAY',udfGetDay('CREATED_AT'))

# Verify added field is as expected
brandTweetsDF.select("DAY").limit(5).toPandas()

## Extract a Random Sample of Records
Next, we will extract a random sample of records to run NLU enrichment on. This is needed to make sure we don't exceed our limit of free NLU calls per day.

In [None]:
## Take a sample of the data
## Watson NLU allows 1000 free calls per day; target 500 records to stay under that limit
import random

num_records = brandTweetsDF.count()
sample_num_records = 500
# Clamp to 1.0 so a table smaller than the target does not yield a fraction
# above 1, and guard the empty table so the division cannot fail.
fraction = min(1.0, float(sample_num_records)/float(num_records)) if num_records else 0.0

# The seed is printed so a given sample can be reproduced later.
seed = random.randint(1, 100)
print('Number of records: ', num_records, ' Sample size: ', sample_num_records, ' Fraction: ', fraction, ' Seed: ', seed)
# sample() is fraction-based, so the number of rows returned is approximate.
brandTweetsSampleDF = brandTweetsDF.sample(False, fraction, seed)


## Alternative Stratified Sampling approach
## Returns RDD with length of 2, first col is the key (day) and second col is the original row for the key
## Take only the actual data (column 1)
## If you'd like to use this approach, uncomment the following 4 lines

#fractionList = brandTweetsDF.rdd.map(lambda x: x['DAY']).distinct().map(lambda x: (x,fraction)).collectAsMap()
#keybyday = brandTweetsDF.rdd.keyBy(lambda x: x['DAY'])
#brandTweetsDFrdd = keybyday.sampleByKey(False,fractionList).map(lambda x: x[1])
#brandTweetsSampleDF = spark.createDataFrame(brandTweetsDFrdd,brandTweetsDF.schema)


print('Number of records to send to NLU:', brandTweetsSampleDF.count())

In [None]:
# Count the sampled tweets for each day and display the result
from pyspark.sql import functions as F
brandTweetsSampleDFperDay = (
    brandTweetsSampleDF
    .groupBy('DAY')
    .agg(F.count('ID').alias('NUM_TWEETS_PER_DAY'))
)
brandTweetsSampleDFperDay.show()

## Map to a Pandas dataframe and enrich with Watson Natural Language Understanding (NLU)
Note that in order to call a REST API such as NLU on the sampled records, we need to map the Spark DataFrame to a Pandas DataFrame and then execute the NLU enrichment using the Pandas DataFrame. This effectively runs the enrichment code on the Spark driver (master) node only.

In [None]:
# Create a Pandas frame and augment with Sentiment analysis and Keywords using Watson NLU
brandTweetsSamplePandasDF = brandTweetsSampleDF.toPandas()

## Specify NLU Credentials
Next, you need to specify the credentials for your Watson Natural Language Understanding (NLU) service. If you don't have an NLU service, you can create one by following [these instructions](https://console.bluemix.net/docs/services/natural-language-understanding/getting-started.html#getting-started-tutorial) and obtaining the service credentials. You need to specify the URL, username, and password.

In [None]:
# Specify NLU credentials
# Each credential is taken from an environment variable when it is set, so
# real secrets do not have to be saved in the notebook. When the variable is
# not set, the placeholder is used and must be replaced by hand.
import os

credentials_json = {
    "nlu_url": os.environ.get("WATSON_NLU_URL", "YOUR_WATSON_NLU_URL"),
    "nlu_username": os.environ.get("WATSON_NLU_USERNAME", "YOUR WATSON NLU USERNAME"),
    "nlu_password": os.environ.get("WATSON_NLU_PASSWORD", "YOUR WATSON NLU PASSWORD"),
    "nlu_version": "2017-02-27",
}

## Watson NLU Enrichment Definition
In this cell, import the Watson Developer Cloud Python SDK, parse the NLU credentials, and define the function to enrich text with NLU.

In [None]:
import watson_developer_cloud
#import watson_developer_cloud.natural_language_understanding.features.v1 as features
from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, SentimentOptions, KeywordsOptions
from watson_developer_cloud import WatsonException

## Define credentials for NLU service
nlu_url = credentials_json['nlu_url']
nlu_username = credentials_json['nlu_username']
nlu_password = credentials_json['nlu_password']
nlu_version = credentials_json['nlu_version']
# Pass the configured URL as well; otherwise nlu_url is read but never used
# and the client falls back to whatever default endpoint the SDK ships with.
nlu = watson_developer_cloud.NaturalLanguageUnderstandingV1(version = nlu_version,
                                                            url = nlu_url,
                                                            username = nlu_username,
                                                            password = nlu_password)

## Send text to NLU and extract Sentiment and Keywords
def enrichNLU(text):
    """Enrich one piece of text with Watson NLU sentiment and keywords.

    Parameters
    ----------
    text : str
        The text to analyze.

    Returns
    -------
    tuple
        (sentiment, sentiment_label, keywords): the document sentiment score,
        the document sentiment label, and the list of keyword entries taken
        from the NLU response. When the text is missing or blank, or when the
        service call raises WatsonException, (0.0, None, None) is returned.
    """
    # Missing or blank text cannot be analyzed; return the same defaults as a
    # failed call without spending one of the limited API calls on it.
    if text is None or not text.strip():
        return 0.0, None, None

    try:
        result = nlu.analyze(text = text, features = Features(sentiment=SentimentOptions(),keywords=KeywordsOptions()))
        sentiment = result['sentiment']['document']['score']
        sentiment_label = result['sentiment']['document']['label']
        keywords = list(result['keywords'])
    except WatsonException:
        # Best effort: one failed record must not abort the whole enrichment
        # run, so fall back to neutral defaults for this text.
        sentiment = 0.0
        sentiment_label = None
        keywords = None
    return sentiment, sentiment_label, keywords

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField

# Start from the schema of the sampled tweets
schema = brandTweetsSampleDF.schema

# Structure of a single keyword entry: a relevance score and the keyword text
schema1 = (
    StructType()
    .add("relevance", FloatType(), True)
    .add("text", StringType(), True)
)

# Separate copy of the keyword structure, rebuilt from its JSON representation
keywordschema = StructType.fromJson(schema1.jsonValue())

# Columns produced by the NLU enrichment
added_fields = [
    StructField("SENTIMENT", FloatType(), True),
    StructField("SENTIMENT_LABEL", StringType(), True),
    StructField("KEYWORDS", ArrayType(keywordschema), True),
]

# Original tweet columns followed by the enrichment columns
newfields = StructType(schema.fields + added_fields)

In [None]:
import time
start_time = time.time()
## This calls the enrichNLU function which accesses the Watson NLU API
## (one call per row of the sampled frame; the three results are unzipped into columns)
brandTweetsSamplePandasDF['SENTIMENT'],brandTweetsSamplePandasDF['SENTIMENT_LABEL'],\
brandTweetsSamplePandasDF['KEYWORDS'] = zip(*brandTweetsSamplePandasDF['TEXT_CLEAN'].map(enrichNLU))
# Show only the first rows; printing the whole frame floods the cell output
print(brandTweetsSamplePandasDF.head())
print('total run time: ', time.time() - start_time)

In [None]:
# Call count() to get the number of non-null values per column; the bare
# attribute only displays the bound method object.
brandTweetsSamplePandasDF.count()

In [None]:
a = u'bats\u00E0'
print(a)

In [None]:
# Scratch check: run cleanText on a hand-written sample tweet and print the result.
# The active samples are raw strings (r"..."), so sequences such as \u2026 and
# \U0001f36f stay as literal backslash text rather than being decoded.
# NOTE(review): cleanText is not defined in the visible part of this notebook —
# confirm it is defined before this cell runs.
#v="RT : Retweet to vote for our boy!\n\nMy  vote for  is \u201cDespacito\u201d by   &amp;\u2026 "
# This first v2 value is overwritten by the second assignment below before it is used.
v2=r"RT : Retweet to vote for \U0001f36 our boy!\n\nMy  vote for  is \u201cDespacito\u201d by   &amp;\u2026 "
#v2 = "\u2026"
#v2="u'RT : "Despacito dominates  &amp; @JustinBieber has week\'s three top-selling songs  https://t.co/UeqNbm\u2026'"
v2= r"RT : Despacito dominates  &amp; @JustinBieber \U0001f36f has week\'s three top-selling u'\u2026' songs  https://t.co/UeqNbm \u2026 &lt; // yes"
#v2=u'\U0001f36f'
#v2=r'\u2026'
# Report whether the sample is a text string ("yes") or a bytes object ("no")
print(type(v2))
if isinstance(v2, str):
    print("yes")
elif isinstance(v2, bytes):
    print("no")
v1=cleanText(v2)

print('v1: ', v1)

In [None]:
# Print every sampled tweet next to its cleaned form.
# Uses print() calls: the former print statements are a SyntaxError on Python 3.
# NOTE(review): cleanText is not defined in the visible part of this notebook —
# confirm it is defined before this cell runs.
i=0
for row in brandTweetsSampleDF.rdd.collect():
    text = row['TEXT_CLEAN']
    print("i: ", i)
    i = i + 1
    print("text: ", text)
    tclean = cleanText(text)
    print("clean text: ", tclean)

In [None]:
brandTweetsCleanDF.head(10)

In [None]:
brandTweetsCleanDF.count()

In [None]:
#brandTweetsSampleDF.selectExpr("TEXT_CLEAN as text").createOrReplaceTempView("brandtweetstbl")
#bd = brandTweetsSampleDF.selectExpr("TEXT_CLEAN as text")
bd = brandTweetsCleanDF.selectExpr("textnew as text")

In [None]:
bd.head(2)

In [None]:
from pyspark.sql.functions import lit
#bd = brandTweetsSampleDF.withColumn('features',lit('keywords,sentiment'))
bd = bd.withColumn('features',lit('keywords,sentiment'))
bd.head(2)

In [None]:
from pyspark.sql.functions import lit
#bd = brandTweetsSampleDF.withColumn('features',lit('keywords,sentiment'))
bd = bd.withColumn('features',lit('{"keywords":{},"sentiment":{}}'))
bd.head(2)

In [None]:
bd.selectExpr("text","features").createOrReplaceTempView("bdtbl")


In [None]:
# Build the analyze endpoint from the configured service URL and API version so
# it stays in sync with the credentials instead of repeating them as literals.
# NOTE(review): assumes nlu_url is the service base URL (ending in ".../api") — confirm.
nlu_uri = nlu_url.rstrip('/') + "/v1/analyze?version=" + nlu_version

In [None]:
# Options passed to the REST data source reader via .options(**nluprms):
# target URL, name of the input temp view ('bdtbl'), HTTP method, basic-auth
# credentials, and call/partition/timeout settings (all given as strings).
# NOTE(review): the active configuration uses GET while the 'features' column
# set on bd is a JSON object string; the commented-out variant below uses POST
# with longer timeouts — confirm which pairing the data source expects.
nluprms = {'url' : nlu_uri, 'input' : 'bdtbl', 'method' : 'GET', 'userId':nlu_username, 'userPassword':nlu_password, 'callStrictlyOnce': 'Y', 'partitions': '10', 'connectionTimeout':'2000', 'readTimeout':'10000'}
#nluprms = {'url' : nlu_uri, 'input' : 'bdtbl', 'method' : 'POST', 'userId':nlu_username, 'userPassword':nlu_password, 'callStrictlyOnce': 'Y', 'partitions': '10', 'connectionTimeout':'3000', 'readTimeout':'15000'}

In [None]:
# Load the NLU results through the REST data source and report the wall-clock time taken
start_time = time.time()
brandtweetsNLUDF = (
    spark.read
    .format("org.apache.dsext.spark.datasource.rest.RestDataSource")
    .options(**nluprms)
    .load()
)
print(brandtweetsNLUDF)
print("total run time for REST data source: ", time.time() - start_time)

In [None]:
brandtweetsNLUDF.printSchema()

In [None]:
brandtweetsNLUDF.head(5)

In [None]:
brandtweetsNLUDF.count()

In [None]:
from pyspark.sql.functions import col
brandtweetsNLUDF.where(col("_corrupt_record").isNotNull()).count()

In [None]:
tmpbrandtweetsNLUDF = brandtweetsNLUDF.where(col("_corrupt_record").isNotNull())
tmpbrandtweetsNLUDF.head(2)

In [None]:
brandtweetsNLUDF.where(col("_corrupt_record").isNull()).count()

In [None]:
# print() call: the former print statement is a SyntaxError on Python 3
print(nlu_uri)

In [None]:
brandtweetsNLUDF.limit(5).toPandas()

In [None]:
# Use select() so d1 is a DataFrame. Indexing with a column name returns a
# Column, which cannot be chained with .limit(5).toPandas().
d1 = brandtweetsNLUDF.select('output')

In [None]:
d1.limit(5).toPandas()

In [None]:
# Spark execution model
# https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
