In [120]:
!df -h


Filesystem      Size  Used Avail Use% Mounted on
devtmpfs         94G     0   94G   0% /dev
tmpfs            94G   20K   94G   1% /dev/shm
tmpfs            94G  768K   94G   1% /run
tmpfs            94G     0   94G   0% /sys/fs/cgroup
/dev/nvme0n1p1  160G  144G   17G  90% /
/dev/nvme1n1    453G   32G  399G   8% /home/ec2-user/SageMaker
tmpfs            19G     0   19G   0% /run/user/1000
tmpfs            19G     0   19G   0% /run/user/1002
tmpfs            19G     0   19G   0% /run/user/1001


In [None]:
!pip install langchain[all]
!pip install sagemaker --upgrade
!pip install  boto3
!pip install requests_aws4auth
!pip install requests
!pip install transformer
!pip install opensearch-py

## initial sagemaker env

In [42]:
import os
import sagemaker
import boto3
import json
from typing import Dict

sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)
sm_client = boto3.client("sagemaker-runtime")

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")



sagemaker role arn: arn:aws:iam::687912291502:role/webui-notebook-stack-ExecutionRole-62U5FV4LJQS
sagemaker bucket: sagemaker-us-west-2-687912291502
sagemaker session region: us-west-2


## intial lanchain lib

In [130]:
from langchain.vectorstores import OpenSearchVectorSearch
from langchain import PromptTemplate, SagemakerEndpoint
from langchain.chains.question_answering import load_qa_chain
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.llms.sagemaker_endpoint import ContentHandlerBase
from langchain.docstore.document import Document
from langchain.memory import ConversationBufferWindowMemory
from langchain import LLMChain

#os.environ["OPENAI_API_KEY"]= "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"
#from langchain.embeddings.openai import OpenAIEmbeddings

hostname="vpc-llm-rag-aos-seg3mzhpp76ncpxezdqtcsoiga.us-west-2.es.amazonaws.com"
region='us-west-2'
username="admin"
passwd="(OL>0p;/"
index_name="qa_index"
size=10

class EmbeddingContentHandler(ContentHandlerBase):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
        input_str = json.dumps({"inputs": prompt, **model_kwargs})
        return input_str.encode('utf-8')
    
    def transform_output(self, output: bytes) -> str:
        response_json = json.loads(output.read().decode("utf-8"))
        return response_json["vectors"]

embedding_content_handler = EmbeddingContentHandler()
sm_embeddings = SagemakerEndpointEmbeddings(
    # endpoint_name="endpoint-name", 
    # credentials_profile_name="credentials-profile-name", 
    #endpoint_name="huggingface-textembedding-bloom-7b1-fp1-2023-04-17-03-31-12-148", 
    endpoint_name="st-paraphrase-mpnet-base-v2-2023-04-17-10-05-10-718-endpoint",
    region_name="us-west-2", 
    content_handler=embedding_content_handler
)


class ExtractContentHandler(ContentHandlerBase):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
        input_str = json.dumps({"ask": prompt})
        return input_str.encode('utf-8')
    
    def transform_output(self, output: bytes) -> str:
        model_predictions = json.loads(output.read().decode("utf-8"))
        generated_text = model_predictions["answer"]
        return generated_text

feature_extraction_handler = ExtractContentHandler()
feature_extraction_llm=SagemakerEndpoint(
        endpoint_name="pytorch-inference-2023-04-20-07-28-31-042", 
        region_name="us-west-2", 
        content_handler=feature_extraction_handler
)

class TextGenContentHandler(ContentHandlerBase):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
        input_str = json.dumps({prompt: prompt, **model_kwargs})
        return input_str.encode('utf-8')
    
    def transform_output(self, output: bytes) -> str:
        response_json = json.loads(output.read().decode("utf-8"))
        return response_json[0]["generated_text"]

