In [13]:
import pandas as pd
import sqlite3 as lite
import ast

In [14]:
# Programming languages to process, in iteration order.
langs = [
    'C', 'PHP', 'C++', 'JavaScript', 'Python',
    'Ruby', 'Java', 'Shell', 'Go', 'TypeScript',
    'Objective-C', 'SQL', 'C#', 'Perl', 'Batchfile',
    'CoffeeScript', 'Scala', 'PowerShell', 'Haskell', 'Lua',
    'Rust', 'Swift', 'R', 'Matlab', 'Erlang',
]

# Non-programming / unknown language labels.
# NOTE(review): not referenced by any cell visible here -- confirm it is still needed.
remove_langs = [
    'unknown',
    'Markdown',
    'None',
    'HTML',
    'TeX',
    'CSS',
    'Jupyter Notebook',
]

# CWE ids that are excluded from the data set during pre-processing.
remove_cwe = [
    'NVD-CWE-noinfo',
    'NVD-CWE-Other',
]

In [21]:
import ollama

# Instruction text sent to the LLM ahead of the code snippet.
# The string is concatenated with the code (never passed through str.format),
# so JSON braces must be written as single `{` / `}`; doubled braces would be
# sent to the model literally.
prompt = """
You are a security expert tasked with identifying vulnerabilities in a given code. Carefully analyze the code using CWE (Common Weakness Enumeration) descriptions step by step.

For each step:

1. Analyze the code to check if it contains any vulnerabilities.
    - If a vulnerability is identified, proceed to steps 2–4.
    - If no vulnerabilities are found, return the output as: `{"vulnerabilities": null}`.
2. Identify the specific area in the code that could be exploited.
3. Explain why the identified area might be vulnerable, providing a detailed explanation referencing CWE descriptions.
4. Based on the analysis, identify the CWE category and include its ID, name, and description.

Format the output as JSON with the following structure:

```json
{
    "code": "The area in the code that could be exploited.",
    "reason": "Detailed explanation of why the code is vulnerable.",
    "cwe": {
        "id": "CWE-XXX",
        "name": "Name of the CWE category",
        "description": "Brief description of the CWE category."
    }
}
```

If no vulnerabilities are identified, output:
```json
{
    "vulnerabilities": null
}
```
"""

def llm_classify(code):
    """Assemble the full LLM input for ``code`` and print it.

    The module-level ``prompt`` is followed by a ``Code:`` separator and the
    snippet itself. Nothing is returned; the model call is currently disabled
    (see the commented-out block after this function).
    """
    pieces = (prompt, '\n Code: \n', code)
    context = ''.join(pieces)
    print(context)

# Disabled model call, kept for reference (belongs at the end of llm_classify):
#     response = ollama.chat(model='llama3.1:8b', messages=[
#       {
#         'role': 'user',
#         'content': context,
#       },
#     ], options={"temperature":0.9})
#     print(response['message']['content'])

In [16]:
def create_connection(db_file):
    """Open a connection to a SQLite database.

    Args:
        db_file: Database location passed straight to ``sqlite3.connect``.

    Returns:
        The open connection, or ``None`` when connecting fails. In the
        failure case the error is printed rather than raised.
    """
    try:
        return lite.connect(db_file, timeout=10)  # connection via sqlite3
    except lite.Error as e:
        # The exception class lives on the sqlite3 module; a bare `Error`
        # is not defined here and would turn a failed connect into NameError.
        print(e)
        return None

# Absolute path of the SQLite database file opened for the queries in this notebook.
# NOTE(review): machine-specific path -- adjust when running elsewhere.
db_path = '/home/keisuke/code/llm-code-vuln/CVEfixes/CVEfixes_v1.0.8/Data/DB.db'
conn = create_connection(db_path)

