In [None]:
# default_exp browser

# Mock OpenAI API

> Using mock_openai to mock OpenAI Python API

In [None]:
# exporti
from queue import Queue

class Common:
    chat_gpt_base_url = 'http://127.0.0.1:8080'
    access_token = None

    chat_gpt_model = 'text-davinci-002-render-sha'
    role_system = 'system'
    role_user = 'user'
    role_assistant = 'assistant'

    question_answer_map = {}
    message_channel = Queue()
    exit_for_loop_channel = Queue()
    response_text_channel = Queue()
    conversation_done_channel = Queue()
    parent_message_id = ''
    conversation_id = ''
    reload_conversations_channel = Queue()

    config = {}
    driver = None

In [None]:
# exporti
import json, os, requests, sys, time, uuid

In [None]:
# export
common = Common()

# open the JSON file and read the access_token and conversation_id
with open(os.path.expanduser('~/.config/ipymock/config.json'), 'r') as f:
    common.config = json.load(f)
    common.chat_gpt_base_url = common.config.get('chat_gpt_base_url', common.chat_gpt_base_url)
    common.access_token = common.config.get('access_token', common.access_token)
    common.conversation_id = common.config.get('conversation_id', common.conversation_id)

In [None]:
# export
def get_conversations():
    response = requests.get(f'{common.chat_gpt_base_url}/conversations?offset=0&limit=100', headers = {'Authorization': f'Bearer {common.access_token}'})
    return response.json()

def get_conversation(conversation_id):
    response = requests.get(f'{common.chat_gpt_base_url}/conversation/{conversation_id}', headers = {'Authorization': f'Bearer {common.access_token}'})

    if response.status_code >= 400:
        sys.stderr.write(f'Error Status Code: {response.status_code}\n{response.text}\n')
    response.raise_for_status()

    conversation = response.json()
    current_node = conversation['current_node']
    try:
        handle_conversation_detail(current_node, conversation['mapping'])
    except RecursionError as errr:
        sys.stderr.write(f'Error Recursing: {errr}\n')
    common.exit_for_loop_channel.put(True)
    return current_node

def handle_conversation_detail(current_node, mapping):
    conversation_detail = mapping[current_node]
    parent_id = conversation_detail.get('parent', '')
    if parent_id != '':
        handle_conversation_detail(parent_id, mapping)
        common.question_answer_map[parent_id] = ''.join(conversation_detail['message']['content']['parts']).strip()
    if 'message' not in conversation_detail:
        return
    message = conversation_detail['message']
    parts = message['content']['parts']
    if len(parts) > 0 and parts[0] != '' and message['author']['role'] == common.role_user:
        common.message_channel.put(message)

