### CPG

In [1]:
import os
import re
import time
import html
import pydot
import json
import shutil
import subprocess
import numpy as np
import pandas as pd

In [2]:
train_data = pd.read_csv("D:\\iSE_vulCode\\data\\raw\\train.csv")
PATH_CODE = "data/code/"
PATH_CPG = "data/cpg/"
PATH_DOT = "data/dot/"
PATH_JSON = "data/json/"

In [3]:
def clean_code(code):
    code = re.sub(r'/\*.*?\*/', '', code, flags=re.DOTALL)
    code = re.sub(r'//.*', '', code)
    code = re.sub(r'\n\s*\n', '\n', code).strip()
    code = re.sub(r'\s+\(', '(', code)
    return code

In [4]:
def to_C_files(code, index, out_path):
    if not os.path.exists(out_path):
        os.makedirs(out_path)

    file_name = f"{index}.c"
    with open(out_path + file_name, 'w') as f:
        f.write(code)

def to_DOT_files(code, index, out_path):
    if not os.path.exists(out_path):
        os.makedirs(out_path)

    file_name = f"{index}.dot"
    with open(out_path + file_name, 'w') as f:
        f.write(code)

In [5]:
def joern_parse(input_path, output_path, file_name):
    out_file = file_name + ".bin"
    joern_parse_call = subprocess.run(["joern-parse.bat", input_path, "--output", output_path + out_file],
                                      stdout=subprocess.PIPE, text=True, check=True)
def joern_export(cpg_file_path, output_path, output_format="dot"):
    output_folder = output_path
    joern_export_call = subprocess.run(
        ["joern-export.bat", cpg_file_path, "-o", output_folder, "--format", output_format],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
    )

In [6]:
train_codes = []
train_labels = []

for i in range(len(train_data)):
    code = train_data.loc[i, 'code']
    code = clean_code(code)
    train_codes.append(code)
    label = train_data.loc[i, 'Label']
    train_labels.append(label)

In [26]:
def find_line_number(string):
    match = re.search(r'<SUB>(\d+)</SUB>', string)
    if match:
        return int(match.group(1))
    return None
find_line_number("<(FIELD_IDENTIFIER,flags,flags)<SUB>19</SUB>>")

19

In [27]:
def process_dot(graph, function_name, file_name):
    function = {
        "function": html.unescape(function_name),
        "AST": [],
        "CFG": [],
        "PDG": []
    }

    nodes = graph.get_nodes()
    edges = graph.get_edges()

    edge_dict = {}
    for edge in edges:
        src_id = edge.get_source().strip('"')
        dst_id = edge.get_destination().strip('"')
        label_code = edge.get_label().strip('"').split(': ') if edge.get_label() else ["None"]
        label = html.unescape(label_code[0])
        code = html.unescape(label_code[1]) if len(label_code) > 1 and label_code[1].strip() else "None"

        if src_id not in edge_dict:
            edge_dict[src_id] = []
        if dst_id not in edge_dict:
            edge_dict[dst_id] = []
        
        edge_dict[src_id].append({
            "in": src_id,
            "out": dst_id,
            "label": label,
            "code": code
        })

        edge_dict[dst_id].append({
            "in": src_id,
            "out": dst_id,
            "label": label,
            "code": code
        })


    for node in nodes:
        node_id = node.get_name().strip('"')
        node_label = node.get_label().strip('<>').strip('"') if node.get_label() else "None"
        line_number = find_line_number(node.get_label())
        attribute_node = re.sub(r"<SUB>.*", "", node_label)
        attribute_node = html.unescape(attribute_node)
        attribute_node = attribute_node[1:-1]

        node_type = 'None'
        node_code = 'None'
        pos = 0
        if attribute_node.startswith('<'):
            for i in range(len(attribute_node)):
                pos += 1
                if attribute_node[i] == ',':
                    break
            node_type = attribute_node[0:pos-1]
            node_code = attribute_node[pos:]
        else:
            phan = attribute_node.split(',')
            if len(phan) == 2:
                node_type = phan[0]
                node_code = phan[1]
            elif len(phan) > 2 and attribute_node[0] >= 'a' and attribute_node[0] <= 'z':
                node_type = phan[0]
                node_code = ','.join(phan[1:])

            else :
                s1_list = []
                s2_list = []
                s2_found = False
                if phan[0] == 'IDENTIFIER' or phan[0] == 'FIELD_IDENTIDIER' or phan[0] == 'BLOCK' or phan[0] == 'RETURN' or phan[0] == 'LITERAL':
                    node_type = phan[0] + ',' + phan[1]
                    node_code = ','.join(phan[2:])
                else:
                    for p in phan:
                        if not s2_found and (p.isupper() or all(c.isupper() or c in ['_'] for c in p)):
                            s1_list.append(p)
                        else:
                            s2_found = True
                            s2_list.append(p)
                    
                    node_type = ','.join(s1_list)
                    node_code = ','.join(s2_list).strip()
            if node_type == 'CONTROL_STRUCTURE,IF' or node_type == 'CONTROL_STRUCTURE,FOR':
                node_type = node_type.replace(',', ' ')

        node_data = {
            "id": node_id,
            "edges": [],
            "type": node_type,
            "code": node_code,
            "line_number": line_number
        }

        # Lấy các cạnh liên quan
        related_edges = edge_dict.get(node_id, [])
        for edge in related_edges:
            edge_data = {
                "in": edge["in"],
                "out": edge["out"],
                "label": edge["label"],
                "code": edge["code"]
            }
            node_data["edges"].append(edge_data)

        # Phân loại node vào AST, CFG, hoặc PDG
        is_ast = any(edge["label"].startswith("AST") for edge in related_edges)
        is_cfg = any(edge["label"].startswith("CFG") for edge in related_edges)
        is_pdg = any(edge["label"].startswith("DDG") for edge in related_edges)

        if is_ast:
            function["AST"].append(node_data)
        if is_cfg:
            function["CFG"].append(node_data)
        if is_pdg:
            function["PDG"].append(node_data)

    return function

