# 如何构建一个基于Lambda的Query Agent Tool

In [None]:
!pip install awscli
!pip install sqlalchemy pymysql pydantic pandas

### 0. 测试pydantic框架带来的接口报错机制

In [None]:
from pydantic import BaseModel, validator, ValidationError
from typing import Optional

class Employee_Pydantic(BaseModel):
    """Validated query parameters for the employee lookup.

    Every field is optional. ``role`` and ``domain`` are checked
    case-insensitively against fixed allow-lists; a value outside the list
    makes model construction raise ``ValidationError``.
    """

    employee: Optional[str] = None
    role: Optional[str] = None
    domain: Optional[str] = None
    scope: Optional[str] = None

    @validator('role')
    def role_check(cls, v):
        """Accept None or a role from the allow-list (any letter case)."""
        if v is None:
            # An explicit null means "no role filter"; nothing to check.
            return v
        allowed_role = ['SS', 'SSA', 'GTMS', 'PM', 'TPM', 'Manager']
        allowed_role_lowercase = [item.lower() for item in allowed_role]
        # Compare case-insensitively: the allow-list is lower-cased, so the
        # input must be too (otherwise "SSA" itself would be rejected).
        if v.lower() not in allowed_role_lowercase:
            allowed_role_str = ", ".join(allowed_role)
            raise ValueError(f"role should be in [{allowed_role_str}]")
        return v

    @validator('domain')
    def domain_check(cls, v):
        """Accept None or a domain from the allow-list (any letter case)."""
        if v is None:
            # An explicit null means "no domain filter"; nothing to check.
            return v
        allowed_domain = [
            'leadership_team', 'AIML', 'Analytics', 'AppMod', 'Cloud_Intelligence',
            'Cloud_Economics', 'Compute', 'Connect_SES_Pinpoint', 'Database', 'HPC',
            'Hybrid', 'IoT', 'Networking', 'Edge', 'Security', 'By_Services',
            'By_Account_teams', 'Service Launch',
        ]
        allowed_domain_lowercase = [item.lower() for item in allowed_domain]
        # Compare case-insensitively, matching the lower-cased allow-list.
        if v.lower() not in allowed_domain_lowercase:
            allowed_domain_str = ", ".join(allowed_domain)
            raise ValueError(f"domain should be in [{allowed_domain_str}]")
        return v

# Deliberately invalid input: "EC2" is not in the domain allow-list, so building
# the model raises ValidationError and its JSON description is printed.
obj = dict(domain="EC2", scope="north")

try:
    gtms_obj = Employee_Pydantic(**obj)
except ValidationError as e:
    print(e.json())


### 1. 创建一个RDS Database

#### 1.1 设定变量
- 前4个变量可以自行设定


In [None]:
import getpass

# Names for the resources this notebook creates; choose any valid values.
db_subnet_group_name = "<db_subnet_group_name>"  # e.g. agent_db_group
db_username = "<db_username>"                    # RDS master user name
# Prompt for the master password instead of hard-coding it, so the secret is
# not saved in the notebook source.
db_password = getpass.getpass("RDS master password: ")
db_instance_name = "<db_instance_name>"          # e.g. agent-db-instance

- 后5个变量（region、db_az、vpc、subnet_ids、vpc_security_group_ids）根据下面命令行的查询结果填写
  + vpc-security-group-ids 所属的VPC需要与chatbot部署及本notebook所在的VPC为同一个

In [None]:
# List the VPCs in the CLI's default region, to find the one that hosts the
# chatbot deployment and this notebook.
!aws ec2 describe-vpcs

In [None]:
#查询security-groups-id
!aws ec2 describe-security-groups --filters Name=vpc-id,Values="vpc-028348b7d6f8e5199"

In [None]:
#查询vpc对应的subnets
!aws ec2 describe-subnets --filters "Name=vpc-id,Values=vpc-028348b7d6f8e5199"

