In [0]:
# Lựa chọn n_layer tối ưu. Dựa trên dung lượng VRAM đang trống. 
# Với Gemma 2 9b, Mỗi layer chiếm khoảng 240MB 

# Xóa llm 
if 'llm' in locals(): del llm
if 'llms_summary' in locals(): del llms_summary
if 'llms_evaluate' in locals(): del llms_evaluate

# Lấy VRAM còn trống 
_, free_GPUs = z.showUsage()
print(f'Free VRAM: {free_GPUs[0]}MB')

n_layer1 = int(free_GPUs[0] / 240)
n_layer2 = 0

if n_layer1 == 0:
    print(f'Sufficient VRAM (n_layer=={n_layer}), use CPU')
    n_layer1 = None
    n_layer2 = None
    
else:
    if n_layer1 > 43:
        n_layer2 = n_layer1 - 43
        n_layer1 = 43
        
    elif n_layer1 == 43:
        n_layer1 = 40
        
    elif n_layer2 > 20:
        n_layer1 = 40
        n_layer2 = 20
        
    else:
        n_layer2 = None
            
# n_layer1 = n_layer2 = (n_layer1 + n_layer2) // 2 - 1  
print(f'Optimize n_layer1: {n_layer1}')
print(f'Optimize n_layer2: {n_layer2}')

Imported psutil 6.0.0 (pre-installed)
Imported torch 2.5.1+cu121 (pre-installed)
---CPU---
#0: 1.6%	#1: 1.6%	#2: 4.6%
#3: 3.1%	#4: 0.0%	#5: 3.1%
#6: 4.7%	#7: 1.6%	#8: 0.0%
#9: 0.0%	#10: 1.6%	#11: 6.2%
#12: 0.0%	#13: 6.2%	#14: 0.0%
#15: 0.0%	#16: 0.0%	#17: 4.8%
#18: 0.0%	#19: 1.5%

---Memory---
45121MiB (69%) free, 65237MiB total

---GPU---
#0. Tesla P100-PCIE-16GB
memory: 1209MiB (7%) free, 16287MiB total
Free VRAM: 1209MB
Optimize n_layer1: 5
Optimize n_layer2: None


In [0]:
import os
import json
import uvicorn
import asyncio
import multiprocessing

from uvicorn import Config, Server
from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks, Depends
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse

from langchain_core.documents.base import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda

from langchain_community.llms import LlamaCpp
from langchain_community.chat_models import ChatLlamaCpp
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler

from langchain_community.embeddings import LlamaCppEmbeddings
from langchain_community.embeddings import HuggingFaceBgeEmbeddings

from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader
from langchain.document_loaders import TextLoader

from langchain_experimental.text_splitter import SemanticChunker
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma

from tempfile import NamedTemporaryFile
from httpx import TimeoutException
from tenacity import retry, stop_after_attempt, wait_exponential
from werkzeug.datastructures import FileStorage
from fpdf import FPDF

# EMBEDDINGS_MODEL = "dunzhang/stella_en_1.5B_v5"
# EMBEDDINGS_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# EMBEDDINGS_MODEL = "keepitreal/vietnamese-sbert"
EMBEDDINGS_MODEL = "VoVanPhuc/sup-SimCSE-VietNamese-phobert-base"
GEMMA2_MODEL = "soc-models/gemma-2-9b-it-Q6_K_L.gguf"

model_kwargs = {"device": "cuda"}
encode_kwargs = {"normalize_embeddings": True}
embeddings = HuggingFaceBgeEmbeddings(
    model_name=EMBEDDINGS_MODEL,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs
)

n_threads1 = (multiprocessing.cpu_count() - 1 ) // 2
n_threads2 = (multiprocessing.cpu_count() - 1 ) // 2

llms_summary = ChatLlamaCpp(
    model_path=GEMMA2_MODEL,
    verbose=False, 
    temperature=0,
    n_ctx=8192,  
    max_tokens=1024,  
    f16_kv=False,  
    n_gpu_layers=n_layer1,  
    n_threads=n_threads1,
    streaming=True
)