In [20]:
could_be_error_file = []

In [28]:
def code2cpg(index_code):
    to_C_files(train_codes[index_code], index_code , PATH_CODE)
    if os.path.exists(PATH_CODE) and os.path.isdir(PATH_CODE):
        joern_parse(PATH_CODE, PATH_CPG, f"{index_code}_cpg")

def cpg2dot(cpg_file_path):
    joern_export(cpg_file_path, PATH_DOT)

def dot2json(input_folder, index, output_folder):
    output_json_path = f'{output_folder}{index}.json'

    json_output = {
        "functions": []
    }
    dot_files = [f for f in os.listdir(input_folder) if f.lower().endswith('.dot')]
    if len(dot_files) <= 3:
        could_be_error_file.append(index)
        return
    for dot_file in dot_files:
        file_path = os.path.join(input_folder, dot_file)
        try:
            graphs = pydot.graph_from_dot_file(file_path)
            if not graphs:
                print(f"Không thể đọc đồ thị từ file .dot: {file_path}. Bỏ qua tệp này.")
                continue

            graph = graphs[0]
            subgraphs = graph.get_subgraphs()

            # Nếu có subgraphs, mỗi subgraph là một function
            if subgraphs:
                for subgraph in subgraphs:
                    function_label = subgraph.get_label().strip('"') if subgraph.get_label() else "unknown_function"
                    function = process_dot(subgraph, function_label, f"{index}.c")
                    json_output["functions"].append(function)
            else:
                # Nếu không có subgraphs, coi toàn bộ đồ thị là một function
                function_name = graph.get_name().strip('"') or "unknown_function"
                function = process_dot(graph, function_name, f"{index}.c")
                json_output["functions"].append(function)

        except Exception as e:
            print(f"Đã xảy ra lỗi khi xử lý file {file_path}: {e}")

    # Lưu kết quả ra tệp JSON
    if json_output["functions"]:
        try:
            with open(output_json_path, 'w', encoding='utf-8') as f:
                json.dump(json_output, f, ensure_ascii=False, indent=2)
            print(f"Chuyển đổi thành công! Kết quả đã được lưu vào '{output_json_path}'.")
        except Exception as e:
            print(f"Đã xảy ra lỗi khi ghi tệp JSON: {e}")
    else:
        print("Không tìm thấy functions trong bất kỳ tệp .dot nào.")


In [29]:
i = 0
code2cpg(i)
cpg2dot(f"{PATH_CPG}{i}_cpg.bin")
dot2json(PATH_DOT, i, PATH_JSON)
time.sleep(0.1)
shutil.rmtree(PATH_CODE)
#shutil.rmtree(PATH_DOT)

Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/0.json'.


In [30]:
for i in range(len(train_codes)):
    if i == 225:
        continue
    if i == 1000:
        break
    code2cpg(i)
    cpg2dot(f"{PATH_CPG}{i}_cpg.bin")
    dot2json(PATH_DOT, i, PATH_JSON)
    time.sleep(0.1)
    shutil.rmtree(PATH_CODE)
    shutil.rmtree(PATH_DOT)

Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/0.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/1.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/3.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/4.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/5.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/6.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/7.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/8.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/9.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/10.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/11.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/12.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/13.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json/14.json'.
Chuyển đổi thành công! Kết quả đã được lưu vào 'data/json

### Embedding