In [None]:
!pip install ipywidgets

In [1]:
import requests
from tool_use_package.tools.base_tool import BaseTool
from tool_use_package.tool_user import ToolUser
import requests
from opensearchpy import OpenSearch
import re
import json
import os
import sys
import boto3
from botocore.config import Config
import xml.etree.ElementTree as ET

config = Config(read_timeout=1000) # second
boto3_bedrock = boto3.client('bedrock-runtime', config=config)
model_id = 'anthropic.claude-3-sonnet-20240229-v1:0'

### 1. 意图识别

In [2]:
intention_prompt="""
请帮助根据用户的请求将其分类，以下<cate>中为分类标签
<cate>
code_developer: 代码开发和程序设计，希望查找代码库获得具体功能，或者得到step by step的设计步骤和伪代码
code_reviewer: 代码分析及审核，希望分析提供的原始代码片段，确定存在的任何错误，并提供解决这些问题的代码的更正版本，以及您的修复程序是如何解决这些问题的
code_optimaler: 代码优化，希望分析提供的代码片段，并提出改进建议以优化其性能。确定可以使代码更高效、更快或资源密集度更低
log_monitor : 系统日志排查，希望分析提供的系统日志记录，从中分析出系统问题，并提出解决方法
no_cate: 无法判断分类
</cate>
输出prompt模版应该包括三个部分:
必须严格用<example>中给出的样例格式回复,不需要解释，也不要包括任何评论:
<example>
<cate_type>
  code_developer（具体分类）
</cate_type>
<code_language>
   python（客户问题中的代码语言,如果没有找到则为空）
</code_language>
<time_stamp>
   20240124（客户问题中的时间信息,如果没有找到则为空）
</time_stamp>
<reasoning>
  客户需要设计步骤（分类原因）
</reasoning>
</example>
"""



def generate_message(bedrock_runtime, model_id, system_prompt, messages, max_tokens):

    body=json.dumps(
        {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "system": system_prompt,
            "messages": messages
        }  
    )      
    response = bedrock_runtime.invoke_model(body=body, modelId=model_id)
    response_body = json.loads(response.get('body').read())   
    return response_body

def get_intention_type(user_query:str):
    system_prompt = intention_prompt
    max_tokens = 2000
    user_message =  {"role": "user", "content": user_query}
    messages = [user_message]
    response = generate_message (boto3_bedrock, model_id, system_prompt, messages, max_tokens)
    content = response['content'][0]['text']
    print(content)
    root = ET.fromstring(content)
    cate_type = root.find('cate_type').text.strip()
    time_stamp = root.find('time_stamp').text
    return cate_type,time_stamp

In [3]:
import ipywidgets as widgets

#user_query = "what's the hyperpod cluster create script?"
#user_query = "help me to review the 202312 version‘s cluster lifecycle management script"
#user_query = "give me a mount lustre's bash script"

#user_query = "找一下hyperpod集群创建的bash脚本"
#user_query = "帮我review 202312版本的hyperpod 集群的生命周期管理脚本"
#user_query = "给我一个挂载Lustre文件系统的bash脚本"


text_box = widgets.Text(
    value='',
    placeholder='',
    description='user_query:',
    disabled=False
)
display(text_box)

#name = input("user_query: ")

Text(value='', description='user_query:', placeholder='')

In [4]:
user_query = text_box.value
print(user_query)

找一下hyperpod集群创建的bash脚本


In [5]:
### get intention
intention_type, intention_time_stamp=get_intention_type(user_query)

<example>
<cate_type>
code_developer
</cate_type>
<code_language>
bash
</code_language>
<time_stamp>
</time_stamp>
<reasoning>
用户正在寻求创建HyperPod集群的Bash脚本代码。
</reasoning>
</example>


### 2. function calling

In [6]:
ret_codes = []
ret_logs = {}
query_time_stamp = intention_time_stamp