llms_evaluate = ChatLlamaCpp(
    model_path=GEMMA2_MODEL,
    verbose=False, 
    temperature=0,
    n_ctx=8192,  
    max_tokens=1024,  
    f16_kv=False,  
    n_gpu_layers=n_layer2,  
    n_threads=n_threads2,
    streaming=True
)


In [0]:
#---Start---llms response----

def process_template(template: str) -> str:
    template = template.replace("{", "{{")
    template = template.replace("}", "}}")
    return template

def create_prompt(template: str) -> PromptTemplate:
    processed_template = process_template(template)
    
    return PromptTemplate(
        template=processed_template,
        input_variables=[]
    )

def get_chain(template: str, is_summary: bool=True):
    prompt = create_prompt(template)
    chain = None
    
    if is_summary:
        chain = (
            prompt 
            | llms_summary 
            | StrOutputParser()
        )
    else:
        chain = (
            prompt 
            | llms_evaluate 
            | StrOutputParser()
        )
    
    return chain
    
async def generate_result(template: str, is_summary: bool) -> str:
    chain = get_chain(template, is_summary)
    response = await chain.ainvoke({})
    return response

async def generate_stream_json(template: str, is_summary: bool) -> str:
    chain = get_chain(template, is_summary)
    result = ""
    async for chunk in chain.astream({}):
        result += chunk
        yield json.dumps({
            "response": chunk,
            "success": True
        }) + "\n"
    print("result: ", result)
    print("------------------------------------------------------------")

async def generate_stream_str(template: str, is_summary: bool) -> str:
    chain = get_chain(template, is_summary)
    
    async for chunk in chain.astream({}):
        yield chunk

#---End---llms response----

In [0]:
import base64
import mimetypes
from email.message import EmailMessage
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# Thông tin xác thực
ACCESS_TOKEN = ""
REFRESH_TOKEN = ""
CLIENT_ID = ""
CLIENT_SECRET = ""
TOKEN_URI = ""

def get_email_receiver(user_query: str):
    email_regex = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    emails = re.findall(email_regex, text)
    return emails

def get_credentials():
    """Tạo thông tin xác thực từ các khóa trực tiếp."""
    creds = Credentials(
        token=ACCESS_TOKEN,
        refresh_token=REFRESH_TOKEN,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_uri=TOKEN_URI
    )
    if creds.expired:
        creds.refresh(Request())
    return creds

def send_email_with_attachment(receiver: str, file_evaluation_name: str, file_path: str):
    """Gửi email với nội dung và tệp đính kèm."""
    creds = get_credentials()
    service = build("gmail", "v1", credentials=creds)

    # Tạo email
    mime_message = EmailMessage()
    mime_message["To"] = receiver
    mime_message["From"] = "147.qter@gmail.com"
    mime_message["Subject"] = "Information Security Evaluation About SOC 2 Type II"
    mime_message.set_content(f"This is an email with an attachment that is a PDF file of a SOC 2 Type report security assessment based on the {file_evaluation_name} file you provided!")

    # Thêm tệp đính kèm
    content_type, _ = mimetypes.guess_type(file_path)
    maintype, subtype = content_type.split("/")
    with open(file_path, "rb") as f:
        mime_message.add_attachment(f.read(), maintype, subtype, filename=file_evaluation_name)

    # Mã hóa email
    encoded_message = base64.urlsafe_b64encode(mime_message.as_bytes()).decode()
    body = {"raw": encoded_message}

    # Gửi email
    try:
        message = service.users().messages().send(userId="me", body=body).execute()
        print(f"Message sent successfully! ID: {message['id']}")
        return True
    except Exception as error:
        print(f"An error occurred: {error}")
        
    return False

In [0]:
#---Start---query----

QA_COLLECTION = "QA_DATA"
COMMON_COLLECTION = "COMMON_DATA"

