In [None]:
#imports.... Run this each time after restarting the Kernel
!pip install watson_developer_cloud
# Run the pip installs only the first time; once the packages are installed on your Spark machine there is no need to re-run them unless you want to upgrade.
!pip install --upgrade --force-reinstall wordcloud
!pip install --user --upgrade pixiedust

In [None]:
import watson_developer_cloud as watson
from botocore.client import Config
import ibm_boto3

from urllib.request import urlopen 
import requests
import json
import io
from os.path import join, dirname

from watson_developer_cloud import SpeechToTextV1, NaturalLanguageUnderstandingV1, NaturalLanguageClassifierV1, ToneAnalyzerV3
from watson_developer_cloud.natural_language_understanding.features import (
    v1 as Features)


import pixiedust
from pixiedust.display import *

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import matplotlib.pyplot as plt

from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [None]:
# The code was removed by DSX for sharing.

In [None]:
# The code was removed by DSX for sharing.

In [None]:
def set_up_object_storage(credentials_object_storage):
    """Create an ``ibm_boto3`` S3 client for IBM Cloud Object Storage.

    Args:
        credentials_object_storage: Mapping that provides the keys
            ``'endpoints'`` (URL of the endpoint-discovery document),
            ``'apikey'`` and ``'resource_instance_id'``.

    Returns:
        The client object returned by ``ibm_boto3.client('s3', ...)``.

    Raises:
        requests.exceptions.RequestException: If the endpoint document cannot
            be fetched (network failure, timeout or HTTP error status).
        KeyError: If the credentials or the endpoint document lack an
            expected key.
    """
    # Fail fast on an HTTP error or a hung connection instead of trying to
    # parse an error page as the endpoint document.
    response = requests.get(credentials_object_storage['endpoints'], timeout=30)
    response.raise_for_status()
    endpoints = response.json()

    iam_host = endpoints['identity-endpoints']['iam-token']
    # The public US cross-region ("us-geo") endpoint is the one used here.
    cos_host = endpoints['service-endpoints']['cross-region']['us']['public']['us-geo']

    auth_endpoint = "https://" + iam_host + "/oidc/token"
    service_endpoint = "https://" + cos_host

    return ibm_boto3.client(
        's3',
        ibm_api_key_id=credentials_object_storage['apikey'],
        ibm_service_instance_id=credentials_object_storage['resource_instance_id'],
        ibm_auth_endpoint=auth_endpoint,
        config=Config(signature_version='oauth'),
        endpoint_url=service_endpoint,
    )

# Module-level Cloud Object Storage client; the helper functions in the cells
# below read it as a global.
# NOTE(review): `credentials_os` is not defined in the visible cells —
# presumably it comes from one of the credential cells removed by DSX; confirm.
client = set_up_object_storage(credentials_os)

In [None]:
# Set up Watson STT
# Module-level Speech to Text client used by get_transcript() below.
# NOTE(review): `credentials_stt` is not defined in the visible cells —
# presumably it comes from a credential cell removed by DSX; confirm.

speech_to_text = SpeechToTextV1(
    username = credentials_stt['username'],
    password = credentials_stt['password'],
    url = 'https://stream.watsonplatform.net/speech-to-text/api',
)

def get_transcript(audio):
    """Run ``audio`` through the module-level Speech to Text client.

    The audio is submitted as ``audio/ogg`` with word timestamps and word
    confidences requested.

    Args:
        audio: The audio payload handed to ``speech_to_text.recognize``.

    Returns:
        The recognition response serialized as a JSON string (indent of 2).
    """
    recognition = speech_to_text.recognize(
        audio=audio,
        content_type='audio/ogg',
        timestamps=True,
        word_confidence=True,
    )
    return json.dumps(recognition, indent=2)

def download_file(path, filename, timeout=60):
    """Download ``path + filename`` over HTTP and return the response body.

    Args:
        path: URL prefix; it is concatenated with ``filename`` as-is, so it
            must already end with the separator (e.g. a trailing ``/``).
        filename: Name appended to ``path`` to form the full URL.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``
            (new optional parameter; existing two-argument calls still work).

    Returns:
        The raw response body as bytes.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout, or
            an HTTP error status. Previously an error page was returned as if
            it were the file content.
    """
    url = path + filename
    print(url)
    # The whole body is read right away, so streaming mode bought nothing.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.content

