In [None]:
#hide
%load_ext autoreload
%autoreload 2

!pip install -r ../requirements.txt > /dev/null

In [None]:
# default_exp dialog_system

In [None]:

# export
from deeppavlov import configs,build_model,train_model
import json
from os import path,popen
import pandas as pd
import numpy as np
from pathlib import Path
import logging

# Configure the root logger: timestamped records, DEBUG verbosity.
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG, datefmt='%I:%M:%S')
# Emit one record at INFO, DEBUG and ERROR level.
logging.info("Hello! Welcome to our automated dialog system!")
logging.debug("test: for Debug?")
logging.error(' Error Log Active ')

In [None]:
#export
def run_shell_installs():
    '''Install DeepPavlov and the requirements of the model configs used here.

    Every command is run as ``<current interpreter> -m ...`` so that packages
    are installed into the environment that is executing this code, not into
    whichever ``pip``/``python`` happens to be first on ``PATH``.

    The command line and its combined stdout/stderr are logged at DEBUG
    level. A non-zero exit status is logged at ERROR level; no exception is
    raised, so the remaining commands still run.
    '''
    import subprocess
    import sys

    module_commands = (
        ('pip', 'install', 'deeppavlov'),
        ('deeppavlov', 'install', 'squad'),
        ('deeppavlov', 'install', 'squad_bert'),
        ('deeppavlov', 'install', 'fasttext_avg_autofaq'),
        ('deeppavlov', 'install', 'fasttext_tfidf_autofaq'),
        ('deeppavlov', 'install', 'tfidf_autofaq'),
        ('deeppavlov', 'install', 'tfidf_logreg_autofaq'),
        ('deeppavlov', 'install', 'tfidf_logreg_en_faq'),
    )
    for module_args in module_commands:
        command = [sys.executable, '-m', *module_args]
        logging.debug(' '.join(command))
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # keep error output in the log too
            universal_newlines=True
        )
        logging.debug(completed.stdout)
        if completed.returncode != 0:
            logging.error(
                'Command failed with exit status %s: %s',
                completed.returncode, ' '.join(command)
            )
    

In [None]:
# Run the install commands defined in run_shell_installs().
run_shell_installs()

# Dialog System
> Question Answering Automated Dialog System

In [None]:
#export
def action_over_list_f(arr, v):
    '''Update, in place, every dict of ``arr`` that matches a selector.

    ``v[0]`` is a dict whose first ``key: value`` item is the selector;
    ``v[1]`` is a dict of entries written into each element of ``arr`` that
    holds that exact key/value pair. Returns ``None``.
    '''
    wanted_key, wanted_value = next(iter(v[0].items()))

    for entry in arr:
        is_target = (
            wanted_key in entry.keys() and entry[wanted_key] == wanted_value
        )
        if not is_target:
            continue
        for field, value in v[1].items():
            entry[field] = value


def replacement_f(model_config, **args):
    '''Recursively overwrite entries of ``model_config`` in place.

    For every ``key=value`` keyword argument:

    * a dict value is merged recursively into ``model_config[key]``
      (a missing section is created first);
    * a two-item list/tuple of dicts applied to an existing list is treated
      as ``[selector, replacements]`` and forwarded to
      ``action_over_list_f``;
    * any other value (str, number, bool, ``None``, plain list, ...) simply
      replaces ``model_config[key]``.

    Returns ``None``; the caller's dict is mutated.
    '''
    for k, v in args.items():
        if isinstance(v, dict):
            # setdefault() lets callers add a nested section that does not
            # exist yet instead of failing with KeyError.
            replacement_f(model_config.setdefault(k, {}), **v)
            continue

        is_selector_pair = (
            isinstance(v, (list, tuple)) and len(v) == 2 and
            all(isinstance(item, dict) for item in v)
        )
        if is_selector_pair and isinstance(model_config.get(k), list):
            action_over_list_f(model_config[k], v)
        else:
            # Scalars of any type are assigned, not only strings.
            model_config[k] = v

In [None]:
# test action_over_list_f
from random import randint


def gen_list_keys_for_tests():
    '''Build random fixtures for the list-replacement tests.

    Returns ``(pipe_list, rand_id, rand_key, args, new_rand_val)`` where
    ``pipe_list`` is a list of 3 to 10 dicts with keys ``id``/``key1``/
    ``key2``/``key3``, ``rand_id`` is the ``id`` of one of them, ``rand_key``
    is one of ``key1``..``key3`` and ``args`` asks to set ``rand_key`` to
    ``new_rand_val`` on the element selected by ``rand_id`` under
    ``chains -> pipe``.
    '''

    def str_n(x):
        return f'{x}_{randint(1,10):1}'

    def gen_dict_list():
        return {
            'id': str_n('id'),
            'key1': str_n('v1'),
            'key2': str_n('v2'),
            'key3': str_n('v3')
        }

    pipe_size = randint(3, 10)
    pipe_list = []
    for _ in range(pipe_size):
        pipe_list.append(gen_dict_list())

    rand_position = randint(0, len(pipe_list) - 1)
    rand_id = pipe_list[rand_position]['id']
    rand_key = f'key{randint(1, 3)}'
    new_rand_val = str_n('new')

    selector = {'id': rand_id}
    replacement = {rand_key: new_rand_val}
    args = {'chains': {'pipe': [selector, replacement]}}

    return pipe_list, rand_id, rand_key, args, new_rand_val


def test_action_over_list_f():
    '''action_over_list_f writes the new value into the generated pipe list.'''
    pipe_list, _, rand_key, args, new_rand_val = gen_list_keys_for_tests()

    def holds_new_value(pipe_elem):
        return new_rand_val in pipe_elem.values()

    # Precondition: the replacement value is not present anywhere yet.
    assert not any(holds_new_value(pipe_elem) for pipe_elem in pipe_list)

    action_over_list_f(pipe_list, args['chains']['pipe'])

    # At least one element now carries the new value.
    assert any(
        rand_key in pipe_elem.keys() and holds_new_value(pipe_elem)
        for pipe_elem in pipe_list
    )