In [None]:
# Account-specific network settings: copy the IDs from the describe-* output above.
region = 'us-west-2'
db_az = 'us-west-2a'                               # must be an AZ inside `region`
vpc = "<vpc_id>"                                   # e.g. vpc-028348b7d6f8e5199
# Subnets of `vpc`; an RDS DB subnet group needs subnets in at least two AZs.
subnet_ids = ["<subnet_id_1>", "<subnet_id_2>"]    # e.g. subnet-0d254a6652394515e, ...
vpc_security_group_ids = "<security_group_id>"     # e.g. sg-05bb8f06a6f240879

#### 1.2 创建DB instance
- db-subnet-group-name 可以通过下面awscli进行创建

In [None]:
!aws rds create-db-subnet-group \
    --db-subnet-group-name {db_subnet_group_name} \
    --db-subnet-group-description "DB Subnet Group For Agent" \
    --subnet-ids {" ".join(subnet_ids)}

- 创建db-instance

In [None]:
!aws rds create-db-instance \
    --db-instance-identifier {db_instance_name} \
    --allocated-storage 50 \
    --db-instance-class db.r6g.large \
    --engine mysql \
    --master-username {db_username} \
    --master-user-password {db_password} \
    --vpc-security-group-ids {vpc_security_group_ids} \
    --availability-zone {db_az} \
    --db-subnet-group-name {db_subnet_group_name}

### 2. 创建表并注入数据

#### 2.1 连接数据库，并注入数据（数据为data.csv的本地文件）
- 创建数据库后，需等待DB实例创建完毕（状态为 available），然后获取db_host、db_port（可从 `aws rds describe-db-instances` 输出中的 Endpoint.Address 和 Endpoint.Port 获得）
- 自行指定db_name，db_table_name

In [None]:
# Fill in the endpoint once the RDS instance is available
# (e.g. Endpoint.Address / Endpoint.Port from `aws rds describe-db-instances`).
db_host, db_port = "<db_host>", "<db_port>"
# Database and table names are free to choose; both are created later if missing.
db_name, db_table_name = "simple_info_db", 'employee'
local_data_csv = 'data.csv'  # local CSV with employee, role, domain, scope columns

In [None]:
from sqlalchemy import create_engine, text, Column, Integer, String, MetaData, Table, Sequence, or_, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
import pandas as pd

from urllib.parse import quote_plus

# Percent-encode the credentials, as the SQLAlchemy docs recommend: the URL
# parser decodes them again, so a raw '%' or '@' in the password would
# otherwise corrupt the connection URL.
_db_user = quote_plus(db_username)
_db_pass = quote_plus(db_password)

# Server-level engine (no database in the URL), used only to create the database.
connection_string = f"mysql+pymysql://{_db_user}:{_db_pass}@{db_host}:{db_port}"
engine = create_engine(connection_string)

with engine.connect() as connection:
    # Back-ticks keep names such as "my-db" valid MySQL identifiers.
    connection.execute(text(f"CREATE DATABASE IF NOT EXISTS `{db_name}`"))
# Release the server-level pool; only the database-scoped engine is needed now.
engine.dispose()

new_db_connection_string = f"mysql+pymysql://{_db_user}:{_db_pass}@{db_host}:{db_port}/{db_name}"
new_db_engine = create_engine(new_db_connection_string)

# Show the URL with the password masked so the secret is not saved in the
# notebook output.
print(new_db_engine.url.render_as_string(hide_password=True))

Base = declarative_base()
session = Session(bind=new_db_engine)

# ORM model mapped to the table named by db_table_name.
class Employee_SQLAlchemy(Base):
    """One employee row: name, role, domain and scope, all optional strings."""

    __tablename__ = db_table_name

    # Auto-incrementing surrogate key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Non-primary-key columns are nullable by default in SQLAlchemy.
    employee = Column(String(64))
    role = Column(String(64))
    domain = Column(String(64))
    scope = Column(String(64))

# # Drop all tables (uncomment to rebuild the schema from scratch)
# Base.metadata.drop_all(bind=new_db_engine)
# Create the tables defined on Base if they do not exist yet.
Base.metadata.create_all(bind=new_db_engine)

# Insert every CSV row through the ORM model. Re-running this cell inserts the
# rows again, since nothing de-duplicates them.
df = pd.read_csv(local_data_csv)
csv_columns = ["employee", "role", "domain", "scope"]
# read_csv turns empty cells into float NaN; map them to None so they are stored
# as SQL NULL instead of handing NaN to the MySQL driver.
records = [
    {key: (None if pd.isna(value) else value) for key, value in record.items()}
    for record in df[csv_columns].to_dict(orient="records")
]
session.add_all([Employee_SQLAlchemy(**record) for record in records])