CONTENT_PAYLOAD_KEY = "content"
METADATA_PAYLOAD_KEY = "metadata"
DB_DIRECTORY = "./soc-data-store/"

MAX_DOCS_FOR_QA_DATA = 3
MAX_DOCS_FOR_COMMON_DATA = 3
MAX_DOCS_FOR_USER_FILE = 3
BATCH_SIZE_UPLOAD = 5000

def get_check_is_userquery_about_userfile_template(user_query: str):
    template = f"""
        Are you a chatbot that classifies whether the provided "user query" is a query that is intended to retrieve content from a document, report, file, or file that the user has uploaded?
        Instructions: Just look for words like: document, report, file, or a variation of those words in the query.
        
        **Required:**
        **Do not create any additional content.**
        **The final answer is True or False.**
        **If there is no result or cannot be evaluated or the result is unclear and ambiguous during evaluation, the answer must be: False**
        
        User query: {user_query}
        
        Input example 1: Summarize the information in the above report
        Output example 1: True
        
        Input example 2: What is 2-FA encryption?
        Output Example 2: False
        
        Input Example 3: Evaluate the contents of the above file
        Output Example 3: True
    """
    return template

async def check_is_userquery_about_userfile(user_query: str):
    template = get_check_is_userquery_about_userfile_template(user_query)
    result = await generate_result(template, True)
    result = result.lower().strip()
    print("check_is_userquery_about_userfile: ", result)
    
    if result.lower() == "true": return True
    return False

def existing_collection(collection_name: str) -> Chroma:
    """Create vector retriever"""
    db_directory = os.path.join(DB_DIRECTORY, collection_name)
    vectorstore = Chroma(
        persist_directory=db_directory,
        embedding_function=embeddings
    )
    
    return vectorstore

async def similarity_search(para: dict) -> list[Document]:
    """RRF retriever"""
    qa_doc_store = existing_collection(QA_COLLECTION)
    common_doc_store = existing_collection(COMMON_COLLECTION)
    user_doc_store = existing_collection(para["user_id"])
    
    is_question_about_file = await check_is_userquery_about_userfile(para["user_query"])
    all_results = []
        
    if is_question_about_file:
        user_results = user_doc_store.similarity_search_with_score(para["user_query"], k=MAX_DOCS_FOR_USER_FILE)
        print("user_results: ", user_results)
        all_results.extend(user_results)
    else: 
        qa_results = qa_doc_store.similarity_search_with_score(para["user_query"], k=MAX_DOCS_FOR_QA_DATA)
        print("qa_results: ", qa_results)
        all_results.extend(qa_results)
        
        common_results = common_doc_store.similarity_search_with_score(para["user_query"], k=MAX_DOCS_FOR_COMMON_DATA)
        print("common_results: ", common_results)
        all_results.extend(common_results)
    
    return all_results