def test_replacement_f_list():
    '''replacement_f reaches a nested list and applies the selector update.'''
    pipe_list, _, rand_key, args, new_rand_val = gen_list_keys_for_tests()
    mod_conf = {'chains': {'pipe': pipe_list}}

    def holds_new_value(pipe_elem):
        return new_rand_val in pipe_elem.values()

    # Precondition: the replacement value is not present anywhere yet.
    assert not any(
        holds_new_value(pipe_elem) for pipe_elem in mod_conf['chains']['pipe']
    )

    replacement_f(model_config=mod_conf, **args)

    assert any(
        rand_key in pipe_elem.keys() and holds_new_value(pipe_elem)
        for pipe_elem in mod_conf['chains']['pipe']
    )


def test_replacement_f_val():
    '''replacement_f overwrites a top-level string value.'''
    mod_conf = {'key1': 'val1', 'key2': 'val2', 'key3': 'val3'}
    args = {'key3': 'newvalue'}

    replacement_f(model_config=mod_conf, **args)

    def was_applied(arg_k, arg_v):
        return arg_k in mod_conf.keys() and arg_v in mod_conf.values()

    assert all(was_applied(arg_k, arg_v) for arg_k, arg_v in args.items())


def test_replacement_f_dict():
    '''replacement_f descends into a nested dict and replaces the leaf.'''
    mod_conf = {'1_key_3': {'2_key_2': 'oldvalue'}, '0_key_': '0_val'}
    args = {'1_key_3': {'2_key_2': 'newvalue'}}

    replacement_f(model_config=mod_conf, **args)

    nested_section = mod_conf['1_key_3']
    assert nested_section['2_key_2'] == 'newvalue'


# Run the action_over_list_f / replacement_f tests defined in this cell.
test_action_over_list_f()
test_replacement_f_list()
test_replacement_f_val()
test_replacement_f_dict()

In [None]:

#export
def updates_faq_config_file(
    configs_path=None,
    **args
):
    '''Update a DeepPavlov JSON config file in place.

    Args:
        configs_path: path of the JSON config to rewrite. When omitted, the
            current value of ``configs.faq.tfidf_logreg_en_faq`` is used, so
            callers that pass only replacement keywords keep working.
        **args: replacements forwarded to ``replacement_f`` (nested dicts,
            plain values, or ``[selector, replacements]`` pairs for lists).

    The ``data_url`` entry of the ``dataset_reader`` section is removed
    before the replacements are applied (presumably so that a local
    ``data_path`` is used instead; confirm against the DeepPavlov reader).
    '''
    if configs_path is None:
        # Resolved at call time so a re-pointed config attribute is honoured.
        configs_path = configs.faq.tfidf_logreg_en_faq

    with open(configs_path) as config_file:
        model_config = json.load(config_file)

    dataset_reader = model_config.get('dataset_reader')
    if isinstance(dataset_reader, dict):
        dataset_reader.pop('data_url', None)

    replacement_f(model_config=model_config, **args)

    # The context manager guarantees the JSON is flushed and the handle closed
    # before any caller re-reads the file.
    with open(configs_path, 'w') as config_file:
        json.dump(model_config, config_file)

In [None]:
#test updates_faq_config_file
import tempfile
from shutil import copyfile


def gen_list_keys_for_tests():
    '''Build random fixtures for the config-file update tests.

    Same shape as the earlier helper of the same name (which this definition
    replaces), except that the replacement request is nested under
    ``chainer -> pipe``. Returns
    ``(pipe_list, rand_id, rand_key, args, new_rand_val)``.
    '''

    def str_n(x):
        return f'{x}_{randint(1,10):1}'

    def gen_dict_list():
        return {
            'id': str_n('id'),
            'key1': str_n('v1'),
            'key2': str_n('v2'),
            'key3': str_n('v3')
        }

    pipe_size = randint(3, 10)
    pipe_list = []
    for _ in range(pipe_size):
        pipe_list.append(gen_dict_list())

    rand_position = randint(0, len(pipe_list) - 1)
    rand_id = pipe_list[rand_position]['id']
    rand_key = f'key{randint(1, 3)}'
    new_rand_val = str_n('new')

    selector = {'id': rand_id}
    replacement = {rand_key: new_rand_val}
    args = {'chainer': {'pipe': [selector, replacement]}}

    return pipe_list, rand_id, rand_key, args, new_rand_val


def test_updates_faq_config_file_update_string():
    '''A string replacement is persisted in a copy of the FAQ config.'''

    with tempfile.TemporaryDirectory() as tmpdirname:

        tmp_config_file = path.join(tmpdirname, 'tmp_file.json')

        copyfile(configs.faq.tfidf_logreg_en_faq, tmp_config_file)

        assert path.isfile(tmp_config_file)

        updates_faq_config_file(
            configs_path=tmp_config_file,
            dataset_reader={'data_path': 'fictional_csv_file.csv'}
        )

        # Close the handle before the temporary directory is removed.
        with open(tmp_config_file) as config_file:
            config_json = json.load(config_file)

        # Check the stored value, not only that the key exists.
        assert config_json['dataset_reader'].get(
            'data_path'
        ) == 'fictional_csv_file.csv'


def test_updates_faq_config_file_update_list():
    '''A selector-based list replacement is persisted in the config file.'''

    with tempfile.TemporaryDirectory() as tmpdirname:

        tmp_config_file = path.join(tmpdirname, 'tmp_file.json')

        pipe_list, rand_id, rand_key, args, new_rand_val = gen_list_keys_for_tests(
        )
        mod_conf = {
            'chainer': {
                'pipe': pipe_list
            },
            'dataset_reader': 'dataset_reader_dictionary'
        }

        # Write through a context manager so the fixture is fully flushed
        # before updates_faq_config_file re-opens the same path.
        with open(tmp_config_file, 'w') as config_file:
            json.dump(mod_conf, config_file)

        assert path.isfile(tmp_config_file)

        updates_faq_config_file(configs_path=tmp_config_file, **args)

        with open(tmp_config_file) as config_file:
            config_json = json.load(config_file)

        assert any(
            rand_key in pipe_elem.keys() and new_rand_val in pipe_elem.values()
            for pipe_elem in config_json['chainer']['pipe']
        )