try:
    session.commit()
except Exception:
    # Keep the session usable for the verification cell after a failed insert.
    session.rollback()
    raise

#### 2.2 验证数据摄入成功与否

In [None]:
# Sanity-check the ingest: the first row, a name search, and a domain + role filter.
def _print_employee(row):
    """Print one employee row as space-separated fields."""
    print(row.employee, row.role, row.domain, row.scope)


result = session.query(Employee_SQLAlchemy).first()
if result is None:
    # first() returns None on an empty table; report it instead of raising AttributeError.
    print(f"No rows found in table {db_table_name!r}; check the ingest step.")
else:
    _print_employee(result)
print("\n")

query = {"employee": "Yun Li"}
results = (
    session.query(Employee_SQLAlchemy)
    .filter(Employee_SQLAlchemy.employee.ilike(f'%{query["employee"]}%'))
    .all()
)
for result in results:
    _print_employee(result)
print("\n")

# role == 'ss' presumably also matches 'SS' because of MySQL's default
# case-insensitive collation — confirm for the chosen engine/collation.
results = (
    session.query(Employee_SQLAlchemy)
    .filter(and_(Employee_SQLAlchemy.domain.ilike('%aiml%'), Employee_SQLAlchemy.role == 'ss'))
    .all()
)
for result in results:
    _print_employee(result)

#### 2.3 notebook中在线测试lambda

In [None]:
import json
import os
from pydantic import BaseModel, validator, ValidationError
from typing import Optional
from sqlalchemy import create_engine, text, Column, Integer, String, MetaData, Table, Sequence, or_, and_
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import Session

from urllib.parse import quote_plus

# Percent-encode the credentials so URL-special characters in them ('%', '@', ...)
# survive SQLAlchemy's URL parsing.
new_db_connection_string = (
    f"mysql+pymysql://{quote_plus(db_username)}:{quote_plus(db_password)}"
    f"@{db_host}:{db_port}/{db_name}"
)
new_db_engine = create_engine(new_db_connection_string)

Base = declarative_base()
session = Session(bind=new_db_engine)

# ORM model mapped to the table named by db_table_name.
class Employee_SQLAlchemy(Base):
    """One employee row: name, role, domain and scope, all optional strings."""

    __tablename__ = db_table_name

    # Auto-incrementing surrogate key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Non-primary-key columns are nullable by default in SQLAlchemy.
    employee = Column(String(64))
    role = Column(String(64))
    domain = Column(String(64))
    scope = Column(String(64))

class Employee_Pydantic(BaseModel):
    """Validated query parameters for the employee lookup.

    Every field is optional. ``role`` and ``domain`` are checked
    case-insensitively against fixed allow-lists; a value outside the list
    makes model construction raise ``ValidationError``.
    """

    employee: Optional[str] = None
    role: Optional[str] = None
    domain: Optional[str] = None
    scope: Optional[str] = None

    @validator('role')
    def role_check(cls, v):
        """Accept None or a role from the allow-list (any letter case)."""
        if v is None:
            # An explicit null means "no role filter"; calling .lower() on it
            # would raise AttributeError instead of a validation error.
            return v
        allowed_role = ['SS', 'SSA', 'GTMS', 'PM', 'TPM', 'Manager']
        allowed_role_lowercase = [item.lower() for item in allowed_role]
        # Compare case-insensitively against the lower-cased allow-list.
        if v.lower() not in allowed_role_lowercase:
            allowed_role_str = ", ".join(allowed_role)
            raise ValueError(f"role should be in [{allowed_role_str}]")
        return v

    @validator('domain')
    def domain_check(cls, v):
        """Accept None or a domain from the allow-list (any letter case)."""
        if v is None:
            # An explicit null means "no domain filter"; nothing to check.
            return v
        allowed_domain = [
            'leadership_team', 'AIML', 'Analytics', 'AppMod', 'Cloud_Intelligence',
            'Cloud_Economics', 'Compute', 'Connect_SES_Pinpoint', 'Database', 'HPC',
            'Hybrid', 'IoT', 'Networking', 'Edge', 'Security', 'By_Services',
            'By_Account_teams', 'Service Launch',
        ]
        allowed_domain_lowercase = [item.lower() for item in allowed_domain]
        # Compare case-insensitively against the lower-cased allow-list.
        if v.lower() not in allowed_domain_lowercase:
            allowed_domain_str = ", ".join(allowed_domain)
            raise ValueError(f"domain should be in [{allowed_domain_str}]")
        return v
    