text_gen_content_handler = TextGenContentHandler()
sm_llm=SagemakerEndpoint(
        endpoint_name="bloomz-7b1-mt-2023-04-17-02-49-58-645-endpoint", 
        region_name="us-west-2", 
        model_kwargs={"temperature":1e-10},
        content_handler=text_gen_content_handler
)

opensearch_vector_search = OpenSearchVectorSearch(
    "https://"+hostname+":443",
    index_name,
    sm_embeddings
)

## func(for local test ) ##

In [146]:
import boto3
import json
import requests
import time
from collections import defaultdict
from requests_aws4auth import AWS4Auth
import os
from opensearchpy import OpenSearch, RequestsHttpConnection
from langchain.vectorstores import OpenSearchVectorSearch
from langchain import PromptTemplate, SagemakerEndpoint
from langchain.chains.question_answering import load_qa_chain
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.llms.sagemaker_endpoint import ContentHandlerBase
from langchain.docstore.document import Document
from langchain.memory import ConversationBufferWindowMemory
from langchain import LLMChain

source_includes = ["question","answer"]
runtime= boto3.client('runtime.sagemaker')
headers = { "Content-Type": "application/json" }

endpoint_name="pytorch-inference-2023-04-20-07-28-31-042"
region_name="us-west-2"

class ExtractContentHandler(ContentHandlerBase):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
        input_str = json.dumps({"ask": prompt})
        return input_str.encode('utf-8')

    def transform_output(self, output: bytes) -> str:
        model_predictions = json.loads(output.read().decode("utf-8"))
        generated_text = model_predictions["answer"]
        return generated_text
feature_extraction_handler = ExtractContentHandler()
feature_extraction_llm=SagemakerEndpoint(
    endpoint_name=endpoint_name,
    region_name=region_name,
    content_handler=feature_extraction_handler
)

########parse k-NN search resule###############
# input:
#  r: AOS returned json
# return:
#  result : array of topN text
#############################################
def parse_results(r):
    res = []
    result = []
    print(r)
    for i in range(len(r['hits']['hits'])):
        h = r['hits']['hits'][i]
        if h['_source']['question'] not in clean:
            result.append(h['_source']['question'])
            res.append('<第'+str(i+1)+'条信息>'+h['_source']['question'] + '。</第'+str(i+1)+'条信息>\n')
    print(res)
    return result

########get embedding vector by SM llm########
# input:
#  questions:question texts(list)
#  sm_client: sagemaker runtime client
#  sm_endpoint:Sagemaker embedding llm endpoint
# return:
#  result : vector of embeded text
#############################################
def get_vector_by_sm_endpoint(questions,sm_client,endpoint_name,parameters):
    response_model = sm_client.invoke_endpoint(
        EndpointName=endpoint_name,
        Body=json.dumps(
            {
                "inputs": questions,
                "parameters": parameters
            }
        ),
        ContentType="application/json",
    )
    json_str = response_model['Body'].read().decode('utf8')
    json_obj = json.loads(json_str)
    embeddings = json_obj['sentence_embeddings']
    return embeddings


########get embedding vector by lanchain vector search########
# input:
#  questions:question texts(list0
#  embedings:lanchain embeding models
# return:
#  result : vector of embeded text
#############################################################
def get_vector_by_lanchain(questions , embedings):
    doc_results = embeddings.embed_documents(questions)
    print(doc_results[0])
    return doc_results


########k-nn search by lanchain########
# input:
#  q:question text
#  vectorSearch: lanchain VectorSearch instance
# return:
#  result : k-NN search result
#############################################################
def search_using_lanchain(question, vectorSearch):
    docs = vectorSearch.similarity_search(query)
    return docs



