In [1]:
# !conda create -n crewai python=3.11
# !conda activate crewai -
!pip install -q --upgrade google-cloud-aiplatform
!pip install -q -U 'crewai[tools]' mdpdf 

In [98]:
import os
import vertexai
import socket
import re
from vertexai.preview.generative_models import GenerativeModel, Part
from google.cloud import storage

PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

GCS_BUCKET_LOCATION = "us-east1"
GCS_BUCKET_NAME = "patch-demo-data-folder"
GCS_BUCKET_URI = f"gs://{GCS_BUCKET_NAME}"


client = storage.Client()
bucket = storage.Bucket(client, GCS_BUCKET_NAME)

if bucket.exists()==False:
    # Create a Cloud Storage Bucket
    print(f"\n{GCS_BUCKET_NAME} not exists. \n")
else:    
    print(f"\n{GCS_BUCKET_NAME} folder already exists. Contents:\n")
    
def list_all_cves():
    """Return the names of the top-level folders in the bucket.

    Every object in ``GCS_BUCKET_NAME`` is listed and the first path segment
    of its name is collected. Objects that are not inside a folder (no ``/``
    in the name, or a name starting with ``/``) are skipped so that stray
    files at the bucket root are not reported as CVE ids.

    Returns:
        set[str]: distinct first path segments of the object names.
    """
    bucket = client.bucket(GCS_BUCKET_NAME)
    return {
        blob.name.split('/', 1)[0]
        for blob in bucket.list_blobs()
        # find() > 0: a slash exists and the segment before it is non-empty.
        if blob.name.find('/') > 0
    }
    
    
# Discover the CVE folders once; main() iterates over this module-level set.
cve_list = list_all_cves()
print(f'cve list: {cve_list}')
# list_folders(GCS_BUCKET_NAME)

def extract_patch_content(patch_files):
    """Download every patch blob as text and return the pieces joined in order.

    Args:
        patch_files: iterable of objects exposing ``download_as_text()``.

    Returns:
        str: the downloaded texts concatenated with no separator; an empty
        string when ``patch_files`` is empty.
    """
    return ''.join(patch_blob.download_as_text() for patch_blob in patch_files)

def upload_patch_content(cve, version, file_name, file_content):
    '''
    Write file_content to {cve}/{version}/modified/{file_name} in the bucket.

    The content is uploaded whether or not the object already exists; the
    "created" message is printed only when the object was absent before the
    upload. Uses the module-level ``bucket``.
    '''
    object_path = f'{cve}/{version}/modified/{file_name}'
    print(f'folder name {object_path}')
    target_blob = bucket.blob(object_path)
    was_missing = not target_blob.exists()
    target_blob.upload_from_string(file_content)
    if was_missing:
        print(f"Folder {object_path} created in bucket {GCS_BUCKET_NAME}.")


patch-demo-data-folder folder already exists. Contents:

cve list: {'CVE-2017-16994'}


In [113]:
def gemini_pro(full_prompt, responseType):
    """Send one non-streaming request to the "gemini-pro" model and return its text.

    Args:
        full_prompt: prompt passed to ``generate_content``.
        responseType: value sent as ``response_mime_type`` in the generation
            config (callers in this notebook pass "application/json" or
            "text/plain").

    Returns:
        The ``text`` attribute of the model response.
    """
    model = GenerativeModel("gemini-pro")
    generation_settings = {
        "candidate_count": 1,
        "max_output_tokens": 8190,
        "response_mime_type": responseType,
        "temperature": 0,
        "top_p": 1,
    }
    reply = model.generate_content(
        full_prompt,
        generation_config=generation_settings,
        stream=False,
    )
    return reply.text

