In [27]:
def run_python_code(py_code, g=globals()):
    """
    专门用于执行python代码，并获取最终查询或处理结果。
    :param py_code: 字符串形式的Python代码，用于执行对telco_db数据库中各张数据表进行操作
    :param g: g，字符串形式变量，表示环境变量，无需设置，保持默认参数即可
    :return：代码运行的最终结果
    """    
    
    global_vars_before = set(g.keys())
    try:
        exec(py_code, g)            
    except Exception as e:
        return f"代码执行时报错{e}"
    global_vars_after = set(g.keys())
    new_vars = global_vars_after - global_vars_before
    # 若存在新变量
    if new_vars:
        result = {var: g[var] for var in new_vars}
        return str(result)
    # 若不存在新变量
    else:
        try:
            # 尝试如果是表达式，则返回表达式运行结果
            result = eval(py_code, g)
            print("代码运行结果:", result)  # 打印表达式结果
            return str(result)
        except Exception as e:
            # 若不是表达式，尝试再次执行代码
            try:
                exec(py_code, g)
                return "已经顺利执行代码"
            except Exception as e:
                return f"代码执行时报错{e}"

In [28]:
function_info_run_python_code = {
    'name': 'run_python_code',
    'description': 'run python code',
    'parameters': {
        'type': 'object',
        'properties': {
            'py_code': {
                'type': 'string',
                'description': 'The python code function writen in string'
            },
            'g': {
                'type': 'string',
                'description': 'The default scope of execution, please ignore it'
            }
        },
        'required': ['py_code']
    }
}

In [29]:
import json, ast
import openai
from dotenv import load_dotenv  
import os

# 加载.env文件  
load_dotenv("en1106.env")  

os.environ["OPENAI_API_TYPE"] = os.environ["Azure_OPENAI_API_TYPE1"]
os.environ["OPENAI_API_BASE"] = os.environ["Azure_OPENAI_API_BASE1"]
os.environ["OPENAI_API_KEY"] =  os.environ["Azure_OPENAI_API_KEY1"]
os.environ["OPENAI_API_VERSION"] = os.environ["Azure_OPENAI_API_VERSION1"]
BASE_URL=os.environ["OPENAI_API_BASE"]
API_KEY=os.environ["OPENAI_API_KEY"]

CHAT_DEPLOYMENT_NAME=os.environ.get('AZURE_OPENAI_API_CHAT_DEPLOYMENT_NAME')
EMBEDDING_DEPLOYMENT_NAME=os.environ.get('AZURE_OPENAI_API_EMBEDDING_DEPLOYMENT_NAME')

openai.api_type = os.environ["OPENAI_API_TYPE"]
openai.api_base = os.environ["OPENAI_API_BASE"]
openai.api_version = "2023-07-01-preview"
openai.api_key = os.getenv("OPENAI_API_KEY")

In [30]:
import json
import io
import inspect
import requests