########k-nn by native AOS########
# input:
#  q_embedding:embeded question text(array)
#  index:AOS k-NN index name
#  hostname: aos endpoint
#  username: aos username
#  passwd: aos password
#  source_includes: fields to return
#  k: topN
# return:
#  result : k-NN search result
#############################################################
def search_using_aos_knn(q_embedding, hostname, username,passwd, index, source_includes, size):
    awsauth = (username, passwd)
    print(type(q_embedding))
    query = {
        "size": size,
        "query": {
            "knn": {
                "sentence_vector": {
                    "vector": q_embedding,
                    "k": size
                }
            }
        }
    }
    r = requests.post("https://"+hostname +"/"+ index + '/_search', auth=awsauth, headers=headers, json=query)
    return r.text



########k-nn ingestion by native AOS########
# input:
#  docs:ingestion source documents
#  index:AOS k-NN index name
#  hostname: aos endpoint
#  username: aos username
#  passwd: aos password
# return:
#  result : N/A
#############################################################
def k_nn_ingestion_by_aos(docs,index,hostname,username,passwd):
    auth = (username, passwd)
    search = OpenSearch(
        hosts = [{'host': hostname, 'port': 443}],
        ##http_auth = awsauth ,
        http_auth = auth ,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection
    )
    for doc in docs:
        vector_field = doc['sentence_vector']
        question_filed = doc['question']
        answer_field = doc['answer']
        document = { "question": question_filed, 'answer':answer_field, "sentence_vector": vector_field}
        search.index(index=index, body=document)


########k-nn ingestion by lanchain #########################
# input:
#  docs:ingestion source documents
#  vectorStore: lanchain AOS vectorStore instance
# return:
#  result : N/A
#############################################################
def k_nn_ingestion_by_lanchain(docs,vectorStore):
    for doc in docs:
        opensearch_vector_search.add_texts(docs,batch_size=10)


########topic extraction by lanchain sm endpoint #########################
# input:
#  docs: history QA list[(Q1,A1).(Q2,A2),(Q3,A3)]
#  k: output extracted words limit
#  llm: lanchain llm instance
# return:
#  result : output extracted features
#############################################################
def feature_extraction_by_lanchain(docs,k,sm_endpoint_nm,sm_region):
    #feature_extraction_handler = ExtractContentHandler()
    #feature_extraction_llm=SagemakerEndpoint(
    #    endpoint_name=sm_endpoint_nm,
    #    region_name=sm_region,
    #    content_handler=feature_extraction_handler
    #)
    global feature_extraction_llm
    if feature_extraction_llm is None:
        feature_extraction_llm=SagemakerEndpoint(
            endpoint_name=sm_endpoint_nm,
            region_name=sm_region,
            content_handler=feature_extraction_handler
        )
    payload = ""
    for doc in docs:
        question, answer = doc
        payload=payload+"Q:"+question+"\n"
        payload=payload+"A:"+answer+"\n"
    payload = payload+"问题：主题摘要,限制在"+str(k)+"个字内"
    extracted_feature_texts=feature_extraction_llm(payload)
    return extracted_feature_texts

## major chain pipeline ################

### 1: data process