def extract_codesnippets_from_patch(patch_context):
    """Ask the model to reduce a patch to an array of old/new code snippets.

    Args:
        patch_context: text of the patch file(s); inserted verbatim into the
            prompt between the "file start" / "file end" markers.

    Returns:
        The model's text reply, requested with the "application/json" MIME type.
    """
    # The backslash must be the very last character on its line so it acts as
    # a line continuation. Trailing spaces after it make it an invalid escape
    # sequence and leak a literal backslash into the prompt.
    full_prompt = """\
    From this file: 
    ---file start---
    {patch_context}
    ---file end---
    I want to remove the header and description, only keep the code snippets from diff,
    in the remaining part
    then, I want to identify the old lines and new lines for each diff
    should only output an array, which containing objects, each object has 2 propertys
    
    "old lines" property, containing old code snippet, 
    "new lines" property, containing new code snippet,
    
    output the object only
    """
    formatted_prompt = full_prompt.format(
        patch_context=patch_context
    )
    return gemini_pro(formatted_prompt, "application/json")

def extract_function_name(steps):
    """Ask the model which function is modified by the given code snippet(s).

    Args:
        steps: text describing the change (in this notebook, the reply of
            ``extract_codesnippets_from_patch``); inserted verbatim into the
            prompt.

    Returns:
        The model's text reply, requested with the "text/plain" MIME type. The
        reply is returned as-is, without any trimming.
    """
    # The backslash must be the very last character on its line so it acts as
    # a line continuation. Trailing spaces after it make it an invalid escape
    # sequence and leak a literal backslash into the prompt.
    full_prompt = """\
    From this code snippet: 
    ---file start---
    {steps}
    ---file end---
    
    which function gets modified?
    
    output the function name
    """
    formatted_prompt = full_prompt.format(
        steps=steps
    )
    return gemini_pro(formatted_prompt, "text/plain")
    
# def trim_content(steps):
#     full_prompt = """\    
#     From this array: 
    
#     ---array start---
#     {steps}
#     ---array end---

#     remove any + or - or any empty space or any tabs at the start of each content
    
#     output the array
#     """
#     formatted_prompt = full_prompt.format(
#         steps=steps
#     )    
#     return gemini_pro(formatted_prompt, "application/json")

def generate_patched_file(target_file, diff, function_name):
    """Ask the model to apply a diff to a target file and return the new file text.

    Args:
        target_file: full text of the file to modify.
        diff: description of the old/new code (in this notebook, the reply of
            ``extract_codesnippets_from_patch``).
        function_name: name of the function the model is told to edit; it is
            inserted into the prompt as given.

    Returns:
        The model's text reply, requested with the "text/plain" MIME type.
    """
    # The backslash must be the very last character on its line so it acts as
    # a line continuation. Trailing spaces after it make it an invalid escape
    # sequence and leak a literal backslash into the prompt.
    full_prompt = """\
    You are an regex agent, you can replace old code with new code.
    You only do necessary changes when replace old code with new code.
    
    Now, I have this diff:
    
    ---diff start---
    {diff}
    ---diff end---
    
    in this target_file
    ---target_file start---
    {target_file}
    ---target_file end---
    
    modify the target_file according to diff, inside {function_name} replace the old code with new code
    add a trailing brief comment wherever you modified
    
    output the full file in text/plain format
    """
    formatted_prompt = full_prompt.format(
        target_file=target_file,
        diff=diff,
        function_name=function_name
    )
    return gemini_pro(formatted_prompt, "text/plain")