# Run the updates_faq_config_file tests defined in this cell.
test_updates_faq_config_file_update_string()
test_updates_faq_config_file_update_list()

In [None]:
#export
def select_faq_responses(faq_model, question):
    '''Ask a DeepPavlov FAQ model a single question.

    The model is called with a one-element batch ``[question]`` and the first
    entry of what it returns is handed back to the caller.
    '''
    batch_output = faq_model([question])
    return batch_output[0]

In [None]:
#test faq responses
import tempfile
from shutil import copyfile


def gen_mock_csv_file(tmpdirname, faqs):
    '''Write ``faqs`` as ``tmp_faq.csv`` inside ``tmpdirname``.

    ``faqs`` is anything ``pd.DataFrame`` accepts; the CSV is written without
    the index column. Returns the path of the written file.
    '''
    csv_target = path.join(tmpdirname, 'tmp_faq.csv')
    faq_frame = pd.DataFrame(faqs)
    faq_frame.to_csv(csv_target, index=False)
    return csv_target


def gen_mock_vocab_answers(tmpdirname, vocabs):
    '''Write a mock vocabulary file and return its path.

    ``vocabs`` holds two parallel sequences, ``vocabs['text']`` and
    ``vocabs['freq']``. Each pair becomes one ``text<TAB>freq`` line of
    ``temp_vocab_answers.dict`` inside ``tmpdirname``; lines are joined with
    ``\\n`` and no trailing newline is written.
    '''
    temp_dict_file = path.join(tmpdirname, 'temp_vocab_answers.dict')
    vocabs_text = '\n'.join(
        t + '\t' + str(f) for t, f in zip(vocabs['text'], vocabs['freq'])
    )

    # The context manager closes the handle even if the write fails.
    with open(temp_dict_file, 'w') as vocab_file:
        vocab_file.write(vocabs_text)

    return temp_dict_file


def gen_faq_config(tmpdirname, vocab_file, faq_file):
    '''Create a temporary copy of the FAQ config pointing at mock files.

    The copy of ``configs.faq.tfidf_logreg_en_faq`` gets ``save_path`` and
    ``load_path`` of the pipe element with id ``answers_vocab`` set to
    ``vocab_file`` and the reader's ``data_path`` set to ``faq_file``.
    Returns the path of the copied config.
    '''
    temp_configs_faq = path.join(tmpdirname, 'temp_config_faq.json')
    copyfile(configs.faq.tfidf_logreg_en_faq, temp_configs_faq)

    vocab_selector = {'id': 'answers_vocab'}
    vocab_paths = {'save_path': vocab_file, 'load_path': vocab_file}
    pipe_update = [vocab_selector, vocab_paths]

    updates_faq_config_file(
        configs_path=temp_configs_faq,
        chainer={'pipe': pipe_update},
        dataset_reader={'data_path': faq_file}
    )

    return temp_configs_faq


def test_faq_response_with_minimum_faqs_in_dataframe_fail_case():
    '''Training/answering with a single FAQ row must raise ValueError.'''
    with tempfile.TemporaryDirectory() as tmpdirname:

        single_faq = {
            'Question': ['Is Covid erradicated?'],
            'Answer': ['Definitely not!']
        }
        mock_vocab = {'text': ['This is a vocab example'], 'freq': [1]}

        faq_file = gen_mock_csv_file(tmpdirname, single_faq)
        vocab_file = gen_mock_vocab_answers(tmpdirname, mock_vocab)
        configs_file = gen_faq_config(tmpdirname, vocab_file, faq_file)

        try:
            # Model training stays inside the try: a ValueError raised there
            # counts as the expected failure too.
            select_faq_responses(
                question='Is Enrique the prettiest person in town?',
                faq_model=train_model(configs_file, download=True)
            )
        except ValueError:
            pass
        else:
            assert False


def test_faq_response_with_minimum_faqs_in_dataframe_success_case():
    '''With two FAQ rows the model trains and returns the expected answer.'''
    with tempfile.TemporaryDirectory() as tmpdirname:

        two_faqs = {
            'Question': ['Is Covid erradicated?', 'Who is the current POTUS?'],
            'Answer': ['Definitely not!', 'Donald Trump']
        }
        mock_vocab = {'text': ['This is a vocab example'], 'freq': [1]}

        faq_file = gen_mock_csv_file(tmpdirname, two_faqs)
        vocab_file = gen_mock_vocab_answers(tmpdirname, mock_vocab)
        configs_file = gen_faq_config(tmpdirname, vocab_file, faq_file)

        answer = select_faq_responses(
            question='Is Enrique the prettiest person in town?',
            faq_model=train_model(configs_file, download=True)
        )
        assert answer == ['Donald Trump']

        
        
def test_faq_response_with_minimum_answers_vocab_success_case():
    '''An empty answers-vocab file still yields the expected FAQ answer.'''
    with tempfile.TemporaryDirectory() as tmpdirname:

        faqs = {
            'Question': ['Is Covid erradicated?', 'Who is the current POTUS?'],
            'Answer': ['Definitely not!', 'Donald Trump']
        }

        vocabs = {'text': [], 'freq': []}

        faq_file = gen_mock_csv_file(tmpdirname, faqs)
        vocab_file = gen_mock_vocab_answers(tmpdirname, vocabs)

        configs_file = gen_faq_config(tmpdirname, vocab_file, faq_file)

        # The comparison used to be evaluated and thrown away; assert it so
        # the test can actually fail.
        assert select_faq_responses(
            question='Is Enrique the prettiest person in town?',
            faq_model=train_model(configs_file, download=True)
        ) == ['Donald Trump']