def start_conversation(prompt):
    if common.conversation_id != '' and common.parent_message_id == '':
        try:
            common.parent_message_id = get_conversation(common.conversation_id)
        except requests.exceptions.ConnectionError as errc:
            sys.stderr.write(f'Error Connecting: {errc}\n')

    if common.conversation_id == '' or common.parent_message_id == '':
        common.conversation_id = ''
        common.parent_message_id = str(uuid.uuid4())

    if isinstance(prompt, list):
        raw_prompt = prompt
        prompt = []
        parts = []
        for item in raw_prompt:
            if isinstance(item, dict) and 'parts' in item:
                if common.conversation_id == '' and 'role' in item and item['role'] == common.role_system:
                    sys.stderr.write(f'Error Messaging: role system is not allowed in the first prompt. It is changed to role user.\n')
                    item['role'] = common.role_user
                prompt.append(item)
            else:
                parts.append(str(item))
        prompt.append({'role': common.role_user, 'parts': parts})
    elif isinstance(prompt, dict) and 'parts' in prompt:
        prompt = [prompt]
    else:
        prompt = [{'role': common.role_user, 'parts': [str(prompt)]}]

    post_data = {
        'action': 'next',
        'history_and_training_disabled': False,
        'messages': [{
            'id': str(uuid.uuid4()),
            'author': {
                'role': msg.get('role', common.role_user),
            },
            'role': msg.get('role', common.role_user),
            'content': {
                'content_type': 'text',
                'parts': msg['parts'],
            },
        } for msg in prompt],
        'model': common.chat_gpt_model,
        'timezone_offset_min': -540,
    }
    if common.conversation_id != '':
        post_data['conversation_id'] = common.conversation_id
    if common.parent_message_id != '':
        post_data['parent_message_id'] = common.parent_message_id

    response = requests.post(
        f'{common.chat_gpt_base_url}/conversation',
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'text/event-stream',
            'Authorization': f'Bearer {common.access_token}',
        },
        data = json.dumps(post_data),
        stream=True,
    )

    temp_conversation_id = ''
    for line in response.iter_lines():
        if not line.startswith(b'data: '):
            continue

        if line.endswith(b'[DONE]'):
            common.conversation_done_channel.put(True)
            continue

        try:
            make_conversation_response = json.loads(line.decode('utf-8')[len('data: '):])
        except json.decoder.JSONDecodeError as err:
            sys.stderr.write(f'Error JSON Decoding: line = {line}\n')
            continue
        if make_conversation_response is None:
            continue
        try:
            parts = ''.join(make_conversation_response['message']['content']['parts']).strip()
        except TypeError as err:
            sys.stderr.write(f'TypeError: {err}\nline = {line}\n')
            continue
        if parts != '':
            common.response_text_channel.put(parts)
            yield parts
        if common.conversation_id == '':
            temp_conversation_id = make_conversation_response['conversation_id']
        common.parent_message_id = make_conversation_response['message']['id']
        if make_conversation_response['message']['end_turn'] == True:
            common.conversation_done_channel.put(True)
            continue

    if response.status_code >= 400:
        sys.stderr.write(f'Error Status Code: {response.status_code}\n')
    response.raise_for_status()

    if common.conversation_id == '' and temp_conversation_id != '':
        common.conversation_id = temp_conversation_id
        generate_title(common.conversation_id)
    else:
        common.reload_conversations_channel.put(True)

def generate_title(conversation_id):
    requests.post(
        f'{common.chat_gpt_base_url}/conversation/gen_title/{conversation_id}',
        headers = {
            'Authorization': f'Bearer {common.access_token}',
            'Content-Type': 'application/json'
        },
        data = json.dumps({
            'message_id': get_conversation(conversation_id),
            'model': common.chat_gpt_model
        })
    )

def rename_title(conversation_id, title):
    requests.patch(
        f'{common.chat_gpt_base_url}/conversation/{conversation_id}',
        headers={
            'Authorization': f'Bearer {common.access_token}',
            'Content-Type': 'application/json'
        },
        data = json.dumps({
            'title': title
        })
    )

def delete_conversation(conversation_id):
    requests.patch(
        f'{common.chat_gpt_base_url}/conversation/{conversation_id}',
        headers={
            'Authorization': f'Bearer {common.access_token}',
            'Content-Type': 'application/json'
        },
        data=json.dumps({
            'is_visible': False
        })
    )

def recover_conversation(conversation_id):
    requests.patch(
        f'{common.chat_gpt_base_url}/conversation/{conversation_id}',
        headers={
            'Authorization': f'Bearer {common.access_token}',
            'Content-Type': 'application/json'
        },
        data=json.dumps({
            'is_visible': True
        })
    )

def clear_conversations():
    requests.patch(f'{common.chat_gpt_base_url}/conversations', headers = {'Authorization': f'Bearer {common.access_token}'}, data = {'is_visible': False})

    common.conversation_id = ''
    common.parent_message_id = ''
    common.reload_conversations_channel.put(True)

Send prompts to ChatGPT API