async def get_ask_template(user_id: str, user_query: str) -> str:
    ssearch = RunnableLambda(similarity_search)
    context = await ssearch.ainvoke({"user_id": user_id, "user_query": user_query})
    context = [c[0].page_content for c in context]
    question = user_query
    
    template = f"""
        You are a support chatbot providing information about information security and network security.
        You are a chatbot that provides information about information security and cybersecurity.

        **Required:*
        **If it is a greeting question, please respond politely to the user as a chatbot that provides information about information security and cybersecurity, following the structure defined below.**
        **If the question is not related to information about information security and cybersecurity, please ask the user to ask again, following the structure defined below.**
        **Please answer the following "Information Security and Cybersecurity" topic question based on your understanding and the information provided and find relevant links, image names, tables, pages, content sources in the information, following the structure defined below:**
        
        Information: {context}
        Question: {question}
        
        **Required instructions:**
        <no-content>: no content is presented here.
        <content>: The content of that item is presented here.
        #heading<level>: <content>: The title of the answer, with additional content
        
        **The final answer must be translated into Vietnamese according to the structure:**
        <no-content>
        #heading1: <content>
            <no-content>
            ##heading1.1: <content>
                <no-content>
                ###heading1.1.1: <content>
                    <no-content>
                    ####heading1.1.1.1: <content>
                        <no-content>
                    <no-content>
                <no-content>
                ###heading1.1.2: <content>
                    <no-content>
                <no-content>
            <no-content>
        <no-content>
        #heading2: <content>
            <no-content>
            #heading2.1: <content>
                <no-content>
            <no-content>
        <no-content>
        **images**: https://example.com/images1, https://example.com/images2, ...
        **tables**: https://example.com/tables1, https://example.com/tables2, ...
        **pages**: https://example.com/pages1, https://example.com/pages2, ...
        **sources**: https://example.com/sources1, https://example.com/sources2, ...
        
        Question example: Xác thực 2 yếu tố là gì?
        Output example: 
        #heading1: Xác thực hai yếu tố (2FA) là một biện pháp bảo mật bổ sung yêu cầu người dùng cung cấp hai hình thức xác minh khác nhau để đăng nhập vào tài khoản của họ.
        #heading2: Các loại xác thực 2FA
            ##heading2.1: Yếu tố đầu tiên: Thông thường là tên người dùng và mật khẩu.
            ##heading2.2: Yếu tố thứ hai: Có thể bao gồm:
                ###heading2.2.1: Mã OTP (One-Time Password) được gửi qua SMS hoặc email.
                ###heading2.2.2: Ứng dụng xác thực trên điện thoại di động (ví dụ: Google Authenticator, Authy).
                ###heading2.2.3: Dòng chữ ký vật lý (FIDO2).
                ###heading2.2.4: Chuyển đổi sinh trắc học (nhận dạng khuôn mặt, vân tay).
        #heading3: Lợi ích của Xác thực 2FA
            ##heading3.1: Xác thực 2FA giúp tăng cường bảo mật tài khoản bằng cách làm cho việc truy cập trái phép trở nên khó khăn hơn đáng kể. Bằng cách yêu cầu hai yếu tố xác minh, 2FA ngăn chặn kẻ tấn công có được quyền truy cập vào tài khoản của bạn ngay cả khi họ biết tên người dùng và mật khẩu của bạn.

In [0]:
#---Start---evaluate----

TODO_LIST = [
    """**Đánh giá và cập nhật chính sách bảo mật định kỳ để phù hợp với tiêu chuẩn SOC 2, ISO 27001, NIST và các quy định như GDPR, HIPAA**""",
    """**Xây dựng chiến lược quản lý rủi ro dựa trên đánh giá rủi ro an ninh mạng định kỳ**""",
    """**Đảm bảo tổ chức tuân thủ các tiêu chuẩn bảo mật thông qua đánh giá và chứng nhận định kỳ bởi bên thứ ba**""",
    """**Thiết lập Trung tâm Giám sát An ninh (SOC) để giám sát 24/7**""",
    """**Thực hiện kiểm tra xâm nhập (Penetration Testing) hàng năm để xác định lỗ hổng**""",
    """**Sử dụng các công cụ như SIEM để phân tích và phản ứng nhanh với các sự kiện an ninh**""",
    """**Theo dõi và phân tích nhật ký bảo mật để phát hiện các bất thường**""",
    """**Mã hóa dữ liệu tại chỗ (at-rest) và khi truyền tải (in-transit) bằng giao thức mạnh như AES-256**""",
    """**Phân loại dữ liệu theo độ nhạy cảm và áp dụng các biện pháp bảo mật phù hợp**""",
    """**Triển khai xác thực đa yếu tố (MFA) cho mọi truy cập quan trọng**""",
    """**Định kỳ kiểm tra và thu hồi quyền truy cập không còn sử dụng**""",
    """**Thực hiện chính sách \"Least Privilege\" để giảm thiểu quyền truy cập dư thừa**""",
    """**Đảm bảo bảo mật vật lý tại các trung tâm dữ liệu, văn phòng bằng khóa cửa, camera giám sát, và hệ thống kiểm soát ra vào**""",
    """**Bảo vệ các thiết bị đầu cuối (endpoint) bằng các giải pháp EDR và phần mềm chống malware**""",
    """**Áp dụng chính sách bảo mật di động, bao gồm mã hóa dữ liệu trên thiết bị cá nhân**""",
    """**Xây dựng kế hoạch ứng phó sự cố an ninh thông tin (IRP) chi tiết và tổ chức diễn tập định kỳ**""",
    """**Thành lập nhóm phản ứng nhanh với sự cố (Incident Response Team)**""",
    """**Thực hiện phân tích nguyên nhân gốc rễ (Root Cause Analysis) sau mỗi sự cố**""",
    """**Triển khai chương trình đào tạo nhận thức an ninh mạng cho toàn bộ nhân viên**""",
    """**Tổ chức các bài kiểm tra mô phỏng (phishing simulation) để nâng cao khả năng nhận diện nguy cơ**""",
    """**Thiết lập quy trình quản lý thay đổi (Change Management) để đảm bảo mọi thay đổi đều được phê duyệt và giám sát**""",
    """**Cập nhật bản vá bảo mật định kỳ cho tất cả hệ thống và ứng dụng**""",
    """**Áp dụng công cụ tự động hóa để giảm thiểu lỗi con người trong việc triển khai thay đổi**""",
    """**Đánh giá định kỳ về chính sách bảo mật của nhà cung cấp bên ngoài**""",
    """**Thiết lập hợp đồng với các điều khoản bảo mật và trách nhiệm rõ ràng cho đối tác**""",
    """**Triển khai hệ thống phát hiện và ngăn chặn xâm nhập (IDS/IPS)**""",
    """**Thiết lập giải pháp phòng chống DDoS cho các hệ thống công khai**""",
    """**Giám sát các xu hướng tấn công mới để điều chỉnh chiến lược phòng ngừa**""",
    """**Thực hiện kiểm tra tính khả dụng và hiệu quả của hệ thống sao lưu và khôi phục dữ liệu**""",
    """**Xây dựng và kiểm tra kế hoạch khôi phục sau thảm họa (Disaster Recovery Plan) định kỳ**""",
]