def lambda_handler(event, context):
    """Answer an employee lookup with a plain-text list of matching rows.

    ``event['param']`` holds optional ``employee`` / ``role`` / ``domain`` /
    ``scope`` filters, validated by ``Employee_Pydantic``. One query is chosen,
    in this order of precedence: employee name (case-insensitive substring),
    scope (case-insensitive substring), or domain (case-insensitive substring)
    together with role (equality). If none of these applies, no query runs.

    Returns:
        ``{'statusCode': 500, 'body': <error JSON>}`` if validation fails,
        otherwise ``{'statusCode': 200, 'body': <text>}`` with one numbered
        line per match, or a "can't find" message.
    """
    # A missing or null "param" means "no filters" rather than a TypeError on **None.
    param = event.get('param') or {}

    try:
        employee_obj = Employee_Pydantic(**param)
    except ValidationError as e:
        # NOTE(review): invalid input is a client error (400 would fit better);
        # 500 is kept in case the calling agent checks for it.
        return {
            'statusCode': 500,
            'body': e.json()
        }

    def format_results(results):
        # One numbered sentence per matching row.
        return "\n".join(
            f"[{idx}] {item.employee} works as a {item.role} role, take responsibility of '{item.scope}' in domain {item.domain}."
            for idx, item in enumerate(results)
        )

    not_found = "Can't find relevant information."
    condition = None
    if employee_obj.employee is not None:
        print("query by employee name")
        condition = Employee_SQLAlchemy.employee.ilike(f'%{employee_obj.employee}%')
        # Report the name that was actually searched for.
        not_found = f"Can't find that employee - {employee_obj.employee}."
    elif employee_obj.scope is not None:
        print("query by scope only")
        condition = Employee_SQLAlchemy.scope.ilike(f'%{employee_obj.scope}%')
    elif employee_obj.domain is not None and employee_obj.role is not None:
        print("query by domain and role")
        condition = and_(
            Employee_SQLAlchemy.domain.ilike(f'%{employee_obj.domain}%'),
            Employee_SQLAlchemy.role == employee_obj.role,
        )

    # `is None` rather than truthiness: SQL expressions do not support bool().
    if condition is None:
        plain_result = not_found
    else:
        # A short-lived session per call. A long-lived session keeps one
        # transaction open between calls; under MySQL's default REPEATABLE READ
        # that pins an old read snapshot, and after a connection error the
        # session needs an explicit rollback before it can be reused.
        with Session(bind=new_db_engine) as db_session:
            results = db_session.query(Employee_SQLAlchemy).filter(condition).all()
            plain_result = format_results(results) if results else not_found

    return {
        'statusCode': 200,
        'body': plain_result
    }
# Smoke-test the handler locally. Because "scope" is present, the handler
# queries by scope only; "role" is validated but not used as a filter.
obj = dict(param=dict(scope="Lex", role="SSA"))
print(lambda_handler(obj, None))

### 3. 部署Agent Tool

#### 3.1 构建代码zip包

In [None]:
!mkdir lambda_code

In [None]:
%%writefile lambda_code/lambda_function.py
import json
import os
from pydantic import BaseModel, validator, ValidationError
from typing import Optional
from sqlalchemy import create_engine, text, Column, Integer, String, MetaData, Table, Sequence, or_, and_
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import Session

from urllib.parse import quote_plus

# Connection settings are read from the function's environment variables.
db_username = os.environ.get('db_username')
db_password = os.environ.get('db_password')
db_host = os.environ.get('db_host')
db_port = os.environ.get('db_port')
db_name = os.environ.get('db_name')
db_table_name = os.environ.get('db_table_name')