# Run the FAQ response tests; each one calls train_model(..., download=True).
test_faq_response_with_minimum_faqs_in_dataframe_fail_case()
test_faq_response_with_minimum_faqs_in_dataframe_success_case()
test_faq_response_with_minimum_answers_vocab_success_case()

In [None]:
#export
def select_squad_responses(
    contexts, squad_model, question, best_results=1
):
    '''Ask a DeepPavlov context question-answering model about every context.

    Args:
        contexts: DataFrame with a ``context`` column; the model is called
            once per row as ``squad_model([context], [question])``.
        squad_model: callable returning an indexable result per call, where
            ``result[0][0]`` is used as the answer and ``result[2][0]`` as
            its ranking value (presumably the answer text and the score
            returned by DeepPavlov SQuAD models; confirm).
        question: the question asked against each context.
        best_results: how many top-ranked answers to return.

    Returns:
        ``(responses, top_responses)``: the raw per-context model outputs and
        the answers of the ``best_results`` highest-ranked ones, best first.
    '''
    responses = contexts.context.apply(
        lambda context: squad_model([context], [question])
    ).values

    logging.debug(f'Responses: {responses}')
    # Highest ranking value first; ties keep their original order.
    ranked = sorted(
        responses, key=lambda response: response[2][0], reverse=True
    )
    top_responses = [response[0][0] for response in ranked[:best_results]]

    logging.debug(f'Top Responses: {top_responses}')
    return responses, top_responses

In [None]:
#test select_squad_responses
import tempfile
from shutil import copyfile

# Context fixtures for the SQuAD tests below. Each dict holds parallel 'topic'
# and 'context' lists and is converted to a DataFrame by assert_squad_model.
empty = {'topic': [], 'context': []}
spacex = {
    'topic': ['SpaceX'],
    'context':
        [
            '''Space Exploration Technologies Corp., trading as SpaceX, is an American aerospace manufacturer and space transportation
services company headquartered in Hawthorne, California. It was founded in 2002 by Elon Musk with the goal of reducing space 
transportation costs to enable the colonization of Mars. SpaceX has developed several launch vehicles, the Starlink satellite
constellation, and the Dragon spacecraft. It is widely considered among the most successful private spaceflight companies.'''
        ]
}

# Two short contexts about the same company, used to check result ordering.
intekglobal = {
    'topic': ['Intekglobal', 'InG'],
    'context':
        [
            'Intekglobal has its headquarters located in TJ',
            'Intekglobal is in the north of mexico'
        ]
}


def assert_squad_model(
    contexts, squad_model, question, expected_responses, **args
):
    '''Assert that the top answers for ``question`` equal ``expected_responses``.

    ``contexts`` is converted to a DataFrame; extra keyword arguments are
    forwarded to ``select_squad_responses``.
    '''
    contexts_frame = pd.DataFrame(contexts)
    outcome = select_squad_responses(
        contexts=contexts_frame,
        squad_model=squad_model,
        question=question,
        **args
    )
    top_responses = outcome[1]
    assert top_responses == expected_responses


def test_squad_bert():
    '''Check the BERT SQuAD model against the three context fixtures.'''
    bert = build_model(configs.squad.squad_bert, download=True)

    # (contexts, question, expected answers, extra keyword arguments)
    cases = (
        (empty, 'Is an empty response expected?', [], {'best_results': 2}),
        (spacex, 'Who founded SpaceX?', ['Elon Musk'], {}),
        (
            intekglobal, 'Where is Intekglobal located?',
            ['north of mexico','TJ'], {'best_results': 2}
        ),
    )
    for contexts, question, expected, options in cases:
        assert_squad_model(
            contexts, bert, question, expected_responses=expected, **options
        )


def test_squad_rnet():
    '''Check the model built from ``configs.squad.squad`` against the fixtures.'''
    rnet = build_model(configs.squad.squad, download=True)

    # (contexts, question, expected answers, extra keyword arguments)
    cases = (
        (empty, 'Is an empty response expected?', [], {'best_results': 5}),
        (spacex, 'Who founded SpaceX?', ['Elon Musk'], {}),
        (
            intekglobal, 'Where is Intekglobal located?',
            ['north of mexico','TJ'], {'best_results': 2}
        ),
    )
    for contexts, question, expected, options in cases:
        assert_squad_model(
            contexts, rnet, question, expected_responses=expected, **options
        )

# Run both SQuAD tests, then drop the fixtures from the notebook namespace.
test_squad_bert()
test_squad_rnet()
del spacex, empty, intekglobal

In [None]:
#export
def load_qa_models(
    config_rnet=configs.squad.squad,
    config_bert=configs.squad.squad_bert,
    config_tfidf=configs.faq.tfidf_logreg_en_faq,
    download=True
):
    '''Build the question-answering models used by the dialog system.

    Returns a nested dict: ``['squad']['rnet']`` and ``['squad']['bert']``
    come from ``build_model``, ``['faq']['tfidf']`` from ``train_model``.
    ``download`` is forwarded to every call. The default config paths are
    bound when this function is defined.
    '''
    squad_models = {}
    squad_models['rnet'] = build_model(config_rnet, download=download)
    squad_models['bert'] = build_model(config_bert, download=download)

    faq_models = {'tfidf': train_model(config_tfidf, download=download)}

    return {'squad': squad_models, 'faq': faq_models}


def format_responses(question, responses):
    '''Render ``question`` followed by a zero-based numbered list of responses.

    The result is ``'<question>:\\n\\n'`` plus one ``'<index>: <response>\\n'``
    line per response.
    '''
    header = f'{question}:\n\n'
    numbered_lines = [f'{k}: {res}\n' for k, res in enumerate(responses)]
    return header + ''.join(numbered_lines)