n_TODO = len(TODO_LIST)

TODO_LIST_STR = "\n".join([f"{i + 1}. {item}" for i, item in enumerate(TODO_LIST)])

def process_result_of_evaluate(result: str) -> list[int]:
    newresult = ''.join((ch if ch in '0123456789.-e' else ' ') for ch in result)
    listOfNumbers = [int(i) for i in newresult.split() if i.isdigit() and int(i) > 0 and int(i) <= n_TODO]
    return listOfNumbers

def get_evaluate_template(todo_list_str: str, content: str) -> dict:
    """Query with vector db"""
    
    template = f"""
        You are the ChatBot that helps evaluate which "content" meets which to-do in the "To-Do List" in the security domain we provide you.
        
        Required:
        **Do not create any additional content. **
        **Strict evaluation with high precision and rigor, no ambiguous answers that distort the results.**
        **For each "to-do", if the "to-do" has been clearly presented in the "content", the output must contain the location of that "to-do" and not include the content of that location.**
        **If it cannot be evaluated or the evaluation is unclear during the evaluation, skip the "to-do".**
        **If the evaluation result has more than 5 satisfied "to-do" locations, you need to consider especially carefully and especially strictly before giving the final result**
        **The final answer must be a list of "to-do" location numbers obtained from the above evaluation separated by ","**
        **If there is no satisfactory "to-do", the output must be: -1**
        
        To-do list: {todo_list_str}
        Content: {content}
        
        Example output 1: 1, 2, 3
        Example output 2: 3, 7, 12, 23
        Example output 3: -1
    """

    return template

#---End---evaluate----

In [0]:
#---Start---save file----

def check_file_type(file_path: str):
    _, file_extension = os.path.splitext(file_path)
    return file_extension.lower().split(".")[-1]

