In [None]:
import random
import time


In [None]:
# Markers that delimit code and observation sections inside the streamed text.
CODE_TAG_OPEN, CODE_TAG_CLOSE = "<code>", "</code>"
OBSERVATION_TAG_OPEN, OBSERVATION_TAG_CLOSE = "<observation>", "</observation>"

# Every known tag: open/close pair for code, then open/close pair for observation.
TAGS = [
    CODE_TAG_OPEN,
    CODE_TAG_CLOSE,
    OBSERVATION_TAG_OPEN,
    OBSERVATION_TAG_CLOSE,
]
# Number of characters in the longest known tag.
MAX_TAG_LENGTH = max(map(len, TAGS))

MAX_TAG_LENGTH

## Create random text

In [None]:
def random_text(length=100):
    """Return a string of `length` randomly chosen characters.

    Characters are drawn one at a time with `random.choice` from lowercase
    letters, digits, a handful of operator symbols (including "<" and ">"),
    the space and the newline.
    """
    alphabet = "abcdefghijklmnopqrstuvwxyz<>+-/*=0123456789 \n"
    return "".join(random.choice(alphabet) for _ in range(length))

In [None]:
def random_text_with_tags(min_length=10000):
    """Build a random document of at least `min_length` characters.

    The document starts with one run of plain random text and is then extended
    with randomly chosen segments until it is long enough. Each segment is
    either plain random text or random text wrapped in a code or observation
    tag pair; the opening tag is preceded by a newline and the closing tag is
    preceded by a newline. Every segment body is 10 to 100 characters long.
    """
    # Opening tag -> matching closing tag for the two kinds of tagged section.
    closing_tag = {
        CODE_TAG_OPEN: CODE_TAG_CLOSE,
        OBSERVATION_TAG_OPEN: OBSERVATION_TAG_CLOSE,
    }
    pieces = [random_text()]
    total_length = len(pieces[0])
    while total_length < min_length:
        kind = random.choice([CODE_TAG_OPEN, OBSERVATION_TAG_OPEN, "text"])
        body = random_text(random.randint(10, 100))
        if kind in closing_tag:
            segment = "\n" + kind + body + "\n" + closing_tag[kind]
        else:
            segment = body
        pieces.append(segment)
        total_length += len(segment)
    return "".join(pieces)

In [None]:
# Build one random sample document (at least 10000 characters with the default
# min_length) and print it in full for visual inspection.
text = random_text_with_tags()
print(text)

### Stream random chunks of text

In [None]:
def stream_text_random_chunks(text, max_chunk_size=5):
    """Yield `text` piece by piece, in order, to imitate a streamed response.

    Each piece is a consecutive slice whose requested size is a random integer
    between 1 and `max_chunk_size` (inclusive); the last piece may be shorter
    when the text runs out. Concatenating all pieces gives back `text`.
    """
    start, total = 0, len(text)
    while start < total:
        stop = start + random.randint(1, max_chunk_size)
        yield text[start:stop]
        start = stop

In [None]:
class MessageType:
    """String labels for the kind of content currently being streamed."""
    # Plain text outside any tagged section.
    TEXT = "text"
    # Content that follows an opening code tag.
    CODE = "code"
    # Content that follows an opening observation tag.
    OBSERVATION = "observation"

In [None]:
# Message type that becomes active after each known tag. A closing tag always
# switches back to plain text.
_TAG_TO_MESSAGE_TYPE = {
    CODE_TAG_OPEN: MessageType.CODE,
    OBSERVATION_TAG_OPEN: MessageType.OBSERVATION,
    CODE_TAG_CLOSE: MessageType.TEXT,
    OBSERVATION_TAG_CLOSE: MessageType.TEXT,
}


def _process_buffer(buffer, msg_type, at_end=False):
    """Print everything in `buffer` that can already be classified.

    Plain text is printed as-is. A known tag is reported on its own line and
    switches the message type. A "<" that cannot start a known tag is printed
    as ordinary text.

    Args:
        buffer: Text received so far that has not been printed yet.
        msg_type: Message type that is active before `buffer` is processed.
        at_end: True when no more input will arrive, so an incomplete tag
            prefix is printed as text instead of being held back.

    Returns:
        A `(remaining, msg_type)` tuple. `remaining` is either empty or an
        incomplete prefix of a known tag that needs more input to be resolved;
        `msg_type` is the message type active after processing.
    """
    while buffer:
        start = buffer.find("<")
        if start < 0:
            # No tag can start here: everything is plain text.
            print(buffer, end="")
            return "", msg_type
        if start > 0:
            # Text before "<" can never be part of a tag.
            print(buffer[:start], end="")
            buffer = buffer[start:]
        # From here on the buffer starts with "<".
        tag = next((t for t in _TAG_TO_MESSAGE_TYPE if buffer.startswith(t)), None)
        if tag is not None:
            msg_type = _TAG_TO_MESSAGE_TYPE[tag]
            print(f"TAG: {tag} - Message Type: {msg_type}\n")
            buffer = buffer[len(tag):]
        elif not at_end and any(t.startswith(buffer) for t in _TAG_TO_MESSAGE_TYPE):
            # The buffer is a strict prefix of a known tag: wait for more input.
            # This also keeps the held-back text shorter than the longest tag.
            return buffer, msg_type
        else:
            # This "<" cannot start a known tag. Print only the "<" and rescan
            # the rest, so a real tag right after a stray "<" is still detected.
            print(buffer[0], end="")
            buffer = buffer[1:]
    return "", msg_type


prev_text, msg_type = "", MessageType.TEXT

for chunk in stream_text_random_chunks(text):
    # Sleep for a short time to simulate streaming
    #time.sleep(0.05)
    prev_text, msg_type = _process_buffer(prev_text + chunk, msg_type)

# The stream is over: whatever is still held back is not a tag, so print it
# instead of silently dropping it.
prev_text, msg_type = _process_buffer(prev_text, msg_type, at_end=True)