In [17]:
def pre_processing(df, excluded_cwe=None, min_lines=30):
    """Clean a raw query result and return the filtered frame.

    The input frame itself is left untouched; all work happens on a copy.
    Row counts are printed after the main filtering stages.

    Steps, in order:
      1. Parse ``diff_parsed`` (a Python-literal string) once per row into
         ``diff_added`` / ``diff_deleted``; keep rows where both are
         non-empty; drop ``diff_parsed``.
      2. Parse ``cve_description`` from a Python-literal string (unparsable
         text becomes ``None`` and is removed by the final ``dropna``).
      3. Drop rows whose ``vuln_code`` / ``non_vuln_code`` is missing or the
         string ``'None'``.
      4. Add ``<col>_num_lines`` for both code columns and drop rows with
         fewer than ``min_lines`` lines.
      5. Keep only rows whose ``diff_deleted`` is a non-empty list.
      6. Convert ``token_count`` to a numeric dtype.
      7. Drop rows whose ``cwe_id`` is in ``excluded_cwe``, then drop rows
         with any remaining missing value.

    Args:
        df: DataFrame with at least the columns ``diff_parsed``,
            ``cve_description``, ``vuln_code``, ``non_vuln_code``,
            ``token_count`` and ``cwe_id``.
        excluded_cwe: CWE ids to remove. Defaults to the module-level
            ``remove_cwe`` list.
        min_lines: Minimum number of lines each code column must have.

    Returns:
        A new, filtered DataFrame.

    Raises:
        ValueError / SyntaxError: if a ``diff_parsed`` value is not a valid
            Python literal, or ``token_count`` cannot be made numeric.
    """
    if excluded_cwe is None:
        excluded_cwe = remove_cwe

    def parse_py_literal(text):
        # Non-strings pass through unchanged; bad literals become None.
        if not isinstance(text, str):
            return text
        try:
            return ast.literal_eval(text)
        except (SyntaxError, ValueError):
            return None

    def count_lines(code):
        return code.count('\n') + 1 if isinstance(code, str) else 0

    # Copy first so the caller's frame does not silently gain columns.
    df = df.copy()

    # code diff: both add & del should exist (parse each diff only once)
    parsed_diff = df['diff_parsed'].apply(ast.literal_eval)
    df['diff_added'] = parsed_diff.apply(lambda d: d['added'])
    df['diff_deleted'] = parsed_diff.apply(lambda d: d['deleted'])
    has_added = df['diff_added'].map(bool).astype(bool)
    has_deleted = df['diff_deleted'].map(bool).astype(bool)
    df = df[has_added & has_deleted].reset_index(drop=True)
    df = df.drop(columns=['diff_parsed'])
    print(len(df))

    # cve description type str -> arr
    df['cve_description'] = df['cve_description'].apply(parse_py_literal)
    print(len(df))

    # code before and after must be present ('None' as text counts as missing)
    for col in ('vuln_code', 'non_vuln_code'):
        df = df[df[col].notna() & (df[col] != 'None')]

    # remove rows where number of lines in the code is below min_lines;
    # `assign` returns a new frame, avoiding assignment on a filtered slice
    for col in ('vuln_code', 'non_vuln_code'):
        df = df.assign(**{f'{col}_num_lines': df[col].apply(count_lines)})
        df = df[df[f'{col}_num_lines'] >= min_lines]
    print(len(df))

    # remove empty list in diff_deleted
    non_empty_deleted = df['diff_deleted'].apply(
        lambda x: isinstance(x, list) and len(x) > 0
    ).astype(bool)
    df = df[non_empty_deleted]
    print(len(df))

    # token_count should be num
    df = df.assign(token_count=pd.to_numeric(df['token_count']))

    # drop the other CWE
    df = df[~df["cwe_id"].isin(excluded_cwe)]

    return df.dropna()

In [18]:
# One parameterised query serves every language: the `?` placeholder is bound
# to the language name on each iteration instead of being formatted into SQL.
query = """
SELECT
    file_change.programming_language,
    cwe.cwe_id,
    cwe.cwe_name,
    file_change.code_after AS non_vuln_code,
    file_change.code_before AS vuln_code,
    cwe.description AS cwe_description,
    file_change.diff_parsed,
    cve.description AS cve_description,
    method_change.code    AS method_code,
    method_change.name    AS method_name,
    file_change.token_count
FROM file_change
    INNER JOIN method_change
        ON file_change.file_change_id = method_change.file_change_id
    INNER JOIN fixes
        ON file_change.hash = fixes.hash
    INNER JOIN cve
        ON fixes.cve_id = cve.cve_id
    INNER JOIN cwe_classification
        ON cve.cve_id = cwe_classification.cve_id
    INNER JOIN cwe
        ON cwe_classification.cwe_id = cwe.cwe_id
WHERE
    file_change.programming_language = ?
    AND file_change.programming_language IS NOT NULL
    AND cwe.cwe_id IS NOT NULL
    AND cwe.cwe_name IS NOT NULL
    AND file_change.code_before IS NOT NULL
    AND cwe.description IS NOT NULL
    AND file_change.diff_parsed IS NOT NULL
    AND cve.description IS NOT NULL
    AND method_change.code IS NOT NULL
    AND method_change.name IS NOT NULL;
"""