def analyze_sample(sample):
    """Fetch the object ``sample`` from the bucket and transcribe it.

    Args:
        sample: Object key of the audio file inside ``credentials_os['BUCKET']``.

    Returns:
        Whatever ``get_transcript`` returns for the object's bytes.
    """
    stored_object = client.get_object(Bucket=credentials_os['BUCKET'], Key=sample)
    audio_bytes = stored_object['Body'].read()
    # Uploading the transcript as '<name>_text.json' is intentionally left out:
    # per the original note it was already done for demo purposes.
    return get_transcript(audio_bytes)

def visualize(transcript):
    """Print the first alternative of every result in a transcript.

    Args:
        transcript: JSON string with a top-level ``'results'`` list; each
            entry's first ``'alternatives'`` item must carry ``'transcript'``
            and ``'confidence'``.
    """
    for entry in json.loads(transcript)['results']:
        best_guess = entry['alternatives'][0]
        print(best_guess['transcript'], best_guess['confidence'])


In [None]:
# Setup Watson NLU
# Features requested from NLU; every feature maps to an empty options dict.
features = {"concepts":{},"entities":{},"keywords":{},"categories":{},"emotion":{},"sentiment":{},"semantic_roles":{} }

# Module-level NLU client used by analyze_transcript_chunks() below.
# NOTE(review): `credentials_nlu` is not defined in the visible cells —
# presumably it comes from a credential cell removed by DSX; confirm.
natural_language_understanding = NaturalLanguageUnderstandingV1(
    version = '2017-02-27',
    username = credentials_nlu['username'],
    password = credentials_nlu['password']
)

chunk_size=25 # Number of words per chunk when a transcript is disaggregated
# e.g. with chunk_size=25 a 290-word transcript yields 12 chunks - 11 with 25 words and 1 with 15 words - approximates 'time domain' for this lab