def get_responses(contexts, question, qa_models, nb_squad_results=1):
    '''Collect the answers of every loaded model for ``question``.

    Args:
        contexts: DataFrame of contexts handed to each SQuAD model.
        question: the question to answer.
        qa_models: dict with ``'squad'`` and ``'faq'`` sub-dicts of models,
            as returned by ``load_qa_models``.
        nb_squad_results: number of top answers taken from each SQuAD model.

    Returns:
        ``(responses, formatted)``: the raw list of answers (SQuAD models
        first, then FAQ models) and the text built by ``format_responses``
        from the non-blank answers with duplicates removed.
    '''
    responses = []
    for squad_model in qa_models['squad'].values():
        responses.extend(
            select_squad_responses(
                contexts, squad_model, question, best_results=nb_squad_results
            )[1]
        )
    for faq_model in qa_models['faq'].values():
        responses.extend(select_faq_responses(faq_model, question))

    # De-duplicate while keeping first-seen order. A plain set() made the
    # numbering of the formatted answers vary from run to run.
    seen = set()
    unique_responses = []
    for r in responses:
        if r.strip() and r not in seen:
            seen.add(r)
            unique_responses.append(r)

    return responses, format_responses(question, unique_responses)

In [None]:
# test get_responses
import tempfile
from shutil import copyfile

# Context fixture: parallel 'topic' / 'context' lists, converted to a
# DataFrame by the tests below.
intekglobal_context = {
    'topic': ['Intekglobal', 'InG'],
    'context':
        [
            'Intekglobal has its headquarters located in TJ',
            'Intekglobal is in the north of mexico'
        ]
}

# FAQ fixture: parallel 'Question' / 'Answer' lists, written to CSV by
# mock_faq_files.
intekglobal_faqs = {
    'Question': ['Is Intekglobal an IT company?', 'Where can I apply?'],
    'Answer':
        ['Yes it is!', 'Please refer the our website for further information']
}


def mock_faq_files(tmpdirname, faqs):
    '''Create a FAQ CSV and a matching config copy inside ``tmpdirname``.

    ``faqs`` is written to ``temp_faq.csv`` (no index column) and a copy of
    ``configs.faq.tfidf_logreg_en_faq`` is pointed at it through the reader's
    ``data_path``. Returns ``{'data': csv_path, 'config': config_path}``.
    '''
    data_file = path.join(tmpdirname, 'temp_faq.csv')
    config_file = path.join(tmpdirname, 'temp_config_faq.json')

    faq_frame = pd.DataFrame(faqs)
    faq_frame.to_csv(data_file, index=False)
    copyfile(configs.faq.tfidf_logreg_en_faq, config_file)

    updates_faq_config_file(
        configs_path=config_file,
        dataset_reader={'data_path': data_file}
    )

    return {'data': data_file, 'config': config_file}


def test_get_intekglobal_responses():
    '''get_responses returns and formats the expected Intekglobal answers.'''
    with tempfile.TemporaryDirectory() as tmpdirname:

        faq_files = mock_faq_files(tmpdirname, intekglobal_faqs)
        qa_models = load_qa_models(
            config_tfidf=faq_files['config'], download=False
        )
        contexts_frame = pd.DataFrame(intekglobal_context)

        responses, formatted = get_responses(
            contexts_frame,
            'Where is Intekglobal?',
            qa_models,
            nb_squad_results=2
        )

        logging.debug(responses)
        logging.debug(formatted)

        allowed_answers = ('north of mexico', 'TJ', 'Yes it is!')
        assert all(response in allowed_answers for response in responses)

        expected_text = ''' Where is Intekglobal?:

0: north of mexico
1: TJ
2: Yes it is!
        '''.strip()
        assert expected_text == formatted.strip()


def test_get_empty_responses():
    '''Run get_responses with empty FAQ and context data.

    Only checks that the call completes; the results are logged, not asserted.
    '''
    with tempfile.TemporaryDirectory() as tmpdirname:
        empty_faqs = {'Question': [], 'Answer': []}
        faq_files = mock_faq_files(tmpdirname, empty_faqs)

        qa_models = load_qa_models(
            config_tfidf=faq_files['config'], download=False
        )
        empty_context = {'topic': [], 'context': []}
        contexts_frame = pd.DataFrame(empty_context)

        responses, formatted = get_responses(
            contexts_frame,
            'Where is Intekglobal?',
            qa_models,
            nb_squad_results=2
        )

        logging.debug(responses)
        logging.debug(formatted)
        # TODO: no assertion on the responses yet (a draft check was left
        # disabled in the original test).


# Run the get_responses tests.
test_get_intekglobal_responses()
test_get_empty_responses()

# Remove the context fixture from the notebook namespace.
del intekglobal_context

In [None]:
#export
def get_input(text):
    '''Prompt the user with ``text`` and return the typed line.

    Thin wrapper around ``input`` that exists so tests can patch it.
    '''
    typed = input(text)
    return typed