In [134]:
all_question = """在中国区是否可用？
为什么在合成的小数据集上第一次查询的时候需要几分钟才返回？
能支持多大规模数据的查询？查询速度怎么样？
AWS Clean Rooms 是如何计费的？
AWS Clean Rooms 从哪里可以看到CRPU-hours的用量？
目前可以支持什么数据源的接入？ 
一个协作中，最大的并发查询数是多少？
我们如何说服客户相信洁净室的安全性和正确性？我在这里的大多数合规计划中都看不到Clean Rooms， https://aws.amazon.com/compliance/services-in-scope/ ？
数据源必须在AWS上么？
数据是如何进入到S3？
这些安全控制权限只是作用与分析么？能够改动它方的数据么？
协作方的数据会移动么？协作方的原始数据会集中到Clean rooms 吗？
是否发起者和数据贡献者都会被收费？
数据贡献方的S3， 会产生API会产生调用次数收费么？
是否有一个强约束，两方的数据中一定要一个join字段才能够进行分析？
如果已经在用Athena，S3桶中已经有数据了，是否能基于S3中这个数据就地加入AWS Clean Room?
是否能通过SDK也就是代码来调用Clean room的联合分析
在输出分析结果的时候，能否按照字段进行分区？
最大的参与方是多少？ 如果超过了限制怎么办？
AWS clean rooms 用的什么加密算法？
加密过程中有密文落地么？如果有，密文存在哪里？
数据上传到S3的过程中，有两种加密方式Server Side 和 Client Side加密，如果客户的安全等级比较高，在数据上传之前做了加密，再想交给clean room 去处理，之前提到的C3R的加密方式，具体是一个怎么样的流程呢？
AWS Clean Room在安全计算方面，应该归属哪一类？
是否所有的字段都可以进行加密？
如果要对某个字段进行求和，求平均的数据计算，是否可以加密？
如果想要通过某些字段进行where过滤，这些字段应该是什么类型？
C3R客户端是否有实现任何non-standard的加密算法?
AWS Clean rooms 与 AMC的区别与联系是什么？
AWS Clean rooms 与 其他的clean room服务商的区别？
有哪些典型的应用场景？
AWS Clean Rooms 的 data catalog 是如何实现的？ data sharing permission 是如何实现的？
可以在哪些地方进行Clean room的联合分析？
数据提供方如果对联合分析的收益方进行收费，或者实现一个数据授权的合同？
AWS Clean Rooms 与 AWS Data Exchange 是什么关系？
AWS Clean Rooms中是否支持视图？
如果数据合作方没有aws account，能否支持？
是否能够支持这个协作中，仅仅允许指定运行固定的SQL？
AWS Clean Rooms可以让数据贡献者提供一些样例数据进行预览么？
当一个数据贡献者的数据发生更新后会怎么样？
AWS Clean Rooms 未来有哪些前进的方向？
Service Team 有哪些相关的同事
"""
questions = all_question.split("\n")