# Percent-encode the credentials so URL-special characters in them ('%', '@', ...)
# survive SQLAlchemy's URL parsing; `or ''` keeps quote_plus from raising
# TypeError when a variable is unset.
new_db_connection_string = (
    f"mysql+pymysql://{quote_plus(db_username or '')}:{quote_plus(db_password or '')}"
    f"@{db_host}:{db_port}/{db_name}"
)
# pool_pre_ping tests a pooled connection before use, so a warm Lambda container
# does not fail on a connection the server has already closed for idleness.
new_db_engine = create_engine(new_db_connection_string, pool_pre_ping=True)

Base = declarative_base()
session = Session(bind=new_db_engine)

# ORM model mapped to the table named by db_table_name.
class Employee_SQLAlchemy(Base):
    """One employee row: name, role, domain and scope, all optional strings."""

    __tablename__ = db_table_name

    # Auto-incrementing surrogate key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Non-primary-key columns are nullable by default in SQLAlchemy.
    employee = Column(String(64))
    role = Column(String(64))
    domain = Column(String(64))
    scope = Column(String(64))

class Employee_Pydantic(BaseModel):
    """Validated query parameters for the employee lookup.

    Every field is optional. ``role`` and ``domain`` are checked
    case-insensitively against fixed allow-lists; a value outside the list
    makes model construction raise ``ValidationError``.
    """

    employee: Optional[str] = None
    role: Optional[str] = None
    domain: Optional[str] = None
    scope: Optional[str] = None

    @validator('role')
    def role_check(cls, v):
        """Accept None or a role from the allow-list (any letter case)."""
        if v is None:
            # An explicit null means "no role filter"; calling .lower() on it
            # would raise AttributeError instead of a validation error.
            return v
        allowed_role = ['SS', 'SSA', 'GTMS', 'PM', 'TPM', 'Manager']
        allowed_role_lowercase = [item.lower() for item in allowed_role]
        # Compare case-insensitively against the lower-cased allow-list.
        if v.lower() not in allowed_role_lowercase:
            allowed_role_str = ", ".join(allowed_role)
            raise ValueError(f"role should be in [{allowed_role_str}]")
        return v

    @validator('domain')
    def domain_check(cls, v):
        """Accept None or a domain from the allow-list (any letter case)."""
        if v is None:
            # An explicit null means "no domain filter"; nothing to check.
            return v
        allowed_domain = [
            'leadership_team', 'AIML', 'Analytics', 'AppMod', 'Cloud_Intelligence',
            'Cloud_Economics', 'Compute', 'Connect_SES_Pinpoint', 'Database', 'HPC',
            'Hybrid', 'IoT', 'Networking', 'Edge', 'Security', 'By_Services',
            'By_Account_teams', 'Service Launch',
        ]
        allowed_domain_lowercase = [item.lower() for item in allowed_domain]
        # Compare case-insensitively against the lower-cased allow-list.
        if v.lower() not in allowed_domain_lowercase:
            allowed_domain_str = ", ".join(allowed_domain)
            raise ValueError(f"domain should be in [{allowed_domain_str}]")
        return v
    