def main():
    """Generate and upload a model-patched copy of a vulnerable file.

    For each CVE in ``cve_list``:
      1. collect the ``.patch`` objects under ``{cve}/patch-files/``;
      2. walk the version folders under ``{cve}/`` and, for each object stored
         directly inside a version folder, ask the model to apply the patch;
      3. upload the result through ``upload_patch_content``.

    NOTE(review): two early ``return`` statements are kept from the original
    demo. The run stops at the first CVE that is not CVE-2017-16994, and again
    right after the first uploaded file. Confirm whether they should become
    ``continue`` / be removed before processing a full bucket.
    """
    bucket = client.get_bucket(GCS_BUCKET_NAME)
    for cve in sorted(cve_list):
        print(f'CVE: {cve}')

        # for test case purpose only
        if 'CVE-2017-16994' not in cve:
            print('Unknown CVEs')
            return

        # Step 1: Get the CVE patch files or related patches
        patch_files = [
            patch_blob
            for patch_blob in bucket.list_blobs(prefix=f'{cve}/patch-files/')
            if patch_blob.name.endswith('.patch')
        ]
        print(f'what is patch_files, {patch_files}')
        if not patch_files:
            # Without a patch there is nothing to apply; do not send an empty
            # diff to the model.
            print(f'{cve} has no patch files, skipping')
            continue
        if len(patch_files) > 1:
            print(f'{cve} has more than 1 patch')
        patch_content = extract_patch_content(patch_files)

        # Step 2: Get the target vulnerable files
        # The listing is materialised first so uploads made below cannot
        # affect the iteration.
        seen_versions = set()
        for version_blob in list(bucket.list_blobs(prefix=f'{cve}/')):
            parts = version_blob.name.split('/')
            if len(parts) <= 2 or parts[1] == 'patch-files':
                continue
            version_number = parts[1]
            # Every object of a version yields the same folder; handle each
            # version folder only once.
            if version_number in seen_versions:
                continue
            seen_versions.add(version_number)
            print(version_number)

            # delimiter='/' restricts the listing to objects directly inside
            # the version folder.
            folder_prefix = f'{cve}/{version_number}/'
            for target_blob in bucket.list_blobs(prefix=folder_prefix, delimiter='/'):
                if target_blob.name == folder_prefix:
                    # Skip the folder placeholder object itself.
                    continue
                print(f"  File: {target_blob.name}")
                target_file = target_blob.download_as_text()

                # Step 3: generate patch file for each version
                # These two model calls depend only on patch_content; hoist
                # them out of the loops if the early return below is removed.
                steps = extract_codesnippets_from_patch(patch_content)
                print('-----summary----')
                print(steps)
                function_name = extract_function_name(steps)
                new_content = generate_patched_file(target_file, steps, function_name)
                print(new_content)
                file_name = target_blob.name.split('/')[2] # CVE/version/file_name
                upload_patch_content(cve, version_number, file_name, new_content)
                # Demo guard: stop after the first patched file.
                return
                        
                        
                        
        
#         # Create tasks for the agents
#         backporting_patch_task = Task(
#             description=f'Given the CVE id: {cve}, with description: {description}, and I have this patch file: {patch_content}, can you fix the target vulnerable file of this repo with an older version: {file_content}',
#             agent=cve_expert,
#             expected_output='A new file with minimum code change from the target vulnerable file, output in original target file format'
#         )

#         crew = Crew(
#           agents=[senior_security_developer],
#           tasks=[backporting_patch_task],
#           verbose=True,
#           process=Process.sequential
#         )

#         result = crew.kickoff()
        # print(result)
# Run the patch pipeline defined by main() in this cell.
main()


CVE: CVE-2017-16994
what is patch_files, [<Blob: patch-demo-data-folder, CVE-2017-16994/patch-files/373c4557d2aa362702c4c2d41288fb1e54990b7c.patch, 1718350061800224>]
4.5.13-rc
4.8-rc2
  File: CVE-2017-16994/4.8-rc2/pagewalk.c
-----summary----
[
  {
    "old lines": "static int walk_hugetlb_range(unsigned long addr, unsigned long end, struct mm_walk *walk)\n{\n\tunsigned long next;\n\tpte_t *pte;\n\tstruct page *h = walk->hugetlb_entry;\n\n\tif (!h)\n\t\treturn 0;\n\n\tdo {\n\t\tnext = hugetlb_entry_end(h, addr, end);\n\t\tpte = huge_pte_offset(walk->mm, addr & hmask, sz);\n\t\tif (pte && walk->hugetlb_entry)\n\t\t\terr = walk->hugetlb_entry(pte, hmask, addr, next, walk);\n\t} while (addr = next, addr != end);\n\n\treturn err;\n}",
    "new lines": "static int walk_hugetlb_range(unsigned long addr, unsigned long end, struct mm_walk *walk)\n{\n\tunsigned long next;\n\tpte_t *pte;\n\tstruct page *h = walk->hugetlb_entry;\n\n\tif (!h)\n\t\treturn 0;\n\n\tdo {\n\t\tnext = hugetlb_entry_end