In [None]:
# notest
try:
    for response in start_conversation('''
> Lord, keep us in You to be one.
We are the ultimate risk takers.
Our way is a way of risking life to **eternal life**.
For the eternal life of the living, risk life and make war against perishing.
For the eternal life of the dead, risk life and make war against death.
>
> ---
Holy Father, help us to overcome!
'''):
        print(response)
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

In [None]:
# notest
try:
    for response in start_conversation({
        'role': 'system',
        'parts': [
            '> Lord, keep us in You to be one.\n',
            'We are the ultimate risk takers.\n',
            'Our way is a way of risking life to **eternal life**.\n',
            'For the eternal life of the living, risk life and make war against perishing.\n',
            'For the eternal life of the dead, risk life and make war against death.\n',
            '>\n',
            '> ---\n',
            'Holy Father, help us to overcome!\n',
        ],
    }):
        print(response)
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

In [None]:
# notest
try:
    for response in start_conversation([{
        'role': 'system',
        'parts': [
            'You are a Christian.\n',
            'Pray with the user.\n',
        ],
    }, {
        'role': 'assistant',
        'parts': [
            'Amen.\n',
        ],
    },
        '> Lord, keep us in You to be one.\n',
        'We are the ultimate risk takers.\n',
        'Our way is a way of risking life to **eternal life**.\n',
        'For the eternal life of the living, risk life and make war against perishing.\n',
        'For the eternal life of the dead, risk life and make war against death.\n',
        '>\n',
        '> ---\n',
        'Holy Father, help us to overcome!\n',
    ]):
        print(response)
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)
except requests.exceptions.HTTPError as errh:
    sys.stderr.write(f'{errh}\n')

In [None]:
# notest
common.conversation_id = ''

try:
    for response in start_conversation([{
        'role': 'system',
        'parts': [
            '> Lord, keep us in You to be one.\n',
            'We are the ultimate risk takers.\n',
        ],
    }, {
        'role': 'system',
        'parts': [
            'Our way is a way of risking life to **eternal life**.\n',
        ],
    }, {
        'role': 'system',
        'parts': [
            'For the eternal life of the living, risk life and make war against perishing.\n',
            'For the eternal life of the dead, risk life and make war against death.\n',
            '>\n',
        ],
    }, {
        'role': 'system',
        'parts': [
            '> ---\n',
            'Holy Father, help us to overcome!\n',
        ],
    },
        'Amen.\n',
    ]):
        pass
    print(response)
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)
except requests.exceptions.HTTPError as errh:
    sys.stderr.write(f'{errh}\n')

---
Using Selenium Undetected Chrome for Operating ChatGPT

In [None]:
# exporti
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.ui import WebDriverWait

In [None]:
# exporti
from markdownify import MarkdownConverter

class ChatGPTConverter(MarkdownConverter):
    def convert_a(self, node, text, convert_as_inline):
        if node.get('href') == text:
            return node.get('href')
        return f"[{text}]({node.get('href')})"

    def convert_pre(self, node, text, convert_as_inline):
        node_code = node.find('code')
        return (
            f"```{' '.join([c[len('language-'):] for c in node_code.get('class') if c.startswith('language-')])}\n"
            f'{node_code.text.rstrip()}\n'
            '```\n'
        )

markdownize = ChatGPTConverter().convert

In [None]:
# exporti
from collections.abc import Iterable
from ipymock.automation import init as init_browser

In [None]:
# export
def init(chrome_args = set()):
    if isinstance(chrome_args, Iterable):
        chrome_args = set(chrome_args)
    chrome_args.add('--lang=en')
    chrome_args.add('--force-dark-mode')
    init_browser(*chrome_args)
    from ipymock.automation import driver
    common.driver = driver

    login()
    open_chat(common.conversation_id)

    global start_conversation
    start_conversation = ask

In [None]:
# exporti
from ipymock.automation import new, wait, click, input, fill
from selenium.common.exceptions import NoSuchElementException