def new_answer(question, data, qa_models):
    '''Interactively record a better answer for ``question``.

    The user is first asked whether a better answer should be given and then
    whether it is a context or a FAQ answer.

    Args:
        question: the question that was just asked.
        data: dict with ``'context'`` and ``'faq'`` entries, each holding a
            DataFrame under ``'df'`` and a CSV path under ``'path'``. The
            selected DataFrame is extended and written back to its path.
        qa_models: model dict; ``qa_models['faq']['tfidf']`` is retrained
            from ``configs.faq.tfidf_logreg_en_faq`` after a FAQ update.

    Returns:
        A status message: ``'no data updates..'``,
        ``'contexts dataset updated..'`` or
        ``'FAQ dataset and model updated..'``.
    '''

    def said_yes(prompt):
        # startswith() instead of [0]: an empty reply (plain Enter) counts
        # as "no" rather than raising IndexError.
        return get_input(prompt).lower().startswith('y')

    if not said_yes('Give a better anwser [y/n]?'):
        return 'no data updates..'

    if said_yes('Give the answer as a context [y/n]?'):
        new_context = pd.DataFrame(
            {
                'topic': [get_input('Give context a title:\n')],
                'context': [get_input('Introduce the context:\n')]
            }
        )
        # pd.concat replaces DataFrame.append, which was deprecated in
        # pandas 1.4 and removed in pandas 2.0.
        data['context']['df'] = pd.concat([data['context']['df'], new_context])
        data['context']['df'].to_csv(data['context']['path'])

        return 'contexts dataset updated..'

    new_faq = pd.DataFrame(
        {
            'Question': [question],
            'Answer': [get_input('Introduce the answer:\n')]
        }
    )
    data['faq']['df'] = pd.concat([data['faq']['df'], new_faq])
    data['faq']['df'].to_csv(data['faq']['path'])
    # Retrain so the FAQ model can return the answer that was just added.
    qa_models['faq']['tfidf'] = train_model(
        configs.faq.tfidf_logreg_en_faq, download=False
    )
    return 'FAQ dataset and model updated..'


def question_response(data, qa_models, num_returned_values_per_squad_model=1):
    '''Ask the user for a question, answer it and offer to improve the answer.

    Args:
        data: dict with ``'context'`` and ``'faq'`` entries (``'df'`` and
            ``'path'`` each); ``data['context']['df']`` supplies the contexts
            searched by the SQuAD models.
        qa_models: model dict as returned by ``load_qa_models``.
        num_returned_values_per_squad_model: number of top answers taken
            from each SQuAD model.

    Returns:
        ``(formatted_responses, status)`` where ``status`` is the message
        returned by ``new_answer``.
    '''
    question = get_input('Introduce question:\n')

    # Answer the question that was actually typed, using the caller's
    # contexts, instead of a hard-coded fixture and question.
    _, formatted_responses = get_responses(
        data['context']['df'],
        question,
        qa_models,
        nb_squad_results=num_returned_values_per_squad_model
    )

    # Show the answers before asking whether a better one should be stored.
    logging.info(formatted_responses)

    return formatted_responses, new_answer(question, data, qa_models)

In [None]:
##Test FAQ dialog system

In [None]:
import tempfile
from unittest.mock import patch
from shutil import copyfile
# Contexts used as the in-memory context dataset of the dialog tests.
example_contexts = pd.DataFrame(
    {
        'context':
            [
                'Intekglobal has its headquarters located in TJ',
                'In Intekglobal we care about you',
                'Enrique Jimenez is one of the smartest minds on the planet'
            ]
    },
)

# Shared dataset descriptor; the Ellipsis placeholders are filled in by
# copy_data_files() for each test.
data = {
    'context': {
        'df': example_contexts,
        'path': ...
    },
    'faq': {
        'df': ...,
        'path': ...
    }
}

# FAQ CSV located in the 'data' folder next to the current working directory.
FAQ_DATA_FILE = path.join(
    popen('dirname $PWD').read().strip(), 'data/faq_data.csv'
)

# Models shared by the dialog tests below. Without this definition a fresh
# kernel fails with "NameError: name 'qa_models' is not defined".
qa_models = load_qa_models()

def copy_data_files(data, tmpdirname):
    '''Point ``data`` at temporary copies of the datasets.

    Sets both ``'path'`` entries to files inside ``tmpdirname``, loads the
    FAQ DataFrame from ``FAQ_DATA_FILE``, writes the in-memory context
    DataFrame to its new path and copies the FAQ CSV to its new path.
    '''
    context_target = path.join(tmpdirname, 'tmp_context.csv')
    faq_target = path.join(tmpdirname, 'tmp_faq.csv')

    data['context']['path'] = context_target
    data['faq']['path'] = faq_target
    data['faq']['df'] = pd.read_csv(FAQ_DATA_FILE)

    data['context']['df'].to_csv(context_target)
    copyfile(FAQ_DATA_FILE, faq_target)


@patch('__main__.get_input')
def test_context_response_with_no_updates(mock_input):
    '''A question answered with "N" leaves the datasets untouched.'''
    scripted_inputs = ['Who is Enrique Jimenez?', 'N']
    mock_input.side_effect = scripted_inputs

    with tempfile.TemporaryDirectory() as tmpdirname:
        copy_data_files(data, tmpdirname)

        outcome = question_response(data, qa_models)
        responses, status = outcome

        assert status == 'no data updates..'
        assert 'one of the smartest minds on the planet' in responses


@patch('__main__.train_model')
@patch('__main__.get_input')
def test_updating_faq_dataset(mock_input, mock_train_model):
    '''A new FAQ answer typed by the user is appended to the FAQ CSV.

    ``train_model`` is patched so no retraining happens; the original FAQ
    model is restored afterwards.
    '''

    new_answer = 'Intekglobal is one of the best companies in the world'
    mock_input.side_effect = ['What is Intekglobal?', 'Y', 'N', new_answer]
    qa_model_faq = qa_models['faq']['tfidf']

    # No `except` clause here: printing and swallowing the exception also
    # hid AssertionError, so this test could never fail.
    try:
        with tempfile.TemporaryDirectory() as tmpdirname:
            copy_data_files(data, tmpdirname)

            assert 'FAQ dataset and model updated..' == question_response(
                data, qa_models
            )[1]

            updated_faq = pd.read_csv(data['faq']['path'])

            assert updated_faq[updated_faq['Answer'] == new_answer
                              ].shape[0] == 1
    finally:
        qa_models['faq']['tfidf'] = qa_model_faq


# Run the dialog tests; both read the module-level `qa_models` and `data`.
test_context_response_with_no_updates()
test_updating_faq_dataset()

NameError: name 'qa_models' is not defined

In [None]:
import tempfile
from unittest.mock import patch
from shutil import copyfile, copytree
from pprint import pprint