### 用于日志检索的tool #######
class SearchAOS(BaseTool):
    host = 'your_opensearch_host'  # 替换为您的 OpenSearch 主机地址
    port = 9200  # OpenSearch 默认端口
    auth = ('user', 'password')  # 如果需要认证，请提供用户名和密码
    # 要搜索的索引名称
    index_name = 'your_index_name'        
    # 要搜索的字段名称
    field_name = 'your_field_name'
    field_timestamp = 'time_stemp_filed'
    ret_result = {}
    def use_tool(self, search_term, time_stamp):
        global query_time_stamp
        query_time_stamp = time_stamp
        # 创建 OpenSearch 客户端
        client = OpenSearch(hosts=[{'host': self.host, 'port': self.port}], http_auth=self.auth)      
        # 模糊搜索的查询字符串
        query_string = search_term
        # 构建查询请求体
        query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                self.field_name: {
                                    "query": query_string,
                                    "fuzziness": "AUTO"  # 设置模糊匹配级别
                                }
                            }
                        },
                        {
                            "range": {
                                self.field_timestamp: {
                                    "gte": time_stamp
                                }
                            }
                        }
                    ]
                }
            }
        }
        results = []
        # 执行搜索请求
        response = client.search(index=index_name, body=query)        
        # 获取搜索结果
        hits = response['hits']['hits']
        # 打印搜索结果
        for hit in hits:
            item = {"field_value":hit['_source'][field_name],
                    "time_stamp":hit['_source'][field_timestamp]}
            results.append(item)
        json_str = json.dumps(results)
        json_obj = json.loads(json_str)
        global ret_logs
        ret_logs = json_obj
        return json_obj


### 用于代码片段检索的tool #######
class SearchGitHubCode(BaseTool):

    repo_url = "https://github.com/qingyuan18/easy_hyperpod"
    token = "ghp_6KmgrKb6C4K1OPH****"
    def use_tool(self, search_term):
        """
        Search code in a GitHub repository.
        
        Args:
            repo_url (str): The URL of the GitHub repository.
            token (str): A GitHub personal access token.
            search_term (str): The term to search for in the code.
        
        Returns:
            list: A list of tuples containing the file name and URL for each match.
        """
        # Extract the owner and repository name from the URL
        match = re.match(r'https://github\.com/([^/]+)/([^/]+)', self.repo_url)
        if match:
            owner, repo = match.groups()
        else:
            raise ValueError("Invalid GitHub repository URL.")
        
        # Search the repository using the GitHub API
        base_url = f"https://api.github.com/search/code?q={search_term}+in:file+repo:{owner}/{repo}"
        #base_url = f"https://api.github.com/search/code?q={search_term}+in:file+user:{owner}“
        print("base_url=="+base_url)
        headers = {"Authorization": f"Bearer {self.token}",
                  "Accept":"application/vnd.github.text-match+json",
                  "X-GitHub-Api-Version":"2022-11-28"}
        
        results = []
        
        while base_url:
            response = requests.get(base_url, headers=headers)
            response.raise_for_status()
            data = response.json()
            
            for item in data["items"]:
                code_fragment = ""
                for match in item["text_matches"]:
                    code_fragment = code_fragment + match["fragment"]+"\n"
                file_name = item["name"]
                file_url = item["html_url"]
                results.append({"file_name":file_name,"file_url":file_url,"code_fragment":code_fragment})
                print(results)
            
            base_url = response.links.get("next", {}).get("url")
        print("call search code tool success!")
        global ret_codes
        ret_codes=results[:]
        return results
    

In [7]:
searchcode_tool_name = "search_github_code"
searchcode_tool_description = """Returns list of turple, each item of the list contains code filename , html url and matched code text for a given query term.
Use this tools WHENEVER When querying code related issues, which refers to own code repository or open code issues. 
"""
searchcode_tool_arameters = [
    {"name": "search_term", "type": "str", "description": "search term."}
]


searchcode_tool= SearchGitHubCode(searchcode_tool_name,searchcode_tool_description,searchcode_tool_arameters)


# 3. Assign Tool 
tool_user = ToolUser([searchcode_tool],first_party=False , 
                     #model="anthropic.claude-3-sonnet-20240229-v1:0")
                     model="anthropic.claude-v2:1")

messages = [
    {
        "role":"user", 
        #"content":"""in our own code repo, what's the script of train for wandb?"""
        "content":user_query
    }
]
print(tool_user.use_tools(messages, execution_mode="automatic"))

base_url==https://api.github.com/search/code?q=hyperpod cluster create bash+in:file+repo:qingyuan18/easy_hyperpod
[{'file_name': 'hyperpod_setup.sh', 'file_url': 'https://github.com/qingyuan18/easy_hyperpod/blob/bab219f9ebcaf3062cd3e4367af9894aed837f7e/startup/hyperpod_setup.sh', 'code_fragment': '    # 使用 aws cli 将 hyperpod_iam_policy.json 中的权限 policy 添加到用户输入的 IAM role 角色权限中\n    aws iam put-role-policy --role-name "$role_name" --policy-name hyperpod-policy --policy-document file://hyperpod_iam_policy.json\n    # 将 role 角色名设置为全局变量\n    controller_group="compute-nodes"\n    target_id="sagemaker-cluster:${cluster_id}_${controller_group}-${instance_id}"\n    aws ssm start-session --target "$target_id" --region "$region"\n'}]
call search code tool success!