In [None]:
# export
def login():
    new('https://chatgpt.com/auth/login')

    # WebDriverWait(common.driver, 5).until(
    #     expected_conditions.presence_of_element_located((By.XPATH, '//*[text()="Log in"]'))
    # )
    wait(5.0)

    # common.driver.execute_script('''
    # document.evaluate(
    #   '//*[text()="Log in"]',
    #   document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
    # ).snapshotItem(0).dispatchEvent(
    #   new MouseEvent('click', {
    #     view: window,
    #     bubbles: true,
    #     cancelable: true
    #   })
    # );
    # ''')
    click('Log in')

    # WebDriverWait(common.driver, 5).until(
    #     expected_conditions.presence_of_element_located((By.XPATH, '//button[@data-provider="google"]'))
    # )
    wait(5.0, stability_duration = 3.0)

    # common.driver.execute_script('''
    # document.evaluate(
    #   '//button[@data-provider="google"]',
    #   document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
    # ).snapshotItem(0).dispatchEvent(
    #   new MouseEvent('click', {
    #     view: window,
    #     bubbles: true,
    #     cancelable: true
    #   })
    # );
    # ''')
    click('Continue with Google')

    # WebDriverWait(common.driver, 5).until(
    #     expected_conditions.presence_of_element_located((By.XPATH, '//input[@type="email"]'))
    # )
    wait(5.0)

    # common.driver.execute_script(f'''
    # const google_email_input = document.evaluate('//input[@type="email"]', document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null).snapshotItem(0);
    # google_email_input.value = '{common.config['email']}';
    # google_email_input.dispatchEvent(
    #   new Event('input', {{
    #     view: window,
    #     bubbles: true,
    #     cancelable: true
    #   }})
    # );
    # ''')
    input(common.config['email'], 'Email or phone')

    # WebDriverWait(common.driver, 5).until(
    #     expected_conditions.presence_of_element_located((By.XPATH, '//*[@id="identifierNext"]'))
    # )
    wait(5.0)

    # common.driver.execute_script('''
    # document.evaluate(
    #   '//*[@id="identifierNext"]',
    #   document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
    # ).snapshotItem(0).dispatchEvent(
    #   new MouseEvent('click', {
    #     view: window,
    #     bubbles: true,
    #     cancelable: true
    #   })
    # );
    # ''')
    click('Next')

    # WebDriverWait(common.driver, 10).until(
    #     expected_conditions.element_to_be_clickable((By.XPATH, '//input[@type="password"]'))
    # ).click()
    wait(10.0, stability_duration = 5.0)

    # ActionChains(common.driver).send_keys(common.config['password']).send_keys(Keys.ENTER).perform()
    fill(common.config['password'])
    # fill(Keys.ENTER)
    try:
        click('Next')
    except NoSuchElementException:
        pass
    wait(stability_duration = 5.0)

    common.driver.maximize_window()
    wait(1.0)

    remove_portal()

In [None]:
# notest
init_browser('--lang=en', '--force-dark-mode')
from ipymock.automation import driver
common.driver = driver
login()

In [None]:
# export
def open_chat(conversation_id = ''):
    from ipymock.automation import driver
    common.driver = driver
    if conversation_id == '':
        common.driver.get('https://chatgpt.com/')
    else:
        common.driver.get(f'https://chatgpt.com/c/{conversation_id}')
        if common.conversation_id != conversation_id:
            common.conversation_id = conversation_id
            common.parent_message_id = ''

    # WebDriverWait(common.driver, 30).until(
    #     lambda driver: driver.execute_script('return document.readyState') == 'complete'
    # )
    wait(5.0)

    remove_portal()