def chunk_transcript(transcript, chunk_size):
    """Split a transcript into consecutive groups of at most ``chunk_size`` words.

    Words are separated on any run of whitespace, so repeated, leading or
    trailing spaces never produce empty-string "words" (the previous
    ``split(' ')`` counted them as words and returned ``[['']]`` for an empty
    transcript).

    Args:
        transcript: The transcript text.
        chunk_size: Maximum number of words per chunk; must be >= 1.

    Returns:
        A list of word lists. Every chunk except possibly the last holds
        exactly ``chunk_size`` words; an empty or whitespace-only transcript
        yields ``[]``.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer, got %r" % (chunk_size,))
    words = transcript.split()
    return [words[start:start + chunk_size] for start in range(0, len(words), chunk_size)]

        
def process_text_chunks(text):
    """Turn a Speech to Text JSON response into word chunks.

    The first alternative of every result is concatenated (no separator is
    inserted) and the combined text is split with ``chunk_transcript`` using
    the module-level ``chunk_size``.

    Args:
        text: JSON string with a top-level ``'results'`` list.

    Returns:
        The list of word chunks produced by ``chunk_transcript``.
    """
    results = json.loads(text)['results']
    full_transcript = ''.join(item['alternatives'][0]['transcript'] for item in results)
    return chunk_transcript(full_transcript, chunk_size)

def analyze_transcript_chunks(features, file_name):
    """Run NLU over every chunk of a stored transcript.

    The transcript is read from ``'<base>_text.json'`` in
    ``credentials_os['BUCKET']``, where ``<base>`` is the part of
    ``file_name`` before its first dot.

    Args:
        features: Feature specification passed to the NLU ``analyze`` call.
        file_name: Name of the original audio file.

    Returns:
        Dict mapping each chunk's text to the NLU result for that chunk.
        Chunks with identical text share one entry (the last result wins).
    """
    bucket = credentials_os['BUCKET']
    base_name = file_name.split('.')[0]
    stored_object = client.get_object(Bucket=bucket, Key=base_name + '_text.json')
    stt_json = stored_object['Body'].read().decode("utf-8")

    nlu_analysis = {}
    for words in process_text_chunks(stt_json):
        chunk_text = ' '.join(words)
        print('chunk: ', chunk_text)
        nlu_analysis[chunk_text] = natural_language_understanding.analyze(
            features, chunk_text, return_analyzed_text=True, language='en')

    outfilename = base_name + '_NLUchunks.json'
    print("writing file: ", outfilename, " to cloud object storage" )
    # The upload itself is not performed here: per the original note the
    # file was already stored for demo purposes.
    return nlu_analysis


def post_analysis_chunks(result):
    """Print the category labels and scores found for every chunk.

    Args:
        result: Dict mapping chunk text to an analysis dict that has a
            ``'categories'`` list of ``{'label': ..., 'score': ...}`` entries.
    """
    for chunk_text, chunk_analysis in result.items():
        found_categories = chunk_analysis['categories']
        print('\nchunk: ', chunk_text)
        for entry in found_categories:
            print('label: ', entry['label'], ', score: ', entry['score']) #add table instead of prints

In [None]:
# Analyze the first file's transcript chunk by chunk and print the categories.
# NOTE(review): `file_list` is not defined in the visible cells — presumably
# it is created in one of the cells removed by DSX; confirm.
result = analyze_transcript_chunks(features, file_list[0])
post_analysis_chunks(result)

In [None]:
# Setup Watson Natural Language Classifier
# Module-level NLC client used by classify() and classify_transcript() below.
# NOTE(review): `credentials_nlc` is not defined in the visible cells —
# presumably it comes from a credential cell removed by DSX; confirm.

natural_language_classifier = NaturalLanguageClassifierV1(
    username = credentials_nlc['username'],
    password = credentials_nlc['password'])

# Re-assigns the module-level chunk_size that the NLU cell also sets; the
# chunking helpers read this global at call time.
chunk_size = 25
# Used to split ("chunk") the aggregate transcript into smaller pieces

def process_text(text):
    """Turn a Speech to Text JSON response into word chunks.

    This used to be a line-for-line copy of ``process_text_chunks`` (defined
    in the NLU cell next to ``chunk_transcript``, which this function already
    needed). It now delegates to that function so the two cannot drift apart.

    Args:
        text: JSON string with a top-level ``'results'`` list.

    Returns:
        The list of word chunks produced by ``process_text_chunks`` for
        ``text``, which uses the module-level ``chunk_size``.
    """
    return process_text_chunks(text)

def classify(file_name):
    """Classify every chunk of a stored transcript with the NLC classifier.

    The transcript is read from ``'<base>_text.json'`` in
    ``credentials_os['BUCKET']``, where ``<base>`` is the part of
    ``file_name`` before its first dot.

    Args:
        file_name: Name of the original audio file.

    Returns:
        Dict mapping each chunk's text to the classifier result for it.
        Chunks with identical text share one entry (the last result wins).
    """
    bucket = credentials_os['BUCKET']
    transcript_key = file_name.split('.')[0] + '_text.json'
    stt_json = client.get_object(Bucket=bucket, Key=transcript_key)['Body'].read().decode("utf-8")

    analysis = {}
    for chunk_text in (' '.join(words) for words in process_text(stt_json)):
        analysis[chunk_text] = natural_language_classifier.classify(
            credentials_nlc['classifier_id'], chunk_text)
    # Storing the result as '<base>_nlc' is intentionally left out: per the
    # original note it was already done for demo purposes.
    return analysis


def classify_transcript(file_name):
    """Classify a transcript once the NLC classifier reports it is available.

    Args:
        file_name: Name of the original audio file, forwarded to ``classify``.

    Returns:
        The dict returned by ``classify(file_name)``.

    Raises:
        RuntimeError: If the classifier's status is anything other than
            ``'Available'``. This path used to fail with an
            ``UnboundLocalError`` because ``classes`` was never assigned.
    """
    classifier_id = credentials_nlc['classifier_id']
    status = natural_language_classifier.get_classifier(classifier_id)
    if status['status'] != 'Available':
        raise RuntimeError(
            "Classifier %r is not available (status: %r)" % (classifier_id, status['status']))
    return classify(file_name)

In [None]:
# Classify the chunks of the first file's transcript and dump the raw result.
# NOTE(review): `file_list` is not defined in the visible cells — presumably
# it is created in one of the cells removed by DSX; confirm.
analysis = classify_transcript(file_list[0])
print(analysis)

In [None]:
# Setup Watson Tone Analyzer
# Module-level Tone Analyzer client used by analyze_transcript() below.
# NOTE(review): `credentials_tone` is not defined in the visible cells —
# presumably it comes from a credential cell removed by DSX; confirm.

tone_analyzer = ToneAnalyzerV3(version = '2016-05-19',
                               username = credentials_tone['username'],
                               password = credentials_tone['password'])


# Re-assigns the module-level chunk_size (same value as in the earlier cells);
# process_text() reads it at call time.
chunk_size=25

def analyze_transcript(file_name):
    """Run the Tone Analyzer over every chunk of a stored transcript.

    The transcript is read from ``'<base>_text.json'`` in
    ``credentials_os['BUCKET']``, where ``<base>`` is the part of
    ``file_name`` before its first dot. Chunks of two words or fewer are
    skipped.

    Args:
        file_name: Name of the original audio file.

    Returns:
        Dict mapping each analyzed chunk's text to its tone result.
    """
    bucket = credentials_os['BUCKET']
    stored_object = client.get_object(Bucket=bucket, Key=file_name.split('.')[0] + '_text.json')
    stt_json = stored_object['Body'].read().decode("utf-8")

    tone_analysis = {}
    for words in process_text(stt_json):
        if len(words) <= 2:
            continue
        chunk_text = ' '.join(words)
        tone_analysis[chunk_text] = tone_analyzer.tone(chunk_text, content_type='text/plain')
    # Uploading the result as '<base>_tone.json' is disabled in this notebook.
    return tone_analysis

def print_tones(tones):
    """Print each entry of ``tones`` on its own line.

    Args:
        tones: Iterable of tone entries; each one is printed as-is.
    """
    for entry in tones:
        print(entry)

def post_analysis(result):
    """Print the tones of every tone category for each analyzed chunk.

    Args:
        result: Dict mapping chunk text to a tone result that has
            ``['document_tone']['tone_categories']``, a list of entries each
            carrying a ``'tones'`` list.
    """
    for chunk_text, chunk_tone in result.items():
        categories = chunk_tone['document_tone']['tone_categories']
        print('\nchunk: ', chunk_text)
        for category in categories:
            print_tones(category['tones'])

In [None]:
result = analyze_transcript(file_list[0])
post_analysis(result) 

In [None]:
# Method to parse NLU response file from Cloud Object Storage
# and return sentiment score, sentiment label, and keywords
# This method works for the scenario of one NLU call per call (file)
def getNLUresponse(COSclient, bucket, files):
    """Collect document sentiment and keywords from stored NLU responses.

    For every name in ``files`` the object ``'<base>_NLU.json'`` (``<base>``
    is the part of the name before its first dot) is read from ``bucket``
    and parsed as JSON.

    Args:
        COSclient: Object storage client exposing ``get_object(Bucket=, Key=)``.
        bucket: Name of the bucket that holds the NLU response files.
        files: Iterable of original file names.

    Returns:
        A list with one ``(filename, sentiment_score, sentiment_label,
        keywords)`` tuple per file. When the response has no document
        sentiment label the tuple is ``(filename, 0.0, None, None)``.
    """
    nlu_results = []
    for filename in files:
        # Extract NLU enriched filename from the original file name
        nlu_filename = filename.split('.')[0]+'_NLU.json'
        print("Processing NLU response from file: ", nlu_filename)
        streaming_body = COSclient.get_object(Bucket=bucket, Key=nlu_filename)['Body']
        nlu_response = json.loads(streaming_body.read().decode("utf-8"))

        # Walk the nested structure with .get() so that a response lacking
        # 'sentiment' or 'document' reaches the fallback instead of raising
        # KeyError.
        document = ((nlu_response or {}).get('sentiment') or {}).get('document') or {}
        if document.get('label'):
            sentiment_score = document.get('score', 0.0)
            sentiment_label = document['label']
            keywords = list(nlu_response.get('keywords') or [])
        else:
            sentiment_score = 0.0
            sentiment_label = None
            # Was `null`, which is not a Python name and raised NameError.
            keywords = None
        nlu_results.append((filename, sentiment_score, sentiment_label, keywords))
    return nlu_results

In [None]:
# Method to extract the sentiment score, sentiment label, and keywords from a single NLU response (the analysis of one transcript chunk)
def getChunkNLU(nlu_response):
    """Extract document sentiment and keywords from one NLU response.

    Args:
        nlu_response: Parsed NLU response (a dict) for a single chunk; may be
            empty or ``None``.

    Returns:
        A ``(sentiment_score, sentiment_label, keywords)`` tuple. When the
        response has no document sentiment label the result is
        ``(0.0, None, None)``.
    """
    # Walk the nested structure with .get() so that a response lacking
    # 'sentiment' or 'document' reaches the fallback instead of raising
    # KeyError.
    document = ((nlu_response or {}).get('sentiment') or {}).get('document') or {}
    if not document.get('label'):
        # The fallback keywords value was `null`, which is not a Python name
        # and raised NameError.
        return 0.0, None, None

    sentiment_score = document.get('score', 0.0)
    sentiment_label = document['label']
    keywords = list(nlu_response.get('keywords') or [])
    return sentiment_score, sentiment_label, keywords

# Method to parse NLU response file from Cloud Object Storage
# and return sentiment score, sentiment label, and keywords
# This method handles the scenario when call is broken into multiple chunks
def getNLUresponseChunks(COSclient, bucket, files):
    """Collect per-chunk sentiment and keywords from stored NLU responses.

    For every name in ``files`` the object ``'<base>_NLUchunks.json'``
    (``<base>`` is the part of the name before its first dot) is read from
    ``bucket``; each entry of the decoded JSON is handed to ``getChunkNLU``.

    Args:
        COSclient: Object storage client exposing ``get_object(Bucket=, Key=)``.
        bucket: Name of the bucket that holds the chunked NLU response files.
        files: Iterable of original file names.

    Returns:
        A list of ``[filename, chunkidx, <values returned by getChunkNLU>...]``
        rows, where ``chunkidx`` counts from 1 within each file. Files whose
        response is empty contribute no rows.
    """
    rows = []
    print("files: ", files)
    for filename in files:
        # Derive the name of the chunked NLU file from the original file name
        nlu_filename = filename.split('.')[0] + '_NLUchunks.json'
        print("Processing NLU response from file: ", nlu_filename)
        raw_body = COSclient.get_object(Bucket=bucket, Key=nlu_filename)['Body']
        chunk_responses = json.loads(raw_body.read().decode("utf-8"))
        if not chunk_responses:
            continue
        for chunkidx, chunk_key in enumerate(chunk_responses, start=1):
            chunk_nlu = getChunkNLU(chunk_responses[chunk_key])
            print('chunk nlu: ', chunk_nlu)
            print('type of chunk nlu: ', type(chunk_nlu))
            rows.append([filename, chunkidx, *chunk_nlu])

    return rows

In [None]:
## Alternative call to handle the case when the NLU response has been broken into chunks of 25 words each
# Column names for the rows returned by getNLUresponseChunks: each row is
# [filename, chunkidx] followed by the three values returned by getChunkNLU.
# NOTE(review): `file_list` is not defined in the visible cells — presumably
# it is created in one of the cells removed by DSX; confirm.
nlu_header=['filename','chunkidx','sentiment_score','sentiment_label','keywords']
nlu_results = getNLUresponseChunks(client, credentials_os['BUCKET'], file_list)

In [None]:
# Build a Spark DataFrame from the per-chunk NLU rows, using nlu_header as the
# column names.
# NOTE(review): `spark` is not defined in the visible cells — presumably the
# SparkSession provided by the notebook runtime; confirm.
callcenterlogs_nluDF = spark.createDataFrame(nlu_results, nlu_header)

### Sentiment plots using PixieDust
Leverage PixieDust to plot the sentiment labels as a pie chart showing how many positive, negative, and neutral results were received (the DataFrame built above holds one row per 25-word chunk of each call).

In [None]:
## Ignore any records with null sentiment label
# The filter re-binds callcenterlogs_nluDF to the filtered frame; re-running
# this cell applies the same filter again, which leaves the rows unchanged.
# NOTE(review): the frame was built from getNLUresponseChunks, so it has one
# row per chunk; F.count('filename') therefore counts chunk rows per label,
# although the column is named 'num_calls' — confirm this is intended.
# NOTE(review): perlabel_sentimentDF is not used in the visible cells.
callcenterlogs_nluDF = callcenterlogs_nluDF.where(col('sentiment_label').isNotNull())
perlabel_sentimentDF = callcenterlogs_nluDF.groupBy('sentiment_label')\
                              .agg(F.count('filename')\
                              .alias('num_calls'))

In [None]:
# Call Pixiedust to visualize sentiment data
# NOTE(review): this displays the filtered per-chunk frame rather than the
# aggregated perlabel_sentimentDF — presumably the chart and aggregation are
# chosen in the PixieDust UI; confirm.
display(callcenterlogs_nluDF)