from IPython.display import display, Code, Markdown
import tiktoken
def check_code_run(messages, 
                   functions_list = None, 
                   functions = None,
                   model = "gpt-35-turbo-1106", 
                   function_call = "auto", 
                   auto_run = True):
    
    """
    能够自动执行外部函数调用的Chat对话模型，专门用于代码解释器的构建过程，可以通过auto_run参数设置，决定是否自动执行代码
    :param messages: 必要参数，字典类型，输入到Chat模型的messages参数对象
    :param functions_list: 可选参数，默认为None，可以设置为包含全部外部函数的列表对象
    :param functions: 可选参数，默认为None，可以设置为包含全部外部函数参数解释Schema格式列表
    :param model: Chat模型，可选参数，默认模型为gpt-3.5
    :auto_run：在调用外部函数的情况下，是否自动进行Second Response。该参数只在外部函数存在时起作用
    :return：Chat模型输出结果
    """
    
    # 如果没有外部函数库，则执行普通的对话任务
    if functions_list == None:
        response = openai.ChatCompletion.create(
                        model=model,
                        engine=model,
                        messages=messages,
                        )
        response_message = response["choices"][0]["message"]
        final_response = response_message["content"]
        
    # 若存在外部函数库，则需要灵活选取外部函数并进行回答
    else:
        
        # 创建外部函数库字典
        available_functions = {func.__name__: func for func in functions_list}

        # first response
        response = openai.ChatCompletion.create(
                        model=model,
                        engine=model,
                        messages=messages,
                        tools=functions, 
                        tool_choice=function_call)
        response_message = response["choices"][0]["message"]

        # 判断返回结果是否存在function_call，即判断是否需要调用外部函数来回答问题
        # 若存在function_call，则执行Function calling流程
        # 需要调用外部函数，由于考虑到可能存在多次Function calling情况，这里创建While循环
        # While循环停止条件：response_message不包含function_call
        while response_message.get("tool_calls"):
            print("正在调用外部函数...")
            try:
                tool_calls=response_message["tool_calls"]
                resultMessages=[]

                results={}
                g=globals() 
                for tool_call in tool_calls:
                    # 获取函数名
                    function_name = tool_call["function"]["name"]
                    # 获取函数参数
                    function_args = json.loads(tool_call["function"]["arguments"])
                    # 将当前操作空间中的全局变量添加到外部函数中
                    function_args['g']=g.copy()

                    def convert_to_markdown(code, language):
                        return f"```{language}\n{code}\n```"

                    if function_args.get('py_code'):
                        code = function_args['py_code']
                        markdown_code = convert_to_markdown(code, 'python')
                        print("即将执行以下代码：")

                    else:
                        markdown_code = function_args

                    display(Markdown(markdown_code))

                    if auto_run == False:
                        res = input('请确认是否运行上述代码（1），或者退出本次运行过程（2）' )

                        if res == '2':
                            print("终止运行")
                            return None

                    print("正在执行代码，请稍后...")


                    # 获取函数对象
                    fuction_to_call = available_functions[function_name]
                    # 将函数参数输入到函数中，获取函数计算结果
                    function_response = fuction_to_call(**function_args)
                    results[tool_call.id]=function_response
                    print("外部函数已运行完毕")
                    print("外部函数结果：")
                    print(function_response)

                    resultMessages.append(
                        {
                                "tool_call_id": tool_call.id,
                                "role": "tool",
                                "name": tool_call.function.name,
                                "content": str(results[tool_call.id]) if results[tool_call.id] else '',
                            }
                    )  
            
                # messages中拼接first response消息
                response_message['content']=' '
                messages.append(response_message)  
                # messages中拼接函数输出结果
                messages=messages+resultMessages
                
                # 第二次调用模型
                second_response = openai.ChatCompletion.create(
                    model=model,
                    engine=model,
                    messages=messages,
                    tools=functions, 
                    tool_choice=function_call)

                # 更新response_message
                response_message = second_response["choices"][0]["message"]
        
            except Exception as e:
                print("json格式对象创建失败，正在重新运行")
                final_response = check_code_run(messages = messages, 
                                                functions_list = functions_list, 
                                                functions = functions,
                                                model = model, 
                                                function_call = function_call, 
                                                auto_run = auto_run)
                
        # While条件不满足，或执行完While循环之后，提取返回结果
        final_response = response_message

        display(Markdown(final_response["content"]))

        messages.append(response_message)
    
    return messages

In [32]:
messages=[{'role': 'user', 'content': '给出斐波那契前20位，  并解方程： 4x+3y=120  x-2y=12'}]
system_message = {"role":"system","content":'''You're python developer.And always create python function to resolve question"
    '''}
response_message = check_code_run(messages=[system_message]+messages[-10:], 
                                  functions_list = [run_python_code], 
                                  functions = [{
                                    "type": "function",
                                    "function": function_info_run_python_code
                                       }])

正在调用外部函数...
即将执行以下代码：


```python
# Calculate the first 20 numbers in the Fibonacci sequence
fibonacci_sequence = [0, 1]
for i in range(2, 20):
    next_number = fibonacci_sequence[-1] + fibonacci_sequence[-2]
    fibonacci_sequence.append(next_number)
fibonacci_sequence
```

正在执行代码，请稍后...
外部函数已运行完毕
外部函数结果：
{'i': 19, 'next_number': 4181, 'fibonacci_sequence': [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181]}
正在调用外部函数...
即将执行以下代码：


```python
from sympy import symbols, Eq, solve

# Define the symbols
x, y = symbols('x y')

# Define the equations
eq1 = Eq(4*x + 3*y, 120)
eq2 = Eq(x - 2*y, 12)

# Solve the equations
solution = solve((eq1, eq2), (x, y))
solution
```

正在执行代码，请稍后...
外部函数已运行完毕
外部函数结果：
{'solution': {x: 276/11, y: 72/11}}
即将执行以下代码：


```python
from sympy import Eq, solve, symbols
x, y = symbols('x y')
eq1 = Eq(4*x + 3*y, 120)
eq2 = Eq(x - 2*y, 12)
solution=solve((eq1, eq2), (x, y))  
solution
```

正在执行代码，请稍后...
外部函数已运行完毕
外部函数结果：
{'solution': {x: 276/11, y: 72/11}}
即将执行以下代码：


```python
from sympy import Eq, solve, symbols
x, y = symbols('x y')
eq1 = Eq(4*x + 3*y, 120)
eq2 = Eq(x - 2*y, 12)
solution=solve((eq1, eq2))
solution
```

正在执行代码，请稍后...
外部函数已运行完毕
外部函数结果：
{'solution': {x: 276/11, y: 72/11}}


The first 20 numbers in the Fibonacci sequence are:
\[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181\]

The solutions to the equations are:
\[x = \frac{276}{11},\, y = \frac{72}{11}\]