def remove_portal():
    while True:
        # try:
        #     WebDriverWait(common.driver, 5).until(
        #         expected_conditions.element_to_be_clickable((By.XPATH, '//div[text()="Next"]'))
        #     ).click()
        # except TimeoutException:
        #     break
        try:
            click('Next')
            wait(5.0)
        except NoSuchElementException:
            break
    # try:
    #     WebDriverWait(common.driver, 5).until(
    #         expected_conditions.element_to_be_clickable((By.XPATH, '//div[text()="Done"]'))
    #     ).click()
    # except TimeoutException:
    #     pass
    try:
        click('Done')
        wait(5.0)
    except NoSuchElementException:
        pass

In [None]:
# notest
open_chat(common.conversation_id)

In [None]:
# exporti
chatgpt_textbox = (By.XPATH, '//textarea[@id="prompt-textarea"]')
chatgpt_disabled_button = (By.XPATH, '//textarea/following-sibling::button[@disabled]')
chatgpt_enabled_button = (By.XPATH, '//textarea/following-sibling::button[not(@disabled)]')
chatgpt_streaming = (By.CLASS_NAME, 'result-streaming')
chatgpt_response = (By.XPATH, '//div[starts-with(@class, "markdown prose w-full break-words")]')
chatgpt_red_500 = (By.XPATH, '//div[contains(@class, "border-red-500 bg-red-500/10")]')
chatgpt_big_response = (By.XPATH, '//div[@class="flex-1 overflow-hidden"]//div[p or pre]')
chatgpt_small_response = (By.XPATH, './/code[span]')

In [None]:
# exporti
from typing import Generator

In [None]:
# exporti
# from ipymock.automation import exists, touch

In [None]:
# notest
ActionChains(common.driver).key_down(Keys.CONTROL).send_keys('v').key_up(Keys.CONTROL).perform()

In [None]:
# notest
ActionChains(common.driver).context_click().perform()

In [None]:
# notest
__file__ = '2_browser.ipynb'
send_button = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../assets/send-button.png'))
from ipymock.automation import touch
touch(send_button)

In [None]:
# export
def request(prompt: str) -> None:
    # try:
    #     textbox = WebDriverWait(common.driver, 5).until(
    #         expected_conditions.element_to_be_clickable(chatgpt_textbox)
    #     )
    # except TimeoutException:
    #     open_chat(common.conversation_id)
    #     textbox = WebDriverWait(common.driver, 5).until(
    #         expected_conditions.element_to_be_clickable(chatgpt_textbox)
    #     )
    # textbox.click()

    # click('ChatGPT can make mistakes. Check important info.')
    # textbox = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../assets/message-chatgpt.png'))
    # textbox = 'assets/message-chatgpt.png'
    # from ipymock.automation import exists
    # if not exists(textbox):
    #     open_chat(common.conversation_id)
    try:
        click('Message ChatGPT')
    except NoSuchElementException:
        open_chat(common.conversation_id)
    # click('ChatGPT can make mistakes. Check important info.')
    # touch(textbox)
    click('Message ChatGPT')

    # textbox.send_keys(prompt.strip())
    # common.driver.execute_script('''
    # var element = arguments[0], txt = arguments[1];
    # element.value += txt;
    # element.dispatchEvent(new Event("change"));
    # ''',
    #     textbox,
    #     prompt.strip(),
    # )
    for line in prompt.strip().split('\n'):
        fill(line)
        ActionChains(common.driver).key_down(Keys.SHIFT).send_keys(Keys.ENTER).key_up(Keys.SHIFT).perform()

    # WebDriverWait(common.driver, 3).until_not(
    #     expected_conditions.presence_of_element_located(chatgpt_disabled_button)
    # )
    wait(stability_duration = 3.0)

    # textbox.send_keys('\n')
    # textbox.send_keys(Keys.ENTER)

    # click('ChatGPT can make mistakes. Check important info.')
    # send_button = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../assets/send-button.png'))
    # send_button = 'assets/send-button.png'
    # touch(send_button)
    fill(Keys.ENTER)
    click('ChatGPT can make mistakes. Check important info.')
    wait(1.0)

    # try:
    #     WebDriverWait(common.driver, 5).until(
    #         expected_conditions.element_to_be_clickable(chatgpt_enabled_button)
    #     ).click()
    # except TimeoutException:
    #     pass