根据搜索结果,我找到了一个名为hyperpod_setup.sh的bash脚本,它似乎用于创建hyperpod集群。脚本文件位于https://github.com/qingyuan18/easy_hyperpod/blob/bab219f9ebcaf3062cd3e4367af9894aed837f7e/startup/hyperpod_setup.sh这个仓库中。

返回的代码片段显示了一些与为角色添加IAM policy相关的逻辑,以及启动hyperpod 

In [8]:
source_codes_context = ""
log_monitor_context = ""

def get_code_source(code_url):
    file_name = code_url.split('/')[-1]
    response = requests.get(code_url).text
    start_index = response.find('"blob":{"rawLines":[')
    if start_index != -1:
        # 找到结尾位置
        end_index = response.find(']}}', start_index)      
        # 提取源代码文本
        source_code_text = response[start_index + 19:end_index]
        # 解析JSON字符串为列表
        #source_code_lines = json.loads('[' + source_code_text + ']')
        return {'filename': file_name, 'source_text': source_code_text}
    else:
        print(f"Error fetching {url}")
            

def get_code_plantext(code_url):
    try:
        response = requests.get(code_url)
        response.raise_for_status()  # 如果响应状态码不是200,就会抛出异常
        file_name = code_url.split('/')[-1]
        #print(response.text)
        source_text = response.text
        return {'filename': file_name, 'source_text': source_text}
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {url}: {e}")



##代码审查/代码优化需要检索到的源代码文件全文
if intention_type == "code_reviewer" or intention_type== "code_optimaler":
    source_codes =[]
    for ret_item in ret_codes:
        code_url = ret_item["file_url"] 
        source_codes.append(get_code_source(code_url))
    source_codes_context = json.dumps(source_codes)
##开发/设计，使用代码片段
elif intention_type == "code_developer":
    source_codes_context = json.dumps(ret_codes)
##日志检索结果
elif intention_type == "log_monitor":
    log_monitor_context = json.dumps(ret_logs)
else:
    source_codes_context = "N/A"
    log_monitor_context = "N/A"

#print(source_codes_context)

### 3. Instruct Prompt Engineering

In [9]:
# system prompt
system_prompt = ""
        
#code_developer_prompt="""Your task is to convert the provided source code and the user's development request into clear and concise steps and code implementations.
#The code implementation will reuse the provided source code as much as possible, and for each step of the implementation, provide necessary details and explanations to ensure that readers can successfully complete the task.
#The provided source code is:
#{source_code}
#The user's development request is:
#{user_request}"""


#code_reviewer_prompt="""Your task is to analyze the provided source code based on the user's request, identify any errors or issues present, and provide a corrected version to address these problems.
#Explain the issues you found in the provided source code and how your fix resolves them.
#The corrected code should be runnable, efficient, and follow best programming practices.
#The provided source code is:
#{source_code}
#The user's request is:
#{user_request}"""


#code_optimaler_prompt="""Your task is to analyze the provided {source_code} based on the {user_request}, and provide suggestions for optimizing its performance.
#Identify segments in the code that can become more efficient, faster, or more resource-saving.
#Provide specific optimization recommendations and explain how these changes will improve the code's performance.
#The optimized code should maintain the original functionality while exhibiting higher efficiency.
#The provided source code is:
#{source_code}
#The user's request is:
#{user_request}"""


#log_monitor_prompt="""Now you're a software developer, you have deployed a new version on {query_start_timestamp}, below is a JSON list of error logs produced by your program near the deployment time, please do the following task:
#1. Generate a list of summary of these error logs before deployment time, the format is keyword, the number of logs;
#2. Generate a list of summary of these error logs after deployment time, the format is keyword, the number of logs;
#3. List all the new errors after deployment time;
#Below is the log  list:
#{log_list}
#"""

code_developer_prompt="""您的任务是将提供的源代码和用户的开发请求转换为清晰简洁的步骤及及代码实现。
代码实现将尽可能地重用提供的源代码,并在实现的每个步骤提供必要的细节和解释,以确保读者能够成功完成任务。
提供的源代码为:
{source_code}
用户的开发请求是:
{user_request}"""