In [135]:
all_answer = """目前没有落地中国区的时间表，已经在以下区域推出：美国东部（弗吉尼亚州北部）、美国东部（俄亥俄州）、美国西部（俄勒冈州）、亚太地区（首尔）、亚太地区（新加坡）、亚太地区（悉尼）、亚太地区（东京）、欧洲地区（法兰克福）、欧洲地区（爱尔兰）、欧洲地区（伦敦）和欧洲地区（斯德哥尔摩）
第一次查询的时候是因为调度和拉起资源的影响，一般第二次查询就会变快。 但过一段时间后，资源释放后这个问题又会出现。在资源拉起期间是不进行收费的。
能支持TB/GB级数据的查询。 一般查询延迟为几十秒到几分钟。默认计算容量为32 CRPUs, 目前这个默认计算容量不可设置，但是roadmap中未来打算让用户可以进行设置。(Slack中Ryan 提到，如果引擎中任务有积压，它能够scale up）
按照CRPU-hour单价进行计费，每个查询默认计算容量为32 CRPUs。 金额 = (0.125 hours x 32 CRPUs * $0.656 per CRPU-hour) , 有1分钟的最小计费时间。头12个月内，会有9CRPU hours的免费额度。
AWS Clean Rooms 本身的workspace中无法查看，可以在AWS Billing/Bills 中查看Usage Quantity得到该信息。
目前只支持S3，其他数据源近期没有具体计划。
5个
正在加入这些合规计划的进程中。
对，目前必须在AWS上，而且必须是同一个region。
需要数据的持有者，把数据上传到S3上，然后再用Glue爬去下，拿到表的schema。这样才能关联到AWS CleanRooms。
对，只作用于分析，不能修改对方数据
在联合分析获取他方数据时数据存在移动，但协作方的原始数据并不会存住在clean room内，clean room并不是一个物理存贮空间。
是单方收费，只有查询的接收方会进行收费。
会， Glue Data Catalog API 的调用也会被收费， 如果加密数据用了KMS-CMK也会被相应的收费。
List 和Aggregation两种不同的分析规则下有区别， List 只能支持重合用户的，所以必须要有关联字段。Aggregation可以支持你仅仅去查询对方的数据，这种情况下，是可以不指定关联字段的。
是的，就地就能加入clean room, 这个S3桶就是一般的S3桶，并没有任何特殊。但这个S3路径不能注册到AWS Lake Formation中。
可以，可以参考代码 https://gitlab.aws.dev/rmalecky/aws-clean-rooms-notebooks/-/blob/main/single_collaborator_aggregation.ipynb
目前不能
目前5个参与方为最大限制，这个是软性的限制，slack频道中有披露最大支持的硬限制为10.
C3R，是aws开源加密代码库。提供了C3R Client(一个可执行的Jar包)，目前仅支持对csv和parquet文件格式进行加密，后续可能会支持更多格式。 由于clean room把所有的字段分成三种类型： 指纹列(fingerprint column), 密封列(sealed column), 明文列(cleartext column), 他们的加密方式有所不同，C3R client 会使用AES-GCM加密算法对sealed字段进行加密，会使用HMAC(Hash-based Message Authentication Code)来对fingerprint字段进行加密。
由于C3R是客户端加密，所以clean room 关联S3中的数据已经是加密后的密文。
首先Clean room 对于Server Side的加密是透明的，无需额外处理。Clean rooms 不支持S3的客户端加密，必须采用C3R客户端进行加密，加密完成以后把数据上传到S3桶，后续流程和不加密的流程是一致的。 加密这步需要比较多的手工操作，包括：* 加密前需要创建好collaboration(协作), 得到collaboration_id后续在加密中需要提供 * 利用openssl 生成32位密钥并分享给其他协作方。* 加密过程中需要指定哪些字段为指纹列(fingerprint column), 密封列(sealed column), 明文列(cleartext column)
(待补充)
对于sealed和 fingerprint字段，只有string字段类型被支持。对于csv文件，C3R的客户端处理任何值都作为UTF-8编码的文本，加密前不会做任何其他的前置处理。对于parquet文件，对sealed和 fingerprint字段，如果出现非string的字段，会直接报错。C3R 客户端不能处理parquet中的复杂字段比如struct。
不能，只能作为cleartext明文列
基本都是标准化的算法，除了一个HKDF(一种密钥推导函数)的实现(来自RFC5869), 但是使用的是java标准加密库中的MAC算法。
AMC是一个专门服务与Amazon Ads的clean room应用，它是并且将持续是唯一的服务于Amazon Ads客户的应用服务。AWS Clean Rooms 是一个云分析服务，会服务于各个行业的数据合作需求。2023年, AMC 将会把自己的查询引擎和计算基础设置迁移到AWS Clean Rooms服务，将会帮助AMC更方便的服务于客户(他们将不再需要把自己第一方数据上传到AMC，在AWS S3上即可使用).
AWS Clean rooms 覆盖的客户范围更广。而其他的服务商客户范围相对小，需要把数据移动到他们的平台上。AWS Clean rooms则无需数据移动。(Bastion 2021年announce的时候，其他的云厂商比如(google cloud 和 microsoft azure) 还没有通用场景的clean room solution，snowflake 有，功能上有区别，只允许data provider 提供预先固定的SQL，Bastion灵活性更好。snowflake要求数据必须进入他们的数仓，aws在S3 即可)。
"""
answers=all_answer.split("\n")

