In [None]:
#default_exp flowchatbot

# Flow Chatbot library API

> A chatbot library for building bots whose conversation is structured like a [flowchart](https://en.wikipedia.org/wiki/Flowchart)

In [1]:
# export
import redis
import enum
import json
import re

In [66]:
#hide
from nbdev.showdoc import *

## Debug Utils

In [70]:
#export
def enable_debug():
    """
    Turn debug output on by setting the module-level DEBUG flag to True
    """
    globals()['DEBUG'] = True

def disable_debug():
    """
    Turn debug output off by setting the module-level DEBUG flag to False
    """
    globals()['DEBUG'] = False
    
def debug_print(*args):
    """
    Print args as a debug message.
    Nothing is printed unless the module-level DEBUG flag exists and is enabled.
    All arguments are printed together as one tuple, prefixed with 'DEBUG: '.
    """
    # A missing flag behaves exactly like a disabled one
    if globals().get('DEBUG') != 1:
        return
    print('DEBUG: ' + str(args))

In [71]:
debug_print("test")

In [72]:
enable_debug()
debug_print("test2")

DEBUG: ('test2',)


In [73]:
disable_debug()
debug_print("test3")

## Data Utils

In [77]:
# export
def get_chained_data(data, *keys, default = None):
    """
    Look up a value in nested dictionaries by following keys one level at a time.
    When any key along the way is missing, default is stored at the full key
    chain (intermediate dictionaries are created) and default itself is returned.
    :data: root data dictionary
    :*keys: hierarchy of keys leading to the desired value
    :default: value to store and return when a key is missing
    :returns: the value found at the key hierarchy, or default if a key is missing;
        with no keys at all, data itself is returned
    """
    node = data
    for link in keys:
        if link in node:
            node = node[link]
            continue
        # Broken chain: materialise the whole path with the default value
        set_chained_data(data, *keys, val=default)
        return default
    return node

def set_chained_data(data, *keys, val=0):
    """
    Store a value in nested dictionaries, creating missing intermediate levels.
    :data: root data dictionary
    :*keys: hierarchy of keys leading to the key to set (at least one is required)
    :val: value to store at the last key
    :returns: nothing
    """
    node = data
    # Walk (and create where absent) every level above the final key
    for link in keys[:-1]:
        node = node.setdefault(link, {})
    node[keys[-1]] = val


In [78]:
# export
def tryint(x):
    """
    Parse x as an int, returning -1 when it cannot be converted.
    Note that -1 is also the result for a genuine input of -1 (or '-1'), so the
    two cases cannot be told apart by the caller.
    """
    try:
        return int(x)
    except Exception:
        # Every conversion failure (ValueError, TypeError, OverflowError, ...) maps
        # to the sentinel. Unlike a bare `except:`, this lets KeyboardInterrupt
        # and SystemExit propagate.
        return -1
    

### Test

In [None]:
data = {}
print(get_chained_data(data, 'a', 'b', 'c'))
data

In [None]:
print(get_chained_data(data, 'a', 'b', 'd', default=[]))

In [None]:
lst = get_chained_data(data, 'a', 'b', 'd', default=[])

In [None]:
lst.append(1)

In [None]:
get_chained_data(data, 'a', 'b', 'd', default=[])

In [None]:
get_chained_data(data, 'a', 'b', 'e', default=[])

In [None]:
data

## Chatbot classes

In [9]:
# export
class CHAT_RET(enum.Enum):
    """
    Movement instruction returned by a chat element after handling an answer.
    STAY: remain on the current element, i.e. the bot asks its question again
    NEXT: advance to the element that follows in the chain
    """
    # remain on the current element
    STAY = 0
    # advance to the following element
    NEXT = 1

In [79]:
# export
class Pipe:
    """
    Common ancestor of all flow element classes.
    Provides an empty key, a `next` link initialised to None and the optional
    callbacks fired on question and on answer.
    """
    def __init__(self, on_question=None, on_answer=None):
        """
        :on_question: optional callback fired when the element is asked for its
            question. Signature is fn(data: dict) -> None.
            It may fire more than once when an answer fails validation and the
            question is asked again.
        :on_answer: optional callback fired when an answer is processed.
            Signature is fn(resp: str, data: dict) -> None.
            When exactly it fires (before or after validation, ...) depends on
            the concrete element type.
        """
        self.on_question, self.on_answer = on_question, on_answer
        self.next = None
        self.key = ''

    def __repr__(self):
        return '{}: {}'.format(type(self), self.key)

    def question(self, data):
        """Fire the on_question callback with data, if one was supplied"""
        callback = self.on_question
        if callback:
            callback(data)

    def answer(self, resp, data):
        """Fire the on_answer callback with resp and data, if one was supplied"""
        callback = self.on_answer
        if callback:
            callback(resp, data)

In [80]:
# export
class Segment(Pipe):
    """
    Simplest conversational unit: one fixed question, one fixed answer.
    The user response is stored unchanged under the 'data' key.
    """
    def __init__(self, key, q, a, **kwargs):
        """
        :key: key identifying this segment
        :q: question text
        :a: answer text returned once a response has been stored
        :**kwargs: forwarded to Pipe (on_question / on_answer callbacks)
        """
        super().__init__(**kwargs)
        self.key = key
        self.q, self.a = q, a

    def question(self, data):
        """
        Fire the question callback and return the question text given at
        construction, wrapped as {'txt': ...}
        """
        super().question(data)
        return {'txt': self.q}

    def answer(self, resp, data):
        """
        Fire the answer callback, store resp under data['data'] and move on.
        The response is not validated.
        :returns: (CHAT_RET.NEXT, {'txt': answer text})
        """
        super().answer(resp, data)
        set_chained_data(data, 'data', val=resp)
        reply = {'txt': self.a}
        return CHAT_RET.NEXT, reply
    

In [81]:
# export
class ValidatedSegment(Segment):
    """
    Segment whose response must pass a validation function before the bot
    moves on.
    """
    def __init__(self, key, q, a, errmsg, valid_fn, **kwargs):
        """
        :errmsg: text returned when validation fails
        :valid_fn: called with the user response; a truthy result stores the
            response and lets the bot move forward, a falsy result keeps the bot
            on this segment and shows errmsg
        """
        super().__init__(key, q, a, **kwargs)
        self.valid_fn = valid_fn
        self.err = errmsg

    def answer(self, resp, data):
        """
        Validate resp and either accept it or stay on this segment.
        On failure the stored 'data' value is reset to '' and the answer
        callback is not fired.
        """
        if not self.valid_fn(resp):
            set_chained_data(data, 'data', val='')
            return CHAT_RET.STAY, {'txt': self.err}
        # Valid: Segment.answer fires the callback and stores the response
        return super().answer(resp, data)
  

In [82]:
# export
class MultiChoiceSegment(Segment):
    """
    Segment that asks a question with a fixed list of choices.
    The zero-based index of the chosen option and its text are both stored.
    """
    def __init__(self, key, q, resp_lst, ans, **kwargs):
        """
        :resp_lst: list of choices offered to the user, numbered 1..n
        :ans: answer text returned after a valid choice
        """
        super().__init__(key, q, ans, **kwargs)
        self.resp_lst = resp_lst

    def question(self, data):
        """
        Fire the question callback and return the question text together with
        the list of options; rendering them (multi-line text, buttons, ...) is
        left to the adapter.
        """
        Pipe.question(self, data)
        return {'txt': self.q, 'choices': self.resp_lst}

    def answer(self, resp, data):
        """
        Accept resp only if it parses to an integer in 1..N for N choices.
        On success the answer callback fires and (index, choice text) is stored
        under data['data'], with index = number - 1. Otherwise the bot stays
        and a hint is returned.
        """
        picked = tryint(resp)
        total = len(self.resp_lst)
        if not (0 < picked <= total):
            return CHAT_RET.STAY, {'txt': f'Please enter one of 1..{total} as answer'}

        Pipe.answer(self, resp, data)
        index = picked - 1
        set_chained_data(data, 'data', val=(index, self.resp_lst[index]))
        return CHAT_RET.NEXT, {'txt': self.a}
    

In [14]:
# export
class ComputeSegment(Segment):
    """
    Segment meant for computations. A Composite hands its own (parent level)
    data to `answer` instead of this segment's private sub-dictionary.
    Subclass it and override `answer` to perform the actual computation.
    """
    def answer(self, resp, data):
        """
        Default behaviour: same as Segment.answer, but data is the parent's data
        """
        outcome = super().answer(resp, data)
        return outcome

In [15]:
# export
class Composite(Pipe):
    """
    Composite element; its children may be any element, including composites.
    Children form a linked list (through `next`) and are visited in order.
    The key of the active child is kept under 'pos' in this composite's data and
    every child's own data lives under the child's key.
    While children remain the composite reports STAY; once the last child
    reports NEXT the stored position is removed and the composite reports NEXT.
    """
    def __init__(self, key, *args):
        """
        :key: key identifying this composite
        :*args: child elements in the order they are visited
        """
        super().__init__()
        # Link every child to its successor; the last child's `next` is untouched
        for child, successor in zip(args, args[1:]):
            child.next = successor

        self.args = args
        self.key = key

        # Lookup table: child key -> child element
        self.nodes = {child.key: child for child in args}

    def getpos(self, data):
        """
        Return the active child. If no position is stored yet, the first
        child's key is stored under 'pos' and the first child is returned.
        """
        pos = get_chained_data(data, 'pos', default=self.args[0].key)
        return self.nodes[pos]

    def setpos(self, data, pos):
        """Store the key of element pos as the active position"""
        set_chained_data(data, 'pos', val=pos.key)

    def getdata(self, data, key):
        """Return the sub-dictionary stored under key, creating an empty one if absent"""
        return get_chained_data(data, key, default={})

    def question(self, data):
        """
        Ask the active child for its question.
        Splitters receive this composite's data (their deciders work on parent
        level data); every other child only sees its own sub-dictionary.
        """
        cur = self.getpos(data)
        debug_print(f'composite question current {cur.key} data {data}')
        curdata = self.getdata(data, cur.key)

        if isinstance(cur, Splitter):
            return cur.question(data)
        return cur.question(curdata)

    def move(self, cur, cmd, data):
        """
        Update the stored position after child cur returned cmd.
        :returns: CHAT_RET.NEXT when the chain is exhausted (the stored position
            is removed), otherwise CHAT_RET.STAY
        """
        if cmd == CHAT_RET.NEXT:
            nxt = cur.next
            if nxt is None:
                # End of the chain: forget the position so the next visit restarts
                data.pop('pos')
                return CHAT_RET.NEXT
        else:
            nxt = cur

        self.setpos(data, nxt)
        return CHAT_RET.STAY

    def answer(self, resp, data):
        """
        Pass the user response to the active child and advance the position.
        :returns: (movement for the parent, answer dict of the child)
        """
        cur = self.getpos(data)
        debug_print(f'composite answer current {cur.key} data {data}')
        curdata = self.getdata(data, cur.key)

        # A SkipSplitter whose decider does not return 1 is bypassed: the
        # response goes to the element after it.
        # NOTE(review): when the skipped splitter is the last child, cur.next is
        # None and setpos fails on it - confirm a trailing SkipSplitter is unsupported.
        if isinstance(cur, SkipSplitter) and cur.decider_fn(data) != 1:
            cur = cur.next
            self.setpos(data, cur)
            curdata = self.getdata(data, cur.key)

        # Splitters and compute segments work on parent level data
        if isinstance(cur, (Splitter, ComputeSegment)):
            mov, ans = cur.answer(resp, data)
        else:
            mov, ans = cur.answer(resp, curdata)
        return self.move(cur, mov, data), ans


In [16]:
#export
class LoopComposite(Composite):
    """
    Composite that starts over from its first child once the end is reached.
    It never reports NEXT itself, so it should be preceded by a Skip element
    in order to break the loop.
    """
    def move(self, cur, cmd, data):
        """
        Update the stored position after child cur returned cmd.
        At the end of the chain the stored position is removed, so the next
        lookup falls back to the first child.
        :returns: always CHAT_RET.STAY
        """
        nxt = cur.next if cmd == CHAT_RET.NEXT else cur
        if nxt is None:
            # End of the chain: clear the position to restart from the beginning
            data.pop('pos')
        else:
            self.setpos(data, nxt)
        return CHAT_RET.STAY

In [17]:
# export
class TextAdapter(Pipe):
    """
    Text based adapter that invokes the root element on each user response.
    Session data is stored as JSON in redis, keyed by session.
    A similar adapter is needed for HTML or other bots.
    """
    def __init__(self, root, redis_client=None):
        """
        :root: root flow element of the bot
        :redis_client: optional client object used for storage; when omitted a
            default `redis.Redis()` connection is created
        """
        super().__init__()
        self.root = root
        self.r = redis.Redis() if redis_client is None else redis_client

    def get_data(self, session):
        """
        Load the data dictionary stored for session.
        :returns: the decoded dictionary, or {} when nothing is stored
        """
        # A single GET instead of EXISTS followed by GET: one round trip, and
        # no failure if the key disappears between the two calls.
        raw = self.r.get(session)
        if raw is None:
            return {}
        return json.loads(raw)

    def respond(self, inp, session):
        """
        Handle one user message.
        The message is passed to the root element as answer, then the next
        question is fetched and the updated data is saved for the session.
        :returns: answer text and next question text separated by a newline;
            choices of the next question, if any, are appended as a numbered list
        """
        data = self.get_data(session)
        n, d1 = self.root.answer(inp, data)
        d2 = self.root.question(data)
        debug_print(f'adapter data {data}')
        self.r.set(session, json.dumps(data))
        d2_str = d2['txt']
        if 'choices' in d2:
            d2_str += '\n' + '\n'.join([f'{i+1}. {q}' for
                                        (i,q) in enumerate(d2['choices'])])
        return f"{d1['txt']}\n{d2_str}"

## Splitters

In [18]:
# export
class Splitter(Pipe):
    """
    Splitter element to branch the flow based on some condition.
    The chosen branch key is stored under data[key]['pos'] and each branch keeps
    its own data under data[key][branch key].
    """
    def __init__(self, key, *branches, decider_fn=lambda _: 0, default = 0):
        """
        :key: key identifying this splitter
        :*branches: all possible branch elements (could be composites)
        :decider_fn: is passed the data of the parent and should return the
            branch index (between 0 and n-1)
        :default: index used if decider_fn returns an invalid index
        """
        super().__init__()
        self.key = key
        self.branches = branches
        # Lookup table: branch key -> branch element
        self.nodes = {branch.key: branch for branch in branches}

        self.default = default
        self.decider_fn = decider_fn

    def decide(self, data):
        """
        Based on parent level data decide the branch and return its key.
        The chosen branch's `next` is pointed at this splitter's `next`.
        """
        val = self.decider_fn(data)
        debug_print(f'decider {self.key} {val}')
        # Negative indices are invalid too: they would silently pick a branch
        # from the end, or raise IndexError when out of range.
        if not 0 <= val < len(self.branches):
            val = self.default
        self.branches[val].next = self.next
        return self.branches[val].key

    def question(self, data):
        """
        Return the question of the branch selected for the parent data
        """
        debug_print(f'splitter question data {data}')
        pos = self.decide(data)
        set_chained_data(data, self.key, 'pos', val = pos)
        debug_print(f'splitter question data after pos set {self.key} {pos} {data}')
        cur = self.nodes[pos]
        curdata = get_chained_data(data, self.key, cur.key, default={})
        return cur.question(curdata)

    def answer(self, resp, data):
        """
        Pass resp to the branch selected for the parent data.
        :returns: (CHAT_RET.NEXT if the branch reported NEXT else CHAT_RET.STAY,
            answer dict of the branch)
        """
        debug_print(f'splitter answer data {data}')
        pos = self.decide(data)
        set_chained_data(data, self.key, 'pos', val = pos)

        cur = self.nodes[pos]
        curdata = get_chained_data(data, self.key, cur.key, default={})

        mov, ans = cur.answer(resp, curdata)

        if mov == CHAT_RET.NEXT:
            return CHAT_RET.NEXT, ans
        return CHAT_RET.STAY, ans


In [19]:
# export
class SkipSplitter(Splitter):
    """
    A single branch that is either taken or skipped depending on a condition
    (name taken from skip connections in resnets)
    """
    def __init__(self, key, branch, decider_fn=lambda _: 0, default = 0):
        """
        :key: key identifying this splitter
        :branch: the single optional branch element
        :decider_fn: is passed the data of the parent; if it returns 1 the
            branch is taken, otherwise it is skipped
        :default: forwarded to Splitter
        """
        super().__init__(key, branch, decider_fn=decider_fn, default=default)

    def decide(self, data):
        """
        Based on parent level data decide if the branch is to be taken.
        Returns the raw value of decider_fn.
        """
        return self.decider_fn(data)

    def getpos(self, data):
        """
        Return the branch element recorded under data[key]['pos'], recording the
        single branch's key there first if nothing is stored.
        """
        pos = get_chained_data(data, self.key, 'pos',
                               default=self.branches[0].key)
        return self.nodes[pos]

    def setpos(self, data, pos):
        """
        Store pos.key under the top level 'pos' key of data.
        NOTE(review): getpos reads data[key]['pos'] while this writes data['pos'] -
        confirm whether the parent position is intended here.
        """
        set_chained_data(data, 'pos', val=pos.key)

    def getdata(self, data, key):
        """Return the sub-dictionary data[self.key][key], creating it if absent"""
        return get_chained_data(data, self.key, key, default={})

    def question(self, data):
        """
        Return the branch question when the branch is taken; otherwise move the
        parent position to the next element and return that element's question.
        data is the parent level data.
        """
        debug_print(f'skip question data {data}')
        if self.decide(data) == 1:
            # take branch
            cur = self.getpos(data)
            curdata = self.getdata(data, cur.key)
            return cur.question(curdata)

        # skip branch: the parent position now points at the next element
        nxt = self.next
        set_chained_data(data, 'pos', val = nxt.key)
        # Hand the next element the data it will also receive when answering:
        # parent data for splitters, its own sub-dictionary otherwise. Passing
        # an empty dict would hide the session data from deciders and callbacks.
        if isinstance(nxt, Splitter):
            return nxt.question(data)
        return nxt.question(get_chained_data(data, nxt.key, default={}))

    def answer(self, resp, data):
        """
        Pass resp to the branch element.
        :returns: (CHAT_RET.NEXT if the branch reported NEXT else CHAT_RET.STAY,
            answer dict of the branch)
        """
        debug_print(f'skip answer data {data}')
        cur = self.getpos(data)
        curdata = self.getdata(data, cur.key)

        mov, ans = cur.answer(resp, curdata)

        if mov == CHAT_RET.NEXT:
            return CHAT_RET.NEXT, ans
        return CHAT_RET.STAY, ans


## Tests 

### Test basic segments and composite

In [20]:
# Greeting segment, then a nested group of personal details (c2) and a nested
# income group (c3).
bot = TextAdapter(
    Composite(
        'c1',
        Segment('s0', '', 'welcome'),
         Composite('c2', 
            Segment('s1', 'your name', 'got it'),
            ValidatedSegment('s2', 'your phone number', 'got it', 
                            'phone number should be 10 digits with optional +91 at beginning',
                            # raw string: \+ and \d are regex escapes, not Python string escapes
                            lambda x: re.match(r'(\+91)?\d{10}$', x) is not None),        
            Segment('s3', 'your email', 'ok')
                  ),
        Composite('c3', 
            Segment('s4', 'your income', 'ok')
                 )
    )
)
# CAUTION: FLUSHALL deletes every key on the connected redis server, not only
# the sessions of this bot.
bot.r.flushall()

True

In [21]:
bot.respond('hi', 3)

'welcome\nyour name'

In [22]:
bot.respond('Niraj', 3)

'got it\nyour phone number'

In [23]:
bot.respond('123123', 3)

'phone number should be 10 digits with optional +91 at beginning\nyour phone number'

In [24]:
bot.respond('1231234567', 3)

'got it\nyour email'

In [25]:
bot.respond('asd@qwe', 3)

'ok\nyour income'

In [26]:
bot.respond('30000', 3)

'ok\n'

In [27]:
bot.respond('hi', 3)

'welcome\nyour name'

In [28]:
bot.respond('Niraj', 3)

'got it\nyour phone number'

In [29]:
bot.respond('123123', 3)

'phone number should be 10 digits with optional +91 at beginning\nyour phone number'

In [30]:
bot.respond('1231234567', 3)

'got it\nyour email'

In [31]:
bot.respond('asd@qwe', 3)

'ok\nyour income'

In [32]:
bot.respond('30000', 3)

'ok\n'

In [33]:
bot.get_data(3)

{'s0': {'data': 'hi'},
 'c2': {'s1': {'data': 'Niraj'},
  's2': {'data': '1231234567'},
  's3': {'data': 'asd@qwe'}},
 'c3': {'s4': {'data': '30000'}},
 'pos': 's0'}

### Test multi choice segment

In [34]:
# Flat composite ending in a multiple choice question with three options.
bot = TextAdapter(
    Composite(
        'c1',
        Segment('s0', '', 'welcome'),
        Segment('s1', 'your name', 'got it'),
        MultiChoiceSegment('s2', 'Your income bracket?', 
                          ['0 - 10k', '10k-1L', '1L-10L'], 'got it')
    )
)
# Start from an empty store so sessions left over from earlier cells do not interfere
bot.r.flushall()

True

In [35]:
print(bot.respond('hi', 0))

welcome
your name


In [36]:
print(bot.respond('test123', 0))

got it
Your income bracket?
1. 0 - 10k
2. 10k-1L
3. 1L-10L


In [37]:
print(bot.respond('3', 0))

got it



In [38]:
bot.get_data(0)

{'s0': {'data': 'hi'},
 's1': {'data': 'test123'},
 's2': {'data': [2, '1L-10L']},
 'pos': 's0'}

### Test splitters

In [39]:
def test_fn(data):
    debug_print('splitter decider', data, data['s4']['data'])
    if tryint(data['s4']['data']) < 10000:
        return 0
    return 1

In [40]:
def infodecider(data):
    """
    Decider for the 'ss1' skip splitter: returns 0 (skip) when a non-empty
    email is already stored under ss1 -> c2 -> s3, otherwise 1 (take the branch).
    """
    email = get_chained_data(data, 'ss1', 'c2', 's3', 'data')
    already_known = email is not None and email != ''
    return 0 if already_known else 1

In [41]:
# Personal details (c2) are asked only while infodecider returns 1; the final
# question depends on the income through test_fn.
bot = TextAdapter(
    Composite(
        'c1',
        Segment('s0', '', 'welcome'),
        SkipSplitter('ss1', 
                    Composite('c2', 
                        Segment('s1', 'your name', 'got it'),
                        # raw strings: the backslashes are regex escapes, not Python string escapes
                        ValidatedSegment('s2', 'your mobile', 'got it', '10 digit mobile',
                                    valid_fn=lambda x: re.match(r'(\+91)?\d{10}$', x) is not None),
                        ValidatedSegment('s3', 'your email', 'got it', 'email in format abc@qwe.zxc',
                                    valid_fn=lambda x: re.match(r'[a-zA-Z0-9._]+@[a-zA-Z0-9_]+\.[a-zA-Z0-9._]+', 
                                                                x) is not None)
                    ),
                    decider_fn= infodecider),
        Segment('s4', 'your income', 'ok'),
        Splitter('s5', 
                Segment('s51', 'Do you have bike?', 'ok'),
                Segment('s52', 'Do you have car?', 'ok'),
                decider_fn=test_fn)
    )
)
# CAUTION: FLUSHALL deletes every key on the connected redis server, not only
# the sessions of this bot.
bot.r.flushall()

True

In [42]:
bot.respond('hi', 3)

'welcome\nyour name'

In [43]:
bot.respond('Niraj', 3)

'got it\nyour mobile'

In [44]:
bot.respond('123123', 3)

'10 digit mobile\nyour mobile'

In [45]:
bot.respond('1231234567', 3)

'got it\nyour email'

In [46]:
bot.respond('asd@qwe', 3)

'email in format abc@qwe.zxc\nyour email'

In [47]:
bot.respond('asd@qwe.com', 3)

'got it\nyour income'

In [48]:
bot.respond('9000', 3)

'ok\nDo you have bike?'

In [49]:
bot.respond('No', 3)

'ok\n'

In [50]:
bot.respond('hi', 3)

'welcome\nyour income'

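The personal-information section is skipped this time because the skip splitter's decider (`infodecider`) now returns 0: an email is already stored for this session.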

In [51]:
bot.respond('19000', 3)

'ok\nDo you have car?'

In [52]:
bot.respond('yes', 3)

'ok\n'

In [53]:
bot.get_data(3)

{'s0': {'data': 'hi'},
 'ss1': {'c2': {'s3': {'data': 'asd@qwe.com'},
   's1': {'data': 'Niraj'},
   's2': {'data': '1231234567'}},
  'pos': 'c2'},
 's4': {'data': '19000'},
 's5': {'pos': 's52', 's51': {'data': 'No'}, 's52': {'data': 'yes'}},
 'pos': 's0'}

### Test callbacks

In [54]:
# Single plain segment with both callbacks attached; each callback prints the
# arguments it receives.
bot = TextAdapter(Composite('root',
                           Segment('s1', 's1 question', 's1 answer', 
                                   on_question = lambda data: print(f's1 q callback {data}'),
                                   on_answer = lambda resp, data: 
                                       print(f's1 a callback resp: {resp} {data}')
                                  )
                           ))

In [55]:
bot.respond('hi', 0)

s1 a callback resp: hi {}
s1 q callback {'data': 'hi'}


's1 answer\ns1 question'

In [56]:
# Validated segment with callbacks: only 'hi' / 'hello' (case-insensitive) pass
# validation, and on_answer fires only for a response that passed.
bot = TextAdapter(Composite('root',
                           ValidatedSegment('s1', 's1 question', 's1 answer', 
                                            'wrong greetings',
                                            lambda resp: resp.lower() in ('hi', 'hello'),
                                   on_question = lambda data: print(f's1 q callback {data}'),
                                   on_answer = lambda resp, data: 
                                       print(f's1 a callback resp: {resp} {data}')
                                  )
                           ))

In [57]:
bot.respond('hullo', 0)

s1 q callback {'data': ''}


'wrong greetings\ns1 question'

In [58]:
bot.respond('Hi', 0)

s1 a callback resp: Hi {'data': ''}
s1 q callback {'data': 'Hi'}


's1 answer\ns1 question'

In [59]:
# Multiple choice segment with callbacks: on_answer fires only when the
# response is a valid choice number (1..3 here).
bot = TextAdapter(Composite('root',
                            Segment('s0', '', 'Welcom'),
                           MultiChoiceSegment('s1', 's1 question', 
                                              ['ans 1', 'ans 2', 'ans 3'],
                                              's1 answer', 
                                   on_question = lambda data: print(f's1 q callback {data}'),
                                   on_answer = lambda resp, data: 
                                       print(f's1 a callback resp: {resp} {data}')
                                  )
                           ))

In [60]:
bot.r.flushall()
bot.respond('hi', 1)

s1 q callback {}


'Welcom\ns1 question\n1. ans 1\n2. ans 2\n3. ans 3'

In [61]:
bot.respond('4', 1)

s1 q callback {}


'Please enter one of 1..3 as answer\ns1 question\n1. ans 1\n2. ans 2\n3. ans 3'

In [62]:
bot.respond('1', 1)

s1 a callback resp: 1 {}


's1 answer\n'

## Build Lib

In [None]:
from nbdev.export import notebook2script 
notebook2script()

In [76]:
#hide
show_doc(enable_debug)

<h4 id="enable_debug" class="doc_header"><code>enable_debug</code><a href="__main__.py#L2" class="source_link" style="float:right">[source]</a></h4>

> <code>enable_debug</code>()

Enable debug messages