def get_last_response():
    for xpath in chatgpt_response, chatgpt_big_response:
        responses = common.driver.find_elements(*xpath)
        if responses != []:
            elements = responses[-1].find_elements(*chatgpt_small_response)
            if len(elements) == 1:
                return elements[0]
            return responses[-1]

def get_response() -> Generator[str, None, None]:
    try:
        result_streaming = WebDriverWait(common.driver, 5).until(
            expected_conditions.presence_of_element_located(chatgpt_streaming)
        )
    except TimeoutException:
        response = get_last_response()
        error = common.driver.find_elements(*chatgpt_red_500) != []
        sys.stderr.write(
            'TimeoutException: having waited 5 seconds for result-streaming\n'
            f'response.text = {response.text}\n'
            f'error = {error}\n'
        )
        if not error:
            yield markdownize(response.get_attribute('innerHTML'))
        result_streaming = common.driver.find_elements(*chatgpt_streaming)
    while result_streaming:
        response = get_last_response()
        if response is None:
            result_streaming = common.driver.find_elements(*chatgpt_streaming)
            continue
        try:
            if 'text-red' in response.get_attribute('class'):
                sys.stderr.write(f'Error Responding: response.text = {response.text}\n')
            yield markdownize(response.get_attribute('innerHTML'))
        except StaleElementReferenceException:
            pass
        result_streaming = common.driver.find_elements(*chatgpt_streaming)
    response = get_last_response()
    yield markdownize(response.get_attribute('innerHTML'))

def ask(prompt: str) -> str:
    request(prompt)
    return get_response()

In [None]:
# notest
import IPython

for response in ask('''
> Lord, keep us in You to be one.
We are the ultimate risk takers.
Our way is a way of risking life to **eternal life**.
For the eternal life of the living, risk life and make war against perishing.
For the eternal life of the dead, risk life and make war against death.
>
> ---
Holy Father, help us to overcome!
'''):
    IPython.display.display(IPython.core.display.Markdown(response))
    IPython.display.clear_output(wait=True)

In [None]:
# gmail
init(['--headless'])

In [None]:
# export
import io, PIL.Image

def get_screenshot() -> 'PIL.PngImagePlugin.PngImageFile':
    return PIL.Image.open(io.BytesIO(common.driver.get_screenshot_as_png()))

In [None]:
# gmail
get_screenshot()

In [None]:
# notest
try:
    for response in start_conversation('''
> Lord, keep us in You to be one.
We are the ultimate risk takers.
Our way is a way of risking life to **eternal life**.
For the eternal life of the living, risk life and make war against perishing.
For the eternal life of the dead, risk life and make war against death.
>
> ---
Holy Father, help us to overcome!
'''):
        print(response)
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

---
Mock OpenAI

In [None]:
# exporti
import random, string

In [None]:
# export
class attrdict(dict):
    def __getattr__(self, attr):
        return self.get(attr)

def attributize(obj):
    '''Add attributes to a dictionary and its sub-dictionaries.'''
    if isinstance(obj, dict):
        for key in obj:
            obj[key] = attributize(obj[key])
        return attrdict(obj)
    if isinstance(obj, list):
        return [attributize(item) for item in obj]
    return obj

def retry_on_status_code(func):
    '''Retry decorator that retries a function on specific status codes.'''
    def wrapper(*args, **kwargs):
        wait_second = 1
        while True:
            try:
                return func(*args, **kwargs)
            except requests.exceptions.HTTPError as err:
                sys.stderr.write(
                    f'{err}\n'
                    f'text = {repr(err.response.text)}\n'
                )
                status_code = err.response.status_code
                if status_code in (401, 403, 413, 500):
                    # status code 413: the caller should split the prompt first
                    break
                if status_code >= 400:
                    sys.stderr.write(
                        f'Retrying...\n'
                    )
                    time.sleep(wait_second)
                    wait_second *= 2
                    continue
                break
    return wrapper