answer_2 = """
包含多个行业，下面提供部分参考
* 广告营销领域：
    * 需要进行广告营销活动，流量平台方需要给广告主或者营销方他们的广告点击数据和展现数据进行分析，但是不能提供用户级别的信息。
    * 典型客户：营销方或广告主 P&G, Barclays，媒体-流量平台 Amazon Ads, Comcast, NBC Universal
* 零售领域
    * 银行和零售商需要获取他们的重叠用户，用于进行联合的市场活动，但不要把客户的其他信息暴露给彼此。（比如非场合用户）
* 医疗健康领域 (结合之前的HCLS Slides， 并不是来自Bastion)
    * 药厂和医院之间，药厂需要医院的病历数据； 药厂和外包的研发机构之间需要进行数据的共享。
    * 典型客户：Change Healthcare, AstraZeneca 
* 其他领域的一些典型的客户
    * 数据服务商 Foursquare ，Nielsen， IRI， 
    * 其他 Cars.com
"""
answers.append(answer_2)
    
all_answer3="""都是利用了AWS Lake formation, AWS Clean Rooms 里 SQL中字段级别的限制约束，是通过一种new class of AWS Lake formation permission 来实现的。
可以在clean room 的 workspace， 也可以在Redshift workspace （Note: 从目前发布产品文档上并没有，但是说明背后的引擎就是redshift severless)
需要通过AWS Data Exchange 来进行 （Note: 目前AWS Clean Rooms并没有体现）
AWS Clean Rooms 可以通过AWS Data Exchange 去浏览和寻找可用数据的合作方。 他是AWS Data Exchange的更近一步的服务，提供了可控(多种约束限制)和可审计的数据合作方式
允许客户在clean room 创建视图，并且在AWS Clean Rooms中保存物化视图，一旦退出协作，AWS Lake formation permission 将会被撤销，这些物化视图会被删除。（Note: 目前AWS Clean Rooms并没有体现）
目前这个版本不支持，后续的版本可能会考虑（NBC Universal 希望对于没有aws账号另外一方的数据可用）
可以，可以利用query template来做（Note: 目前AWS Clean Rooms并没有体现）
可以这么做，可以提供一些没有任何约束的示例数据给用户（Note: 目前AWS Clean Rooms并没有体现）
它是一种live的共享，任何更新会立刻反映到联合分析的结果中
"""
answers=answers+(all_answer3.split("\n"))

answer_4="""
主要有四个方向：
1. Identity matching 身份ID对齐 (Note: 目前这项在官方的PPT在有体现)
2. 对隐私攻击的防护， 有些查询即使是一些聚合分析，仍然可能探查到个人的信息
    1. 限制访问同一块范围数据的query的数量
    2. 采用差分隐私(Differential Privacy) 结果中添加噪声(会影响分析的精度)，Morgen Stanley 作为这个功能的beta用户
3. 机器学习，P&G 表现出这方面的需求， 应该也是基于表格数据的模型，可能是流失预测，人群聚类等场景
4. 和DSP的集成，直接把激活用户ID给到对接的DSP，而不通过cleanroom的数据接收方
"""
answers.append(answer_4)
               
answer_5="""Horne, Bill <bgh@amazon.com>, Rababy, Bethany <rababyb@amazon.com>, Malecky, Ryan <rmalecky@amazon.com>, Malik, Mohsen <mmohsen@amazon.com>, Tanna, Shamir <tannas@amazon.com>"""
answers.append(answer_5)

In [None]:
#import sys
#sys.path.append("./code/")
#import func


parameters = {
      #"early_stopping": True,
      #"length_penalty": 2.0,
      "max_new_tokens": 50,
      "temperature": 0,
      "min_length": 10,
      "no_repeat_ngram_size": 2,
}
endpoint_name="st-paraphrase-mpnet-base-v2-2023-04-17-10-05-10-718-endpoint"
##########embedding by llm model##############
#sentense_vectors=get_vector_by_sm_endpoint(questions[0:5],sm_client,endpoint_name,parameters)
sentense_vectors=get_vector_by_sm_endpoint(questions[5:10],sm_client,endpoint_name,parameters)
#sentense_vectors = get_vector_by_lanchain(questions , sm_embeddings)


