In [1]:
import os
import os.path
from os import path

from dotenv import load_dotenv, find_dotenv
import json
import csv

import requests
import time

from ibm_watson import AssistantV2
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator

from snips_nlu import SnipsNLUEngine
from snips_nlu.default_configs import CONFIG_EN

from numpy import exp
import collections

In [2]:
# constants
# VERSION selects the dataset/result revision: get_params embeds it in paths as 'v<VERSION>'.
VERSION = 8
# Number of dataset splits; the evaluation loop runs splits 1..NUM_SPLITS for every NLU.
NUM_SPLITS = 10

# lists
# NLU back-ends to evaluate, in the order the evaluation loop visits them.
NLU_NAMES = ['watson', 'luis', 'snips']

# Load environment variables from the located .env file; the API keys, endpoints
# and per-split IDs used below are read through os.getenv.
load_dotenv(find_dotenv())

True

## Train / load NLU models

### Snips.ai

In [3]:
def read_json_file(filename):
    """Load and return the JSON document stored in ``filename``.

    The file is opened in binary mode so that ``json.load`` detects the
    encoding itself (UTF-8, UTF-16 or UTF-32, with or without BOM) instead of
    relying on the platform's default text encoding.

    Args:
        filename: path of the JSON file to read.

    Returns:
        The decoded JSON content (dict, list, ...).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(filename, 'rb') as f:
        return json.load(f)

In [4]:
def train_snips(train_file):
    """Fit a new Snips NLU engine (English config) on the dataset in ``train_file``.

    Args:
        train_file: path of the JSON training dataset, loaded with read_json_file.

    Returns:
        The fitted SnipsNLUEngine instance.
    """
    training_data = read_json_file(train_file)
    engine = SnipsNLUEngine(config=CONFIG_EN)
    engine.fit(training_data)
    return engine

### Watson Assistant

In [5]:
# documentation: https://cloud.ibm.com/apidocs/assistant/assistant-v2?code=python
# Build the shared Watson Assistant client. The API key and the service URL are
# read from the environment variables IBM_ASSISTANT_API_KEY and IBM_WATSON_REGION.
authenticator = IAMAuthenticator(os.getenv("IBM_ASSISTANT_API_KEY"))
assistant = AssistantV2(
    version='2022-02-02',
    authenticator = authenticator
)
# NOTE(review): despite its name, IBM_WATSON_REGION is passed as the service URL — confirm the .env value is a full URL.
assistant.set_service_url(os.getenv("IBM_WATSON_REGION"))

In [6]:
def get_assistant_id(split_num):
    """Return the name of the environment variable holding the Watson assistant ID for a split.

    Args:
        split_num: split number; converted with ``str``.

    Returns:
        A string of the form ``IBM_SPLIT_<split_num>_ASSISTANT_ID``.
    """
    return f'IBM_SPLIT_{split_num!s}_ASSISTANT_ID'

### LUIS

In [7]:
def get_app_id(split_num):
    """Return the name of the environment variable holding the LUIS app ID for a split.

    Args:
        split_num: split number; converted with ``str``.

    Returns:
        A string of the form ``LUIS_SPLIT_<split_num>_APP_ID``.
    """
    return f'LUIS_SPLIT_{split_num!s}_APP_ID'

## Test NLU (Watson, LUIS and Snips.ai)

In [17]:
def get_params(nlu_name, split_num, version=VERSION):
    """Build the train, test and output file paths for one NLU and one split.

    Args:
        nlu_name: NLU identifier, used both as directory and as file prefix.
        split_num: number of the dataset split.
        version: dataset/result version; appears in the paths as ``v<version>``.

    Returns:
        A tuple ``(train_file, test_file, output_file)`` of relative paths.
    """
    tag = f'v{version}'
    stem = f'{nlu_name}_split_{split_num}'
    dataset_dir = f'../datasets/{nlu_name}/{tag}/'
    results_dir = f'../results/{nlu_name}/{tag}/'

    train_file = f'{dataset_dir}{stem}_train_{tag}.json'  # snips only
    test_file = f'{dataset_dir}{stem}_test_{tag}.csv'
    output_file = f'{results_dir}{stem}_results_{tag}.json'
    return train_file, test_file, output_file

In [18]:
def unify_keys(json_res, nlu='snips'):
    """Rename NLU-specific keys of every ranking entry to ``name`` / ``confidence``.

    The entries of ``json_res['intent_ranking']`` are modified in place:
    for 'snips', ``intentName`` -> ``name`` and ``probability`` -> ``confidence``;
    for 'watson', ``intent`` -> ``name``; any other NLU is left untouched.

    Args:
        json_res: result dict containing an ``intent_ranking`` list of dicts.
        nlu: name of the NLU that produced the ranking.

    Returns:
        The same ``json_res`` object.
    """
    # (old key, new key) pairs per NLU, applied in this order
    renames_by_nlu = {
        'snips': (('intentName', 'name'), ('probability', 'confidence')),
        'watson': (('intent', 'name'),),
    }
    renames = renames_by_nlu.get(nlu, ())

    for entry in json_res['intent_ranking']:
        for old_key, new_key in renames:
            entry[new_key] = entry.pop(old_key)
    return json_res

In [19]:
def add_is_correct(json_res):
    """Add an ``is_correct`` flag telling whether the top-ranked intent is the expected one.

    The expected intent is taken from ``json_res['correct_intent']`` (not from a
    global variable), so the function only depends on its argument. An empty
    ranking counts as an incorrect prediction instead of raising IndexError.

    Args:
        json_res: result dict with the keys ``correct_intent`` and
            ``intent_ranking`` (list of dicts that carry a ``name`` key).

    Returns:
        The same ``json_res`` object, with ``is_correct`` set to True or False.
    """
    ranking = json_res['intent_ranking']
    # Only the first entry of the ranking is compared with the expected intent.
    json_res['is_correct'] = bool(ranking) and ranking[0]['name'] == json_res['correct_intent']
    return json_res

In [20]:
def get_nlu_response(utterance, assistant=None, nlu_engine=None, nlu='snips', split_num=1):
    """Send one utterance to the selected NLU and return its raw response.

    Args:
        utterance: user text to classify.
        assistant: Watson AssistantV2 client; required when ``nlu == 'watson'``.
        nlu_engine: trained Snips engine; required when ``nlu == 'snips'``.
        nlu: which NLU to query: 'watson', 'snips' or 'luis'.
        split_num: dataset split, used to look up the Watson assistant ID or
            LUIS app ID in the environment.

    Returns:
        'watson': the result of ``message_stateless(...).get_result()``;
        'snips': the value returned by ``nlu_engine.get_intents``;
        'luis': the decoded JSON body of the prediction REST call.

    Raises:
        ValueError: if ``nlu`` is not one of the supported names (previously
            this surfaced as an UnboundLocalError on ``return``).
    """
    if nlu == 'watson':
        ibm_assistant_id = get_assistant_id(split_num)
        return assistant.message_stateless(
            assistant_id=os.getenv(ibm_assistant_id),
            input={
                'message_type': 'text',
                'text': utterance,
                'options': {'alternate_intents': True}
            }
        ).get_result()

    if nlu == 'snips':
        return nlu_engine.get_intents(utterance)

    if nlu == 'luis':
        luis_app_id = get_app_id(split_num)
        appId = os.getenv(luis_app_id)
        prediction_key = os.getenv("LUIS_PREDICTION_SUBSCRIPTION_KEY")
        prediction_endpoint = os.getenv("LUIS_PREDICTION_ENDPOINT")
        # The URL parameters to use in this REST call.
        params = {
            'query': utterance,
            'timezoneOffset': '0',
            'verbose': 'true',
            'show-all-intents': 'true',
            'spellCheck': 'false',
            'staging': 'false',
            'subscription-key': prediction_key
        }

        # Make the REST call. The endpoint is concatenated as-is, so
        # LUIS_PREDICTION_ENDPOINT must end with a '/'.
        response = requests.get(
            f'{prediction_endpoint}luis/prediction/v3.0/apps/{appId}/slots/production/predict',
            params=params
        )
        return response.json()

    raise ValueError(f"Unsupported NLU {nlu!r}; expected 'watson', 'snips' or 'luis'")

In [21]:
def sleep_if_luis(i, nlu='luis'):
    """
    Throttle LUIS requests: pause for 1.5 seconds whenever ``i % 5 == 4``
    (i = 4, 9, 14, ...) and the NLU is 'luis'; otherwise return immediately.
    """
    # indices start at 0, so the fifth item of each group has remainder 4, not 0
    if i % 5 != 4 or nlu != 'luis':
        return
    time.sleep(1.5)

In [22]:
def read_csv(file_path):
    """Read a comma-separated file and return all of its rows.

    The rows are read eagerly inside a ``with`` block so the file handle is
    closed before returning (the previous version returned a lazy reader over
    a file that was never closed). The file is opened with ``newline=''`` as
    the csv module requires, so newlines embedded in quoted fields survive.

    Args:
        file_path: path of the CSV file.

    Returns:
        A list of rows, each row being a list of strings. It can be iterated
        exactly like the csv reader object returned before.
    """
    with open(file_path, newline='') as csv_file:
        return list(csv.reader(csv_file, delimiter=','))

In [23]:
def write_results(output_file, results, nlu_name, version):
    """Write ``results`` as indented JSON to ``output_file``, creating directories as needed.

    The directory ``../results/<nlu_name>/v<version>`` is created together with
    any missing parents (the previous ``os.mkdir`` calls failed when
    ``../results`` itself did not exist), and the parent directory of
    ``output_file`` is ensured as well.

    Args:
        output_file: path of the JSON file to write (overwritten if present).
        results: JSON-serialisable object to dump.
        nlu_name: NLU identifier used for the results sub-directory.
        version: version used for the ``v<version>`` sub-directory.
    """
    results_dir = '../results/' + nlu_name + '/v' + str(version)
    os.makedirs(results_dir, exist_ok=True)

    # output_file normally lives in results_dir, but do not rely on it
    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)

In [24]:
# Errors raised when a LUIS response lacks the expected prediction/intents structure.
_LUIS_PARSE_ERRORS = (KeyError, TypeError, AttributeError)


def _parse_luis_ranking(response):
    """Turn a LUIS prediction response into a list of ``{'name', 'confidence'}`` dicts.

    Reads ``response['prediction']['intents']`` (a mapping of intent name to a
    dict with a ``score``) and drops the entry named 'None'.

    Raises:
        KeyError, TypeError or AttributeError when the response does not have
        that structure (for example an error payload).
    """
    intents = response['prediction']['intents']
    ranking = [{'name': name, 'confidence': details['score']} for name, details in intents.items()]
    return [r for r in ranking if r['name'] is not None and r['name'] != 'None']


# Evaluate every NLU on every split: classify each test utterance, record the
# intent ranking and whether the top intent is correct, and write one results
# file per (NLU, split).
for nlu_name in NLU_NAMES:
    for split in range(NUM_SPLITS):
        split_num = split + 1
        results = []
        train_file, test_file, output_file = get_params(nlu_name, split_num, version=VERSION)
        reader = read_csv(test_file)

        # Snips is trained locally for each split; Watson and LUIS are queried remotely.
        if nlu_name == 'snips':
            nlu_engine = train_snips(train_file)

        for i, row in enumerate(reader):
            # skip header, blank lines and empty utterances (checked before indexing the row)
            if i == 0 or not row or row[0] == '':
                continue
            utterance = row[0]
            intent = row[1]
            sleep_if_luis(i, nlu=nlu_name)

            # pass a user utterance to the NLU and get response
            if nlu_name == 'watson':
                response = get_nlu_response(utterance, assistant=assistant, nlu=nlu_name, split_num=split_num)
                intent_ranking = response['output']['intents']

            elif nlu_name == 'snips':
                intent_ranking = get_nlu_response(utterance, nlu_engine=nlu_engine, nlu=nlu_name, split_num=split_num)
                intent_ranking = [r for r in intent_ranking if r['intentName'] is not None]

            elif nlu_name == 'luis':
                response = get_nlu_response(utterance, nlu=nlu_name, split_num=split_num)
                try:
                    intent_ranking = _parse_luis_ranking(response)
                except _LUIS_PARSE_ERRORS:
                    print(response)
                    print("Trying to recover ...")
                    time.sleep(1)
                    response = get_nlu_response(utterance, nlu=nlu_name, split_num=split_num)
                    try:
                        # same parsing and 'None' filtering as the first attempt
                        intent_ranking = _parse_luis_ranking(response)
                    except _LUIS_PARSE_ERRORS:
                        print("Failed to recover. Adding an empty list to intent_ranking")
                        intent_ranking = []

            else:
                raise ValueError(f"Unsupported NLU {nlu_name!r} in NLU_NAMES")

            # restructure results
            json_res = {'text': utterance,
                        'correct_intent': intent,
                        'intent_ranking': intent_ranking
                        }

            json_res = unify_keys(json_res, nlu=nlu_name)
            # NOTE(review): correctness is judged on the first ranking entry; this assumes
            # every NLU returns intents ordered by confidence — TODO confirm for LUIS.
            if json_res['intent_ranking']:
                json_res = add_is_correct(json_res)
            else:
                # no intent returned at all: count it as a wrong prediction
                json_res['is_correct'] = False

            results.append(json_res)

        write_results(output_file, results, nlu_name, VERSION)