# Home directory as reported by the shell; used to expand '~' in config paths.
HOME_DIR = popen('echo $HOME').read().strip()

# Redefines the contexts used as in-memory context dataset for this cell.
example_contexts = pd.DataFrame(
    {
        'context':
            [
                'Intekglobal has its headquarters located in TJ',
                'In Intekglobal we care about you'
            ]
    },
)

# Shared dataset descriptor; the Ellipsis placeholders are filled in by
# copy_data_files().
data = {
    'context': {
        'df': example_contexts,
        'path': ...
    },
    'faq': {
        'df': ...,
        'path': ...
    }
}


# Dataset CSVs located in the 'data' folder next to the current working
# directory.
FAQ_DATA_FILE = path.join(
    popen('dirname $PWD').read().strip(), 'data/faq_data.csv'
)

CONTEXT_DATA_FILE = path.join(
    popen('dirname $PWD').read().strip(), 'data/context_data.csv'
)

def copy_data_files(data, tmpdirname):
    '''Point ``data`` at temporary copies of both dataset CSV files.

    Sets both ``'path'`` entries to files inside ``tmpdirname``, loads the
    FAQ DataFrame from ``FAQ_DATA_FILE`` and copies ``CONTEXT_DATA_FILE`` and
    ``FAQ_DATA_FILE`` to the new paths. The in-memory context DataFrame is
    left as it is.
    '''
    context_target = path.join(tmpdirname, 'tmp_context.csv')
    faq_target = path.join(tmpdirname, 'tmp_faq.csv')

    data['context']['path'] = context_target
    data['faq']['path'] = faq_target
    data['faq']['df'] = pd.read_csv(FAQ_DATA_FILE)

    copyfile(CONTEXT_DATA_FILE, context_target)
    copyfile(FAQ_DATA_FILE, faq_target)


def modify_configs(data, tmpdirname):
    '''Redirect the FAQ config, its models and its data into ``tmpdirname``.

    Copies the dataset files, the models directory named by the config's
    ``metadata.variables`` and the config itself into ``tmpdirname``, then
    re-points ``configs.faq.tfidf_logreg_en_faq`` at the copied config and
    rewrites that copy to use the temporary models directory and FAQ CSV.

    Note: this reassigns ``configs.faq.tfidf_logreg_en_faq``; callers are
    expected to restore the original value afterwards.
    '''

    copy_data_files(data, tmpdirname)

    tmp_configs_faq = path.join(tmpdirname, 'temp_config_faq.json')
    tmp_model_dir = path.join(tmpdirname, 'temp_models_dir')

    with open(configs.faq.tfidf_logreg_en_faq) as config_file:
        metadata = json.load(config_file)['metadata']['variables']

    # Expand the '{ROOT_PATH}' placeholder and the leading '~' by hand.
    models_dir = metadata['MODELS_PATH'].replace(
        '{ROOT_PATH}', metadata['ROOT_PATH'].replace('~', HOME_DIR)
    )

    copytree(models_dir, tmp_model_dir)
    copyfile(configs.faq.tfidf_logreg_en_faq, tmp_configs_faq)

    configs.faq.tfidf_logreg_en_faq = tmp_configs_faq

    # The config path is passed explicitly: it is a required argument of
    # updates_faq_config_file and the call failed without it.
    updates_faq_config_file(
        configs_path=tmp_configs_faq,
        metadata={'variables': {
            'MODELS_PATH': tmp_model_dir
        }},
        dataset_reader={'data_path': data['faq']['path']}
    )
    assert path.isdir(tmp_model_dir)
    assert path.isfile(configs.faq.tfidf_logreg_en_faq)


@patch('__main__.get_input')
def test_faq_answer_with_updating(mock_input):
    '''A FAQ answer added in one round is returned in the next round.

    The same question is asked twice: the first round stores ``new_answer``
    as a FAQ entry, the second round must return it. The config attribute
    and the FAQ model are restored afterwards.
    '''

    new_answer = 'Intekglobal is one of the best companies in the world'
    question = 'What is Intekglobal?'
    mock_input.side_effect = [question, 'Y', 'N', new_answer, question, 'N']

    original_config_file = configs.faq.tfidf_logreg_en_faq
    qa_model_faq = qa_models['faq']['tfidf']

    with tempfile.TemporaryDirectory() as tmpdirname:

        # No `except` clause here: swallowing the exception left
        # `old_responses` unbound and replaced the real failure with an
        # unrelated NameError in the assertions below.
        try:
            modify_configs(data, tmpdirname)

            old_responses = question_response(data, qa_models)[0]
            new_responses = question_response(data, qa_models)[0]

            logging.info(f'Old response:\n {old_responses}')
            logging.info(f'New response:\n{new_responses}')
        finally:
            configs.faq.tfidf_logreg_en_faq = original_config_file
            qa_models['faq']['tfidf'] = qa_model_faq

        assert new_answer not in old_responses
        assert new_answer in new_responses


# Run the FAQ update round-trip test.
test_faq_answer_with_updating()

In [None]:
##Test Context Dialog System
import tempfile
from unittest.mock import patch
from shutil import copyfile, copytree
from pprint import pprint

# Redefines the in-memory context dataset for this cell, now with a 'topic'
# column next to 'context'.
example_contexts = pd.DataFrame(
    {
        'topic': ['Headquarters', 'Mision'],
        'context':
            [
                'Intekglobal has its headquarters located in TJ',
                'In Intekglobal we care about you'
            ]
    }
)

# Shared dataset descriptor; the Ellipsis placeholders are filled in by
# copy_data_files().
data = {
    'context': {
        'df': example_contexts,
        'path': ...
    },
    'faq': {
        'df': ...,
        'path': ...
    }
}


# Dataset CSVs located in the 'data' folder next to the current working
# directory.
FAQ_DATA_FILE = path.join(
    popen('dirname $PWD').read().strip(), 'data/faq_data.csv'
)