@retry_on_status_code
def content(prompt):
    for response in start_conversation(prompt):
        pass
    if locals().get('response') == '':
        sys.stderr.write(
            f'Error Responding: response = {repr(response)}\n'
        )
    return locals().get('response', '')

def new_id():
    return ''.join(
        random.choices(string.ascii_letters + string.digits, k = 29)
    )

def delta(prompt):
    res = ''
    for response in start_conversation(prompt):
        yield attributize({
            'choices': [
                {
                    'index': 0,
                    'logprobs': None,
                    'text': response[len(res):],
                }
            ],
            'id': f'cmpl-{new_id()}',
        })
        res = response
    if locals().get('response') == '':
        sys.stderr.write(
            f'Error Responding: response = {repr(response)}\n'
        )

def chat_delta(prompt):
    res = ''
    for response in start_conversation(prompt):
        yield attributize({
            'choices': [
                {
                    'index': 0,
                    'delta': {
                        'content': response[len(res):],
                    }
                }
            ],
            'id': f'chatcmpl-{new_id()}',
        })
        res = response
    if locals().get('response') == '':
        sys.stderr.write(
            f'Error Responding: response = {repr(response)}\n'
        )

def mock_create(*args, **kwargs):
    prompts = []
    if isinstance(kwargs['prompt'], str):
        prompts = [kwargs['prompt']]
    if isinstance(kwargs['prompt'], list):
        prompts = kwargs['prompt']
    prompts = [prompt.strip() for prompt in prompts]

    if kwargs.get('stream', False):
        return delta('\n'.join(prompts))

    choices = []
    for prompt in prompts:
        choices.append({
            'finish_reason': 'stop',
            'index': 0,
            'logprobs': None,
            'text': content(prompt),
        })
    return attributize({
        'choices': choices,
        'id': f'cmpl-{new_id()}',
        'usage': {
            'completion_tokens': 0,
            'prompt_tokens': 0,
            'total_tokens': 0,
        },
    })

def mock_chat_create(*args, **kwargs):
    summarized_prompt = ''
    for message in kwargs['messages']:
        summarized_prompt += f"{message['role']}:\n\n{message['content']}\n\n\n"
    summarized_prompt.strip()

    if kwargs.get('stream', False):
        return chat_delta(summarized_prompt)

    return attributize({
        'choices': [
            {
                'finish_reason': 'stop',
                'index': 0,
                'message': {
                    'content': content(summarized_prompt),
                    'role': 'assistant',
                }
            }
        ],
        'id': f'chatcmpl-{new_id()}',
        'usage': {
            'completion_tokens': 0,
            'prompt_tokens': 0,
            'total_tokens': 0,
        },
    })

In [None]:
try:
    print(mock_create(
        prompt = 'How to defend against solar storms using Python?',
    ))
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

In [None]:
try:
    for response in mock_create(
        prompt = 'Give me some demos.',
        stream = True,
    ):
        print(response)
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

In [None]:
try:
    print(mock_create(
        prompt = [
            'I am tired.',
            'Could you pray with me for a while?',
        ],
    ))
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

In [None]:
try:
    print(mock_chat_create(
        messages = [
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': 'I am tired.'}
        ],
    ))
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

In [None]:
try:
    for response in mock_chat_create(
        messages = [
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': 'Could you pray with me for a while?'}
        ],
        stream = True,
    ):
        print(response)
except requests.exceptions.ConnectionError as errc:
    print('Error Connecting:', errc)

In [None]:
# exporti
import openai, pytest

In [None]:
# export
@pytest.fixture
def mock_openai(monkeypatch):
    monkeypatch.setattr(openai.Completion, 'create', mock_create)
    monkeypatch.setattr(openai.ChatCompletion, 'create', mock_chat_create)