In [None]:
print(len(sentense_vectors))

In [104]:
docs=[]
for index, sentence_vector in enumerate(sentense_vectors):
    #print(index, sentence_vector)
    doc = {
        "question":questions[index],
        "answer": answers[index],
        "sentence_vector": sentence_vector
          }
    docs.append(doc)
#data = [{str(i): j for i, j in zip(['question', 'answer','sentence_vector'], values)} for values in zip(questions, answers,sentense_vectors)]
#docs = json.dumps(data)
#print(len(docs[2]['sentence_vector']))

#########ingestion into aos ###################
k_nn_ingestion_by_aos(docs,index_name,hostname,username,passwd)
#k-nn_ingestion_by_lanchain(docs,opensearch_vector_search)  

ConnectionTimeout: ConnectionTimeout caused by - ConnectTimeout(HTTPSConnectionPool(host='vpc-llm-rag-aos-seg3mzhpp76ncpxezdqtcsoiga.us-west-2.es.amazonaws.com', port=443): Max retries exceeded with url: /qa_index/_doc (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f639ba2eca0>, 'Connection to vpc-llm-rag-aos-seg3mzhpp76ncpxezdqtcsoiga.us-west-2.es.amazonaws.com timed out. (connect timeout=10)')))

## 2:KNN search topN questio

In [None]:
#########2:KNN search topN question#########################
query = """what's the functions of cleaning room?"""
query_embedding = get_vector_by_sm_endpoint(query,sm_client,endpoint_name,parameters)
context = parse_results(search_using_aos_knn(query_embedding, hostname, username,passwd, 
                                           index, source_includes, size))
#context = parse_results(search_using_lanchain(question, vectorSearch))

## 3:warp up context ，construct prompt and call llm lanchain

In [None]:
#########3:warp up context ，construct prompt and call llm lanchain######3
prompt_template = """Use the following pieces of context to answer the question at the end.
{context}

Question: {question}
Answer:"""
PROMPT = PromptTemplate(
    template=prompt_template, input_variables=["context", "question"]
)

memory = ConversationBufferWindowMemory(k=5)

chain = LLMChain(
    llm=sm_llm,
    prompt=PROMPT,
    memory=memory
)

chain({"input_documents": NONE, "question": query, "context": context}, return_only_outputs=True)
memory.load_memory_variables({})


In [131]:
payload1 = """question:GPU优化作用？
              answer:一般来说，GPU无法在检索数据同时执行这些计算。此外，现代GPU的计算性能远远高于每个操作(被称为GPU编程中的核)所需的内存传输速度。
核融合是一种基于GPU计算的优化方法，通过在一次内核调用中执行多个连续操作。该方法提供了一种最小化数据传输的方法：中间结果留在GPU寄存器中，而不是复制到VRAM，从而节省开销。
我们使用了Megatron-LM提供了几个定制化融合CUDA核。首先，我们使用一个优化核来执行LayerNorm，以及用核来融合各种缩放、掩码和softmax操作的各种组合。
使用Pytorch的JIT功能将一个偏差项添加至GeLU激活中。作为一个使用融合核的例子，在GeLU操作中添加偏差项不会增加额外的时间，因为该操作受内存限制：与GPU VRAM和寄存器之间的数据传输相比，额外的计算可以忽略不计。
因此融合这两个操作基本上减少了它们的运行时间
              question：如何优化BLOOM？
              answer：我们使用上表3中详细描述的超参数来训练BLOOM的6个尺寸变体。
架构和超参数来自于我们的实验结果(Le Scao et al.)和先前的训练大语言模型(Brown et al.)。
非176B模型的深度和宽度大致遵循先前的文献(Brown et al.)，偏离的3B和7.1B只是为了更容易适合我们训练设置。
由于更大的多语言词表，BLOOM的embedding参数尺寸更大。在开发104B参数模型的过程中，我们使用了不同的Adam 
参数、权重衰减和梯度裁剪来对目标稳定性进行实验，但没有发现其有帮助。
对于所有模型，我们在410B tokens使用cosine学习率衰减调度，在计算允许的情况下，将其作为训练长度的上限，并对375M tokens进行warmup。
我们使用权重衰减、梯度裁剪，不使用dropout。ROOTS数据集包含341B tokens的文本。然而，基于训练期间发布的修订scaling laws，我们决定在重复数据上对大模型进行额外25B tokens的训练。
由于warmup tokens + decay tokens大于总的token数量，所以学习率衰减始终未达到终点
            问题：主题摘要,限制在20个字内
           """