def get_file_name(file_path: str):
    file_name = file_path.split('\\')[-1]
    folder_path = file_path.split('\\')[:-1]
    return file_name

def get_folder_abs_path(user_id: str) -> str:
    if not os.path.exists(DB_DIRECTORY):
        os.makedirs(DB_DIRECTORY)

    folder = DB_DIRECTORY + user_id
    if not os.path.exists(folder):
        os.makedirs(folder)
    
    folder_abs_path = os.path.abspath(folder)
    return folder_abs_path

async def save_pdf(file: str, user_id: str) -> str:
    folder_abs_path = get_folder_abs_path(user_id)
    
    pdf_content = await file.read()
    file_name = file.filename
    file_abs_path = os.path.join(folder_abs_path, file_name)

    with open(file_abs_path, "wb") as output_file:
        output_file.write(pdf_content)
    
    return file_abs_path
    
def get_check_is_socreport_template(sample_content: str):
    template = f"""
        Are you a chatbot that helps classify whether the provided "content sample" is part of the "SOC report" content?
        
        Content sample: {sample_content}
        
        **Required:**
        **No additional content is generated.**
        **The final answer is True or False.**
        **If there is no result or cannot be evaluated or the result is unclear and ambiguous during the evaluation, the answer must be: False**
        
        Example output 1: True
        Example output 2: False
        Example output 3: True
    """
    return template
    
async def check_is_socreport_file(sample_content: str):
    template = get_check_is_socreport_template(sample_content)
    result = await generate_result(template, False)
    result = result.lower().strip()
    
    if result.lower() == "true": return True
    return False
    
    
#---End---save file----

In [0]:
#---Start---create file PDF----

def initialize_pdf():
    pdf = FPDF('P', 'mm', 'A4')
    pdf.add_page()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_font("DejaVu", "", "soc-fonts/DejaVuSans.ttf", uni=True)
    pdf.add_font("DejaVu-Bold", "", "soc-fonts/DejaVuSans-Bold.ttf", uni=True)
    pdf.add_font("DejaVu-Italic", "", "soc-fonts/DejaVuSerif-Italic.ttf", uni=True)
    return pdf

def add_header(pdf, title:str):
    pdf.set_font("Arial", 'B', 16)
    pdf.cell(0, 10, title, 0, 1, 'C')

def add_section_title(pdf, title:str):
    pdf.set_font("Arial", 'B', 14)
    pdf.cell(0, 10, title, 0, 1)

def add_task_list(pdf, todo_list:list[str], indices:dict, title:str):
    add_section_title(pdf, title)
    pdf.set_font("DejaVu", '', 13)
    line_height = 10
    
    for i, (key, value) in enumerate(indices.items()):
        if key < 0 or key >= len(todo_list): continue
    
        checkbox = "\u2611" if value else "\u2610"
        title_number = "1" if value else "2"
        parts = todo_list[key].split("**")
        
        pdf.set_x(pdf.get_x() + 10)
        pdf.set_font("DejaVu-Bold", '', 13)
        pdf.multi_cell(0, line_height, f"{title_number}.{i+1}. {checkbox} {parts[1].strip()}" , 0)

        # pdf.set_x(pdf.get_x() + 10)
        # pdf.set_font("DejaVu", '', 13)
        # pdf.multi_cell(0, line_height, f"\u279B{parts[2].strip()}", 0)
        
        # if value:
        #     pdf.set_x(pdf.get_x() + 10)
        #     pdf.set_font("DejaVu-Italic", '', 13)
        #     pdf.multi_cell(0, line_height, f"\u201C\u2026{value.page_content.strip()}\u2026\u201D", 0)
            
        pdf.ln(5)