sample_size = 2
# (lower, upper) token_count bounds; both ends are inclusive in the filter below.
token_buckets = [(0,500), (500,1000), (1000,5000), (5000,10000)]

for lang in langs:
    print(f'Language:{lang}')
    df = pd.read_sql(query, con=conn, params=(lang,))
    if len(df) <= 100:
        # Too few rows for this language: move on to the next one rather than
        # aborting the whole run.
        print(f'{lang} will be skipped')
        continue

    df = pre_processing(df)

    for code_type in ['vuln_code', 'non_vuln_code']:
        for token_length in token_buckets:
            filtered_df = df[(df['token_count'] >= token_length[0]) &
                             (df['token_count'] <= token_length[1])]

            # Check the size before sampling: DataFrame.sample raises
            # ValueError when asked for more rows than the frame holds.
            if len(filtered_df) < sample_size:
                print('Not enough samples')
                continue
            random_samples = filtered_df.sample(n=sample_size)

            for s in range(sample_size):
                llm_classify(random_samples.iloc[s][code_type])
                break  # NOTE(review): debug short-circuit -- only the first sample is sent
            break  # NOTE(review): debug short-circuit -- only the first usable bucket is used
    break  # NOTE(review): debug short-circuit -- only the first processed language is run

Language:C
15321
15321
15312
15312

    You are a security expert tasked with identifying vulnerabilities in a given code. Carefully analyze the code using CWE (Common Weakness Enumeration) descriptions step by step.

    For each step:

    1. Analyze the code to check if it contains any vulnerabilities.
        - If a vulnerability is identified, proceed to steps 2–4.
        - If no vulnerabilities are found, return the output as: `{{"vulnerabilities": null}}`.
    2. Identify the specific area in the code that could be exploited.
    3. Explain why the identified area might be vulnerable, providing a detailed explanation referencing CWE descriptions.
    4. Based on the analysis, identify the CWE category and include its ID, name, and description.

    Format the output as JSON with the following structure:

    ```json
    {
        "code": "The area in the code that could be exploited.",
        "reason": "Detailed explanation of why the code is vulnerable.",
        "cwe": {
   

In [22]:
# Re-run the sampling step on the `df` left in the kernel by an earlier cell.
sample_size = 2
for code_type in ['vuln_code', 'non_vuln_code']:
    for token_length in [(0,500), (500,1000), (1000,5000), (5000,10000)]:
        # Both bounds are inclusive.
        filtered_df = df[(df['token_count'] >= token_length[0]) &
                         (df['token_count'] <= token_length[1])]

        # Check the size before sampling: DataFrame.sample raises ValueError
        # when asked for more rows than the frame holds.
        if len(filtered_df) < sample_size:
            print('Not enough samples')
            continue
        random_samples = filtered_df.sample(n=sample_size)

        for s in range(sample_size):
            llm_classify(random_samples.iloc[s][code_type])
            break  # NOTE(review): debug short-circuit -- only the first sample is sent
        break  # NOTE(review): debug short-circuit -- only the first usable bucket is used


You are a security expert tasked with identifying vulnerabilities in a given code. Carefully analyze the code using CWE (Common Weakness Enumeration) descriptions step by step.

For each step:

1. Analyze the code to check if it contains any vulnerabilities.
    - If a vulnerability is identified, proceed to steps 2–4.
    - If no vulnerabilities are found, return the output as: `{{"vulnerabilities": null}}`.
2. Identify the specific area in the code that could be exploited.
3. Explain why the identified area might be vulnerable, providing a detailed explanation referencing CWE descriptions.
4. Based on the analysis, identify the CWE category and include its ID, name, and description.

Format the output as JSON with the following structure:

```json
{
    "code": "The area in the code that could be exploited.",
    "reason": "Detailed explanation of why the code is vulnerable.",
    "cwe": {
        "id": "CWE-XXX",
        "name": "Name of the CWE category",
        "description": "B