def lambda_handler(event, context):
    """Answer an employee lookup with a plain-text list of matching rows.

    ``event['param']`` holds optional ``employee`` / ``role`` / ``domain`` /
    ``scope`` filters, validated by ``Employee_Pydantic``. One query is chosen,
    in this order of precedence: employee name (case-insensitive substring),
    scope (case-insensitive substring), or domain (case-insensitive substring)
    together with role (equality). If none of these applies, no query runs.

    Returns:
        ``{'statusCode': 500, 'body': <error JSON>}`` if validation fails,
        otherwise ``{'statusCode': 200, 'body': <text>}`` with one numbered
        line per match, or a "can't find" message.
    """
    # A missing or null "param" means "no filters" rather than a TypeError on **None.
    param = event.get('param') or {}

    try:
        employee_obj = Employee_Pydantic(**param)
    except ValidationError as e:
        # NOTE(review): invalid input is a client error (400 would fit better);
        # 500 is kept in case the calling agent checks for it.
        return {
            'statusCode': 500,
            'body': e.json()
        }

    def format_results(results):
        # One numbered sentence per matching row; the prints go to the function log.
        print("call format_results")
        print("call format_results {}".format(len(results)))
        converted_items = [
            f"[{idx}] {item.employee} works as a {item.role} role, take responsibility of '{item.scope}' in domain {item.domain}."
            for idx, item in enumerate(results)
        ]
        print(converted_items)
        return "\n".join(converted_items)

    not_found = "Can't find relevant information."
    condition = None
    if employee_obj.employee is not None:
        print("query by employee name")
        condition = Employee_SQLAlchemy.employee.ilike(f'%{employee_obj.employee}%')
        # Report the name that was actually searched for.
        not_found = f"Can't find that employee - {employee_obj.employee}."
    elif employee_obj.scope is not None:
        print("query by scope only")
        condition = Employee_SQLAlchemy.scope.ilike(f'%{employee_obj.scope}%')
    elif employee_obj.domain is not None and employee_obj.role is not None:
        print("query by domain and role")
        condition = and_(
            Employee_SQLAlchemy.domain.ilike(f'%{employee_obj.domain}%'),
            Employee_SQLAlchemy.role == employee_obj.role,
        )

    # `is None` rather than truthiness: SQL expressions do not support bool().
    if condition is None:
        plain_result = not_found
    else:
        # A short-lived session per invocation. A module-level session keeps one
        # transaction open across warm invocations; under MySQL's default
        # REPEATABLE READ that pins an old read snapshot, and after a connection
        # error the session needs an explicit rollback before it can be reused.
        with Session(bind=new_db_engine) as db_session:
            results = db_session.query(Employee_SQLAlchemy).filter(condition).all()
            plain_result = format_results(results) if results else not_found

    return {
        'statusCode': 200,
        'body': plain_result
    }

In [None]:
# Package lambda_code/ for deployment. create_lambda.sh is not shown in this
# notebook; the create-function cell below expects it to produce
# lambda_code/my_deployment_package.zip — confirm against the script.
!sh create_lambda.sh

#### 3.2 创建lambda的IAM Role

In [None]:
!aws iam create-policy --policy-name MyCustomPolicy --policy-document file://lambda_role_policy.json

+ 创建role

In [None]:
!aws iam create-role \
    --role-name AgentLambdaRole \
    --assume-role-policy-document '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'

- 提取前面输出的policy_arn和role_arn

In [None]:
# Paste the ARNs printed by create-policy / create-role above; <account_id> is
# your own AWS account ID.
policy_arn = 'arn:aws:iam::<account_id>:policy/MyCustomPolicy'
role_arn = 'arn:aws:iam::<account_id>:role/AgentLambdaRole'

In [None]:
!aws iam attach-role-policy --role-name AgentLambdaRole \
    --policy-arn {policy_arn}

#### 3.3 创建lambda function
  + 需要把lambda部署在vpc中，因为db也是在vpc中，否则会出现超时

In [None]:
!aws lambda create-function --function-name employee_query_tool \
    --zip-file fileb://lambda_code/my_deployment_package.zip --runtime python3.10 \
    --handler lambda_function.lambda_handler --timeout 10 --region {region} \
    --role {role_arn} \
    --vpc-config SubnetIds={",".join(subnet_ids)},SecurityGroupIds={vpc_security_group_ids}

- 为lambda function 添加环境变量，用于连接数据库

In [None]:
# AWS CLI shorthand for --environment: Variables={key='value',...}. The whole
# value is wrapped in double quotes so the shell passes the braces and single
# quotes through to the CLI unchanged.
env_list = f"db_username='{db_username}',db_password='{db_password}',db_host='{db_host}',db_port='{db_port}',db_name='{db_name}',db_table_name='{db_table_name}'"
env_list_var = "\"{" + env_list + "}\""
# Echo the value with the password masked so the secret is not saved in the
# notebook output.
print(env_list_var.replace(f"db_password='{db_password}'", "db_password='****'"))
# NOTE(review): the password ends up as a plain Lambda environment variable;
# consider AWS Secrets Manager for anything beyond a demo.

In [None]:
!aws lambda update-function-configuration --function-name employee_query_tool \
    --environment Variables={env_list_var}