feature_extraction_llm(payload1)

'大语言模型 BLOOM 的优化方法包括GPU 融合、cosine 学习率调度、使用 warmup 和降温策略。'

In [151]:
docs=[("GPU优化作用","""一般来说，GPU无法在检索数据同时执行这些计算。此外，现代GPU的计算性能远远高于每个操作(被称为GPU编程中的核)所需的内存传输速度。
核融合是一种基于GPU计算的优化方法，通过在一次内核调用中执行多个连续操作。该方法提供了一种最小化数据传输的方法：中间结果留在GPU寄存器中，而不是复制到VRAM，从而节省开销。
我们使用了Megatron-LM提供了几个定制化融合CUDA核。首先，我们使用一个优化核来执行LayerNorm，以及用核来融合各种缩放、掩码和softmax操作的各种组合。
使用Pytorch的JIT功能将一个偏差项添加至GeLU激活中。作为一个使用融合核的例子，在GeLU操作中添加偏差项不会增加额外的时间，因为该操作受内存限制：与GPU VRAM和寄存器之间的数据传输相比，额外的计算可以忽略不计。
因此融合这两个操作基本上减少了它们的运行时间"""),
      ("如何优化BLOOM","""我们使用上表3中详细描述的超参数来训练BLOOM的6个尺寸变体。
架构和超参数来自于我们的实验结果(Le Scao et al.)和先前的训练大语言模型(Brown et al.)。
非176B模型的深度和宽度大致遵循先前的文献(Brown et al.)，偏离的3B和7.1B只是为了更容易适合我们训练设置。
由于更大的多语言词表，BLOOM的embedding参数尺寸更大。在开发104B参数模型的过程中，我们使用了不同的Adam 
参数、权重衰减和梯度裁剪来对目标稳定性进行实验，但没有发现其有帮助。
对于所有模型，我们在410B tokens使用cosine学习率衰减调度，在计算允许的情况下，将其作为训练长度的上限，并对375M tokens进行warmup。
我们使用权重衰减、梯度裁剪，不使用dropout。ROOTS数据集包含341B tokens的文本。然而，基于训练期间发布的修订scaling laws，我们决定在重复数据上对大模型进行额外25B tokens的训练。
由于warmup tokens + decay tokens大于总的token数量，所以学习率衰减始终未达到终点""")]
feature_extraction_by_lanchain(docs,40,"pytorch-inference-2023-04-20-07-28-31-042","us-west-2")

'优化GPU使用和BLOOM模型训练，包括使用核融合和定制化CUDA核，减少数据传输和时间，使用Adam学习率调度和权重衰减，避免稳定性问题。在重复数据上增加额外训练25B tokens，但学习率衰减未达到终点。'

In [38]:
auth = ('admin', '(OL>0p;/')
search = OpenSearch(
         hosts = [{'host': hostname, 'port': 443}],
         ##http_auth = awsauth ,
         http_auth = auth ,
         use_ssl = True,
         verify_certs = True,
         connection_class = RequestsHttpConnection
     )
print(search)

<OpenSearch([{'host': 'vpc-llm-rag-aos-seg3mzhpp76ncpxezdqtcsoiga.us-west-2.es.amazonaws.com', 'port': 443}])>