CONTEXT_DATA_FILE = path.join(
    popen('dirname $PWD').read().strip(), 'data/context_data.csv'
)


def copy_data_files(data, tmpdirname):
    '''Point ``data`` at temporary copies of both dataset CSV files.

    Sets both ``'path'`` entries to files inside ``tmpdirname``, loads the
    FAQ DataFrame from ``FAQ_DATA_FILE`` and copies ``CONTEXT_DATA_FILE`` and
    ``FAQ_DATA_FILE`` to the new paths. The in-memory context DataFrame is
    left as it is.
    '''
    context_target = path.join(tmpdirname, 'tmp_context.csv')
    faq_target = path.join(tmpdirname, 'tmp_faq.csv')

    data['context']['path'] = context_target
    data['faq']['path'] = faq_target
    data['faq']['df'] = pd.read_csv(FAQ_DATA_FILE)

    copyfile(CONTEXT_DATA_FILE, context_target)
    copyfile(FAQ_DATA_FILE, faq_target)


@patch('__main__.get_input')
def test_context_new_answer(mock_input):
    '''A context added in one round is stored and used in the next round.

    The same question is asked twice: the first round stores a new context,
    the second round must answer from it. The context CSV must contain the
    new topic and context exactly once.
    '''

    question = 'What is a Chatbot?'
    new_topic = 'AI Tool & Chatbot Development'
    new_context = '''

A chatbot is an important tool for simulating intelligent conversations with humans.
Intekglobal chatbots efficiently live message on platforms such as Facebook Messenger, 
Slack, and Telegram. Assisting consumers with a variety of purposes and industries. 

But chatbots are more than just a cool technology advancement. They actually transform the user experience.
People want simple and convenient interactions with interface and products.

'''

    mock_input.side_effect = [
        question, 'YES', 'yes', new_topic, new_context, question, 'N'
    ]

    with tempfile.TemporaryDirectory() as tmpdirname:

        # No `except` clause here: printing and swallowing the exception
        # hid the real failure behind errors in the assertions below.
        try:

            copy_data_files(data, tmpdirname)

            old_responses = question_response(data, qa_models)[0]
            logging.info(f'Old response:\n {old_responses}')
            new_responses = question_response(data, qa_models)[0]
            logging.info(f'New response:\n{new_responses}')

        finally:
            logging.info(' Test finished')

        updated_context = pd.read_csv(data['context']['path'])

        assert updated_context[updated_context['context'] == new_context
                              ].shape[0] == 1

        assert updated_context[updated_context['topic'] == new_topic
                              ].shape[0] == 1
        assert 'an important tool for simulating intelligent conversations with humans' not in old_responses
        assert 'an important tool for simulating intelligent conversations with humans' in new_responses


# Run the context update round-trip test.
test_context_new_answer()

In [None]:
#export
def dialog_system(context_data_file=None, faq_data_file=None):
    '''Main dialog system: set up the models and answer questions in a loop.

    Args:
        context_data_file: CSV with the contexts; defaults to
            ``data/context_data.csv`` next to the current working directory.
        faq_data_file: CSV with the FAQs; defaults to ``data/faq_data.csv``
            next to the current working directory.

    Installs the model requirements, points the FAQ config at
    ``faq_data_file``, loads the models and then calls ``question_response``
    until the user interrupts.

    Returns:
        ``'Goodbye!'`` once ``KeyboardInterrupt``, ``EOFError`` or
        ``SystemExit`` is raised while answering.
    '''

    PARENT_DIR = popen('dirname $PWD').read().strip()
    if context_data_file is None:
        context_data_file = path.join(PARENT_DIR, 'data/context_data.csv')
    if faq_data_file is None:
        faq_data_file = path.join(PARENT_DIR, 'data/faq_data.csv')

    run_shell_installs()
    # The config path is passed explicitly: it is a required argument of
    # updates_faq_config_file and the call failed without it.
    updates_faq_config_file(
        configs_path=configs.faq.tfidf_logreg_en_faq,
        dataset_reader={'data_path': faq_data_file}
    )
    qa_models = load_qa_models()

    # Use the files selected above, not the notebook-only test constants
    # CONTEXT_DATA_FILE / FAQ_DATA_FILE.
    context = {'df': pd.read_csv(context_data_file), 'path': context_data_file}
    faq = {'df': pd.read_csv(faq_data_file), 'path': faq_data_file}

    data = {'context': context, 'faq': faq}

    while True:
        try:
            question_response(data=data, qa_models=qa_models)
        except (KeyboardInterrupt, EOFError, SystemExit):
            logging.debug('Goodbye!')
            return 'Goodbye!'

In [None]:
# test  dialog_system()

from unittest.mock import patch

# updates_faq_config_file is patched as well so the test never touches the
# real DeepPavlov config file.
@patch('__main__.updates_faq_config_file')
@patch('__main__.run_shell_installs')
@patch('__main__.load_qa_models')
@patch('__main__.pd.read_csv')
@patch('__main__.question_response')
def test_main_keyboard_interrupt(
    mock_question_response,
    mock_pd_read_csv,
    mock_load_qa_models,
    mock_run_shell_installs,
    mock_updates_faq_config_file,
):
    '''dialog_system ends with 'Goodbye!' on each handled interruption.

    Every collaborator is mocked; ``question_response`` raises one of the
    three handled exceptions per ``dialog_system`` call.
    '''
    mock_question_response.side_effect = [
        KeyboardInterrupt(), EOFError(),
        SystemExit()
    ]
    assert 'Goodbye!' == dialog_system()
    assert 'Goodbye!' == dialog_system()
    assert 'Goodbye!' == dialog_system()
    # Each call must have stopped at its first question.
    assert mock_question_response.call_count == 3


# Run the dialog_system interruption test (all collaborators are mocked).
test_main_keyboard_interrupt()