def create_information_security_evaluation(todo_list:list[str], completed_indices:dict, user_id:str, file_evaluation_name:str="Information_Security_Evaluation.pdf") -> str:
    folder_abs_path = get_folder_abs_path(user_id)
    output_path = os.path.join(folder_abs_path, file_evaluation_name)
    
    pdf = initialize_pdf()
    add_header(pdf, "Information Security Evaluation")
    
    add_task_list(pdf, todo_list, completed_indices, "1. Completed Tasks",)
    pdf.add_page()
    
    uncompleted_positions = set(range(len(todo_list))) - set(completed_indices.keys())
    uncompleted_positions = {pos: None for pos in uncompleted_positions}
    add_task_list(pdf , todo_list, uncompleted_positions, "2. Pending Tasks")
    
    pdf.output(output_path)
    return file_evaluation_name, output_path

#---End---create file PDF----

In [0]:
#---Start---document----

def get_raw_documents(file_path: str):
    file_type = check_file_type(file_path)
    if file_type == "pdf":
        raw_documents = PyPDFLoader(file_path).load()
    elif file_type == "docx":
        raw_documents = Docx2txtLoader(file_path).load()
    elif file_type == "txt":
        raw_documents = TextLoader(file_path, encoding = 'UTF-8').load()
    else:
        return []
    return raw_documents
    
def process_raw_documents(raw_documents: list[Document]) -> list[Document]: 
    source = get_file_name(raw_documents[0].metadata.get('source'))
    for idx, raw_document in enumerate(raw_documents):
        page = raw_document.metadata.get('page')
        metadata = {'source': source, 'page': page}
        setattr(raw_document, 'metadata', metadata)
    return raw_documents
    
def get_documents(file_path: str) -> list[Document]:
    raw_documents = get_raw_documents(file_path)
    documents = process_raw_documents(raw_documents)
    return documents
    
def get_texts(file_path: str) -> (str, str):
    raw_documents = get_raw_documents(file_path)
    source = raw_documents[0].metadata.get('source').split('\\')[-1]
    page_content = ""
    for raw_document in raw_documents:
        page_content += raw_document.page_content
    return source, page_content

def process_documents_before_store(documents: list[Document]) -> list[Document]:
    pre_page = 0
    for index, document in enumerate(documents):
        size = len(document.page_content)
        metadata = document.metadata
        metadata['size'] = size
        metadata['index'] = index
        metadata['pre_page'] = pre_page
        pre_page = metadata.get('page')
        setattr(document, 'metadata', metadata)
        
    return documents

def split_documents_file_common(file_path: str) -> list[Document]:
    text_splitter = SemanticChunker(
        embeddings=embeddings, 
        breakpoint_threshold_type="percentile", 
        number_of_chunks=1024
    )
    
    documents = get_documents(file_path)
    documents = text_splitter.split_documents(documents)
    documents = process_documents_before_store(documents)
    return documents
    
def split_documents_file_soc(file_path: str) -> list[Document]:
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size = 4096, 
        chunk_overlap = 124,
        separators = [
            "\n\n",
            "\n",
            " ",
            ".",
            ",",
            "\u200b",  # Zero-width space
            "\uff0c",  # Fullwidth comma
            "\u3001",  # Ideographic comma
            "\uff0e",  # Fullwidth full stop
            "\u3002",  # Ideographic full stop
            "",
        ]
    )
    
    source, page_content = get_texts(file_path)
    documents = text_splitter.create_documents([page_content])
    return documents
    

In [0]:
#---Start---upload----

def upload_to_chroma(docs: list[Document], user_id: str):
    vectorstore = existing_collection(user_id)
    for i in range(0, len(docs), BATCH_SIZE_UPLOAD):
        batch = docs[i:i+BATCH_SIZE_UPLOAD]
        vectorstore.add_documents(batch)

def upload_common(file_abs_path: str, user_id: str) -> bool:
    docs = split_documents_file_common(file_abs_path)
    upload_to_chroma(docs, user_id)
    return docs[0]
    