code_reviewer_prompt="""您的任务是根据用户的请求分析提供的源代码,识别其中存在的任何错误或问题,并提供一个修正后的版本来解决这些问题。
解释您在提供的源代码中发现的问题,以及您的修复如何解决它们。
修正后的代码应该是可运行的、高效的,并遵循编程的最佳实践。
提供的源代码为:
{source_code}
用户的请求是:
{user_request}"""

code_optimaler_prompt="""您的任务是根据用户的请求分析提供的源代码,并提出优化其性能的建议。
识别代码中可以变得更加高效、更快或更节省资源的片段。
提供具体的优化建议,并解释这些更改如何提高代码的性能。
优化后的代码应该保持原有的功能,同时展示更高的效率。
提供的源代码为:
{source_code}
用户的请求是:
{user_request}"""

log_monitor_prompt="""您是一名软件开发人员,下面是您的程序在{query_start_timestamp}时间附近产生的一个JSON错误日志列表。
请执行以下任务:
1. 生成部署后这些错误日志的摘要列表,格式为关键字,日志数量;
2. 列出部署后新出现的所有错误。
下面是日志列表:
{log_list}"""

match intention_type:
    case "code_developer":
        system_prompt = code_developer_prompt.format(source_code=source_codes_context,user_request=user_query)
    case "code_reviewer":
        system_prompt = code_developer_prompt.format(source_code=source_codes_context,user_request=user_query)
    case "code_optimaler":
        system_prompt = code_optimaler_prompt.format(source_code=source_codes_context,user_request=user_query)
    case "log_monitor":
        system_prompt = log_monitor_prompt.format(log_list=ret_logs,query_start_timestamp=query_time_stamp)
        


In [10]:
max_tokens = 2000
user_message =  {"role": "user", "content": user_query}
messages = [user_message]
response = generate_message (boto3_bedrock, model_id, system_prompt, messages, max_tokens)
content = response['content'][0]['text']
print(content)

根据提供的源代码和用户的请求,以下是创建 HyperPod 集群的 Bash 脚本步骤:

1. **获取 AWS 凭证**

首先,确保您已正确设置了 AWS 凭证,例如通过运行 `aws configure` 命令。

2. **克隆 easy_hyperpod 存储库**

```bash
git clone https://github.com/qingyuan18/easy_hyperpod.git
cd easy_hyperpod/startup
```

3. **运行 hyperpod_setup.sh 脚本**

```bash
bash hyperpod_setup.sh
```

此脚本将提示您输入以下信息:

- AWS 区域 (Region)
- 集群名称 (Cluster Name)
- IAM 角色名称 (IAM Role Name)

4. **脚本执行步骤解析**

该脚本执行以下主要步骤:

- 使用 AWS CLI 将 `hyperpod_iam_policy.json` 中的权限策略添加到您输入的 IAM 角色权限中。
- 设置全局变量 `controller_group` 和 `target_id`。
- 使用 `aws ssm start-session` 命令启动与 SageMaker 集群节点的会话。

5. **后续步骤**

成功运行脚本后,您将进入与 SageMaker 集群节点的交互式 SSH 会话。在该会话中,您可以执行其他必需的操作来配置和管理 HyperPod 集群。

总的来说,`hyperpod_setup.sh` 脚本是设置 HyperPod 集群所需的初始步骤。它配置了 IAM 权限,并建立了与集群节点的连接,为后续配置做好了准备。


### for test only

In [None]:
log_monitor_prompt="""Now you're a software developer, you have deployed a new version on {query_start_timestamp}, below is a JSON list of error logs produced by your program near the deployment time, please do the following task:
1. Generate a list of summary of these error logs before deployment time, the format is keyword, the number of logs;
2. Generate a list of summary of these error logs after deployment time, the format is keyword, the number of logs;
3. List all the new errors after deployment time;
Below is the log  list:
{log_list}
"""

print(log_monitor_prompt.format(query_start_timestamp="2024-03-26 03:36:15 UTC",log_list=["python3","hi"]))

In [None]:
#user_query = "I want't use rollingbatch to deploy a llama model"
#user_query = "what's the error logs of '20240218 12:08:00' mean?"
intention_type, intention_time_stamp=get_intention_type(user_query)

messages = [
    {
        "role":"user", 
        #"content":"""in our own code repo, what's the script of train for wandb?"""
        "content":"""在我们自己的代码库中, 帮我找一个llama-2-70B的部署脚本"""
    }
]
print(tool_user.use_tools(messages, execution_mode="automatic"))