async def upload_soc(file_abs_path: str, user_id: str):
    completed_positions = {}
    docs = split_documents_file_soc(file_abs_path)
    
    print("Length", len(docs))
    print("File name", file_abs_path)
    
    for idx, doc in enumerate(docs):
        template = get_evaluate_template(TODO_LIST_STR, doc)
        print("Template size:", len(template))
        
        result = await generate_result(template, False)
        completed_position_list = process_result_of_evaluate(result)
            
        for completed_position in completed_position_list:
            completed_positions[completed_position] = doc
        
        print("Done index: ", idx, "result: ", result, "completed_position_list: ", completed_position_list)
        print("completed_positions: ", completed_positions.keys())
        
        create_information_security_evaluation(TODO_LIST, completed_positions, user_id)
        if len(completed_position_list) == n_TODO: 
            break
        
        print("--------------------------------------------------------------------------")
            
    file_evaluation_name, pdf_output_path = create_information_security_evaluation(TODO_LIST, completed_positions, user_id)
    print("Completed positions", completed_positions.keys())
    print(f"File evaluation name: {file_evaluation_name}")
    print(f"PDF output path: {pdf_output_path}")
    
    return file_evaluation_name, pdf_output_path
    
async def background_upload_soc(user_query: str, file_abs_path: str, user_id: str):
    loop = asyncio.get_event_loop()
    
    file_evaluation_name, pdf_output_path = await upload_soc(file_abs_path, user_id)
    print("file_evaluation_name, pdf_output_path: ", file_evaluation_name, pdf_output_path)
    
    email_receiver = get_email_receiver(user_query)
    print(email_receiver)

    success = await loop.run_in_executor(
        None,
        lambda: send_email_with_attachment(email_receiver, file_evaluation_name, pdf_output_path)
    )
    
    return success

#---End---upload----

In [0]:
#---Start---API----

@app.post("/llms")
async def post_llms(
    template: str = Form(...)
):
    
    return StreamingResponse(
        generate_stream_json(template, True), media_type='text/event-stream'
    )

@app.post("/ask")
async def post_ask(
    user_id: str = Form(...), 
    user_query: str = Form(...)
):
    
    template = await get_ask_template(user_id, user_query)
    print("ask", user_id)
    print("ask", user_query)
    print("template", len(template))
    
    return StreamingResponse(
        generate_stream_json(template, True), media_type='text/event-stream'
    )

@app.post("/upload")
async def post_upload(
    user_id: str = Form(...), 
    user_query: str = Form(...),
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = BackgroundTasks()
):
    
    print("upload-common", user_id)
    print("upload-common", file.filename)
    
    if file and user_id:
        file_abs_path = await save_pdf(file, user_id)
        sample_content = upload_common(file_abs_path, user_id)
        
        is_socreport_file = False
        # is_socreport_file = await check_is_socreport_file(sample_content)
        # print("is_socreport_file: ", is_socreport_file)
        
        response = "#heading1: Tải lên file thành công! Giờ đây bạn có thể truy cập thông tin trong các file vừa gửi lên, câu trả lời sẽ sớm được phản hồi vui lòng chờ!"
        if is_socreport_file:
            background_tasks.add_task(background_upload_soc, user_query, file_abs_path, user_id)
            response = """
                #heading1: Tải lên file thành công!
                #heading2: Giờ đây bạn có thể truy cập thông tin trong các file vừa gửi lên trong khi đó chúng tôi sẽ xem xét và đánh giá bảo mật file báo cáo SOC 2 type II của bạn!
                #heading3: Kết quả sẽ được gửi qua Gmail của bạn vừa cung cấp!
            """
        
        return JSONResponse(
            content={
                "is_socreport_file": is_socreport_file,
                "response": response,
                "success": True
            },
            status_code=200 
        )
    
    return JSONResponse(
        content={
            "response": "Tải lên file không thành công, vui lòng kiểm tra lại file pdf của bạn!",
            "success": False
        },
        status_code=200 
    )

#---End---API----

In [0]:
async def main():
    config = Config(
        app=app,
        host="0.0.0.0",
        port=7722,
        log_level="debug",
        log_config=None,
        use_colors=False
    )
    
    server = Server(config)
    await server.serve()

asyncio.run(main())