In [1]:
## load all the problems from the verilog-eval dataset

from verilog_eval.data import write_jsonl, read_problems, stream_jsonl
from tqdm import tqdm
import itertools
from verilog_eval.data import VERILOG_EVAL_HUMAN, HUMAN_DESCRIPTIONS

# Load the human-written VerilogEval problems and their descriptions.
# Both objects are used as mappings keyed by task id (see the indexing below).
problems = read_problems(VERILOG_EVAL_HUMAN)
descriptions = read_problems(HUMAN_DESCRIPTIONS)
# Copy each detailed description onto its problem record under 'description',
# so one problem dict carries prompt, solution, test and description.
# NOTE: `task_id` and `item` remain defined as notebook globals after this loop.
for task_id, item in descriptions.items():
    problems[task_id]['description'] = item['detail_description']

# machine_problems = read_problems("../ext/verilog-eval/data/VerilogEval_Machine.jsonl")
# machine_descriptions = read_problems("../ext/verilog-eval/descriptions/VerilogDescription_Machine.jsonl")

## truncate the problems
# problems = itertools.islice(problems.items(), 0, 10)
# problems = dict(problems)
# descriptions = {k: v for k, v in descriptions.items() if k in problems}
# write_jsonl("sample_problems.jsonl", list(problems.values()))

# problems = list(problems.values())

In [2]:
problems

{'gatesv': {'task_id': 'gatesv',
  'prompt': 'module top_module (\n\tinput [3:0] in,\n\toutput [2:0] out_both,\n\toutput [3:1] out_any,\n\toutput [3:0] out_different\n);\n',
  'canonical_solution': '\n\tassign out_both = in[2:0] & in[3:1];\n\tassign out_any = in[2:0] | in[3:1];\n\tassign out_different = in^{in[0], in[3:1]};\n\t\nendmodule\n',
  'test': '`timescale 1 ps/1 ps\n`define OK 12\n`define INCORRECT 13\nmodule reference_module (\n\tinput [3:0] in,\n\toutput [2:0] out_both,\n\toutput [3:1] out_any,\n\toutput [3:0] out_different\n);\n\n\tassign out_both = in[2:0] & in[3:1];\n\tassign out_any = in[2:0] | in[3:1];\n\tassign out_different = in^{in[0], in[3:1]};\n\t\nendmodule\n\n\nmodule stimulus_gen (\n\tinput clk,\n\tinput tb_match,\n\toutput logic [3:0] in,\n\toutput reg[511:0] wavedrom_title,\n\toutput reg wavedrom_enable\t\n);\n\n\n// Add two ports to module stimulus_gen:\n//    output [511:0] wavedrom_title\n//    output reg wavedrom_enable\n\n\ttask wavedrom_start(input[511:0

In [3]:
from vgen.interpreter import evaluate_code

In [4]:
problems

{'gatesv': {'task_id': 'gatesv',
  'prompt': 'module top_module (\n\tinput [3:0] in,\n\toutput [2:0] out_both,\n\toutput [3:1] out_any,\n\toutput [3:0] out_different\n);\n',
  'canonical_solution': '\n\tassign out_both = in[2:0] & in[3:1];\n\tassign out_any = in[2:0] | in[3:1];\n\tassign out_different = in^{in[0], in[3:1]};\n\t\nendmodule\n',
  'test': '`timescale 1 ps/1 ps\n`define OK 12\n`define INCORRECT 13\nmodule reference_module (\n\tinput [3:0] in,\n\toutput [2:0] out_both,\n\toutput [3:1] out_any,\n\toutput [3:0] out_different\n);\n\n\tassign out_both = in[2:0] & in[3:1];\n\tassign out_any = in[2:0] | in[3:1];\n\tassign out_different = in^{in[0], in[3:1]};\n\t\nendmodule\n\n\nmodule stimulus_gen (\n\tinput clk,\n\tinput tb_match,\n\toutput logic [3:0] in,\n\toutput reg[511:0] wavedrom_title,\n\toutput reg wavedrom_enable\t\n);\n\n\n// Add two ports to module stimulus_gen:\n//    output [511:0] wavedrom_title\n//    output reg wavedrom_enable\n\n\ttask wavedrom_start(input[511:0

In [5]:
# Sanity-check input: take the reference (canonical) solution of one problem
# as the "completion" to evaluate.
task_id = "gatesv"
comp = problems[task_id]["canonical_solution"]

In [6]:
# Uses the evaluate_code imported from vgen.interpreter, called here with the
# whole `problems` mapping (the evaluate_code defined later in this notebook is
# called with a single problem dict instead).
res = evaluate_code(task_id, comp, problems)

In [7]:
res

VerilogExecution(code='\n\tassign out_both = in[2:0] & in[3:1];\n\tassign out_any = in[2:0] | in[3:1];\n\tassign out_different = in^{in[0], in[3:1]};\n\t\nendmodule\n', status=<VerilogStatus.SUCCESS: 'success'>, stdout="VCD info: dumpfile wave.vcd opened for output.\ngatesv.sv:56: $finish called at 1066 (1ps)\nHint: Output 'out_both' has no mismatches.\nHint: Output 'out_any' has no mismatches.\nHint: Output 'out_different' has no mismatches.\nHint: Total mismatched samples is 0 out of 213 samples\n\nSimulation finished at 1066 ps\nMismatches: 0 in 213 samples\n", stderr='', passed=True, result=None, df_err=Empty DataFrame
Columns: [time, in[3:0], out_any_dut[3:1], out_any_ref[3:1], out_both_dut[2:0], out_both_ref[2:0], out_different_dut[3:0], out_different_ref[3:0]]
Index: [])

In [8]:
import outlines

In [9]:
# A closing code fence. The few-shot prompt template in this cell ends on an
# opening fence, so this is presumably the stop sequence handed to the text
# generator -- its use is not shown in this notebook; confirm against the caller.
STOP_SEQUENCES = ["```"]

def split_problem_tests(problem):
    """Split a HumanEval-style test harness into one harness per assertion.

    ``problem["test"]`` must contain exactly one ``def check(candidate):``
    line. Everything before it is kept as a shared preamble, and every
    assertion found after it is emitted as its own standalone harness
    (preamble + ``def check(candidate):`` + the assertion text).

    Handled shapes:
      * single-line ``assert`` statements;
      * multi-line asserts that open with a trailing ``[`` and close on a line
        starting with ``]``;
      * ``for`` loops: accumulated setup lines plus the loop, up to and
        including the first ``assert`` inside it;
      * HumanEval/151, where ``assert candidate(lst)`` needs the preceding
        setup lines.

    Blank lines, ``#`` comments and ``print`` calls outside those constructs
    are dropped.

    NOTE: assumes human-eval-specific logic for multiline asserts & for-loops;
    it won't work properly if a multiline assert is nested within a for-loop.

    Returns:
        list[str]: one test-harness source string per detected assertion.
    """
    marker = "def check(candidate):\n"
    preamble, body = problem["test"].split(marker)
    header = preamble + marker
    harnesses = []

    in_multiline = False   # currently inside an `assert ... [` ... `]` block
    multiline_buf = []
    in_for = False         # currently inside a for-loop awaiting its assert
    setup_buf = []         # non-assert statements seen so far (setup / loop)

    for line in body.split("\n"):
        stripped = line.lstrip()
        is_assert = stripped.startswith("assert")

        if in_multiline:
            multiline_buf.append(line)
            if stripped.startswith("]"):
                harnesses.append(header + "\n".join(multiline_buf))
                in_multiline = False
            continue

        if is_assert and stripped[-1] == "[":
            in_multiline = True
            multiline_buf = [line]
            continue

        if in_for:
            # Everything inside the loop is kept verbatim until its assert.
            setup_buf.append(line)
            if is_assert:
                harnesses.append(header + "\n".join(setup_buf))
                in_for = False
                setup_buf = []
            continue

        if stripped == "" or stripped.startswith("#") or stripped.startswith("print"):
            continue

        if not is_assert:
            setup_buf.append(line)
            if stripped.startswith("for"):
                in_for = True
            continue

        # special logic for HumanEval/151
        if problem["task_id"] == "HumanEval/151" and stripped.startswith(
            "assert candidate(lst)"
        ):
            setup_buf.append(line)
            harnesses.append(header + "\n".join(setup_buf))
            setup_buf = []
        else:
            harnesses.append(header + line)

    return harnesses


# task_id_problem_map = {problem["task_id"]: problem for problem in problems}
# task_id_split_tests_map = {
    # problem["task_id"]: split_problem_tests(problem) for problem in problems
# }


def stats_execute(task_id, completion, timeout=10):
    """Run a completion against every split test of a task and report its pass rate.

    Relies on module-level names that must be defined before calling:
    ``task_id_problem_map`` and ``task_id_split_tests_map`` (their definitions
    are currently commented out in this notebook) and ``check_correctness``.

    Args:
        task_id: key into ``task_id_problem_map`` / ``task_id_split_tests_map``.
        completion: candidate solution text passed to ``check_correctness``.
        timeout: per-test timeout forwarded to ``check_correctness``.

    Returns:
        dict with ``"task_id"`` and ``"pass_rate"`` (fraction of split tests
        whose result has a truthy ``"passed"``; ``0.0`` when the task has no
        split tests).
    """
    # Imported locally because no cell of this notebook imports it.
    from concurrent.futures import ThreadPoolExecutor

    problem = task_id_problem_map[task_id]
    split_tests = task_id_split_tests_map[task_id]

    # Nothing to run: report 0.0 rather than dividing by zero below.
    if not split_tests:
        return {"task_id": task_id, "pass_rate": 0.0}

    # One problem copy per split test, differing only in the "test" field.
    thread_problems = [{**problem, "test": test} for test in split_tests]
    with ThreadPoolExecutor() as executor:
        outcomes = executor.map(
            lambda tp: check_correctness(tp, completion, timeout), thread_problems
        )
        results = [outcome["passed"] for outcome in outcomes]

    return {
        "task_id": task_id,
        "pass_rate": sum(results) / len(results),
    }


# Few-shot prompt template.
#
# IMPORTANT: the docstring below is not documentation -- it is the template
# text itself (note the `{{ ... }}` / `{% for ... %}` placeholders), which the
# `outlines.prompt` decorator is expected to render with the call arguments.
# Editing the docstring changes the prompt sent to the model.
#
# Parameters, as used by the template:
#   instructions: text placed at the very top of the prompt.
#   examples:     iterable of worked examples; each must expose `description`,
#                 `prompt` and `canonical_solution`.
#   description:  description of the problem to solve.
#   question:     the problem's prompt text (the Verilog module header in this
#                 notebook's dataset).
#
# The template deliberately ends on an opening code fence after "Answer:", so
# the model's continuation is the answer body.
@outlines.prompt
def few_shot_prompt(instructions, examples, description, question):
    """{{ instructions }}

    {% for example in examples %}
    Description:
    ```
    {{ example.description }}
    ```
    Question:
    ```
    {{ example.prompt }}
    ```
    Answer:
    ```
    {{ example.canonical_solution }}
    ```
    {% endfor %}

    Description:
    ```
    {{ description }}
    ```
    Question:
    ```
    {{ question }}
    ```
    Answer:
    ```
    """


# Instruction line placed at the top of every few-shot prompt.
instructions = "Please answer the following question following the examples. Generate valid verilog code always."
# Few-shot examples: the problems at positions 6 and 7 of the dataset order.
# get_prompts_with_ids() builds prompts only for positions 0-5, so the examples
# never overlap with the problems being asked.
examples = list(problems.values())[6:8]


def get_prompts_with_ids():
    """Build few-shot prompts for the first six problems.

    Reads the module-level ``problems``, ``descriptions``, ``instructions`` and
    ``examples``, and renders each prompt with ``few_shot_prompt``.

    Returns:
        list of ``(prompt, task_id)`` tuples, in dataset order.
    """
    selected = list(problems.values())[:6]
    pairs = []
    for entry in selected:
        detail = descriptions[entry["task_id"]]['detail_description']
        rendered = few_shot_prompt(instructions, examples, detail, entry["prompt"])
        pairs.append((rendered, entry["task_id"]))
    return pairs


In [10]:
# problems
# descriptions
# A hand-picked candidate completion for the "kmap3" problem, used below to
# exercise the evaluator on a non-reference solution.
task_id = "kmap3"
completion = "\n\talways @(*) begin\n\t\tif(!c && !b && a)\n\t\t\tout = 0;\n\t\telse if(!d && c && a)\n\t\t\tout = 1;\n\t\telse if(d && b && a)\n\t\t\tout = 1;\n\t\telse if(d && !b && !a)\n\t\t\tout = 0;\n\t\telse out = d; // d is don't care, so we can output anything here.\n\tend \n\nendmodule\n"
# Trailing fragment of what looks like a recorded result entry for this
# completion (reported as failed) -- presumably pasted from a results file:
# , "result": "failed: 82 out of 232 samples.", "passed": false}

In [11]:
from interpreter import evaluate_code

In [12]:
problems

{'gatesv': {'task_id': 'gatesv',
  'prompt': 'module top_module (\n\tinput [3:0] in,\n\toutput [2:0] out_both,\n\toutput [3:1] out_any,\n\toutput [3:0] out_different\n);\n',
  'canonical_solution': '\n\tassign out_both = in[2:0] & in[3:1];\n\tassign out_any = in[2:0] | in[3:1];\n\tassign out_different = in^{in[0], in[3:1]};\n\t\nendmodule\n',
  'test': '`timescale 1 ps/1 ps\n`define OK 12\n`define INCORRECT 13\nmodule reference_module (\n\tinput [3:0] in,\n\toutput [2:0] out_both,\n\toutput [3:1] out_any,\n\toutput [3:0] out_different\n);\n\n\tassign out_both = in[2:0] & in[3:1];\n\tassign out_any = in[2:0] | in[3:1];\n\tassign out_different = in^{in[0], in[3:1]};\n\t\nendmodule\n\n\nmodule stimulus_gen (\n\tinput clk,\n\tinput tb_match,\n\toutput logic [3:0] in,\n\toutput reg[511:0] wavedrom_title,\n\toutput reg wavedrom_enable\t\n);\n\n\n// Add two ports to module stimulus_gen:\n//    output [511:0] wavedrom_title\n//    output reg wavedrom_enable\n\n\ttask wavedrom_start(input[511:0

In [13]:
problems[task_id]

{'task_id': 'kmap3',
 'prompt': 'module top_module (\n\tinput a, \n\tinput b,\n\tinput c,\n\tinput d,\n\toutput reg out\n);\n',
 'canonical_solution': "\t\n    always @(*) begin\n        case({a,b,c,d})\n            4'h0: out = 0;\n            4'h1: out = 0;\n            4'h3: out = 1;\n            4'h2: out = 1;\n            4'h4: out = 1'bx;\n            4'h5: out = 0;\n            4'h7: out = 0;\n            4'h6: out = 0;\n            4'hc: out = 1;\n            4'hd: out = 1'bx;\n            4'hf: out = 1;\n            4'he: out = 1;\n            4'h8: out = 1;\n            4'h9: out = 1'bx;\n            4'hb: out = 1;\n            4'ha: out = 1;\n        endcase\n    end\nendmodule\n",
 'test': '`timescale 1 ps/1 ps\n`define OK 12\n`define INCORRECT 13\nmodule reference_module (\n\tinput a, \n\tinput b,\n\tinput c,\n\tinput d,\n\toutput reg out\n);\n\t\n    always @(*) begin\n        case({a,b,c,d})\n            4\'h0: out = 0;\n            4\'h1: out = 0;\n            4\'h3: out

In [22]:
from datetime import datetime
import os
from typing import Optional, Dict, Any
import subprocess
import re

from vgen.utils import process_vcd
import pydantic
from enum import Enum
import outlines


class VerilogStatus(Enum):
    """Outcome category of one compile-and-simulate run of a Verilog candidate."""

    # Simulator output reported zero mismatches.
    SUCCESS = "success"
    # stderr contained the text "syntax error".
    SYNTAX_ERROR = "syntax_error"
    # stderr was non-empty for any other reason.
    COMPILE_ERROR = "compile_error"
    # Simulation ran but reported a non-zero number of mismatches.
    RUNTIME_ERROR = "runtime_error"
    # Not assigned by the evaluate_code defined in this notebook.
    TIMEOUT = "timeout"
    # Not assigned by the evaluate_code defined in this notebook.
    OTHER = "other"


class VerilogExecution(pydantic.BaseModel):
    """
    A class to represent the execution of a verilog program.

    Instances are built by evaluate_code after compiling and simulating one
    candidate completion.
    """

    # Outcome category of the run.
    status: VerilogStatus
    # Decoded standard output of the compile + simulate command.
    stdout: str
    # Decoded standard error of the compile + simulate command.
    stderr: str
    # True exactly when status is VerilogStatus.SUCCESS.
    passed: bool
    # 1.0 on success, 1 - mismatches/samples on a runtime error, 0.0 otherwise.
    pass_rate: float
    # Always set to None by the evaluate_code defined in this notebook.
    result: Optional[Dict]
    # Whatever process_vcd returns for the run's wave.vcd, or None if no such
    # file was produced.
    df_err: Optional[Any]
    # The candidate completion text that was evaluated.
    code: str


@outlines.vectorize
def evaluate_code(task_id, completion, problem) -> VerilogExecution:
    """
    Evaluate the code for a given task_id and completion.

    The testbench (``problem["test"]``), the module header
    (``problem["prompt"]``) and the candidate body (``completion``) are
    concatenated into one SystemVerilog file, compiled with ``iverilog`` and
    simulated with ``vvp`` inside a fresh temporary directory.

    Args:
        task_id: problem identifier; used for the work directory and file names.
        completion: candidate module body to append after the prompt.
        problem: a dict with many keys including "prompt", "description",
            "task_id", etc.; only "test" and "prompt" are read here.

    Returns:
        VerilogExecution describing the run. ``status`` is derived from the
        tool output: "syntax error" in stderr -> SYNTAX_ERROR; any other stderr
        output -> COMPILE_ERROR; otherwise the "Mismatches: M in N samples"
        line decides between SUCCESS (M == 0) and RUNTIME_ERROR.

    Raises:
        ValueError: if running the tools or interpreting their output fails,
            including the case where stderr is empty and no "Mismatches" line
            is found. The original exception is chained as ``__cause__``.
    """
    # Local imports: these stdlib modules are not imported at the top of this cell.
    import shlex
    import tempfile

    verilog_test = problem["test"] + "\n" + problem["prompt"] + "\n" + completion

    # Create a unique work directory. The timestamp only has one-second
    # resolution, so mkdtemp adds a random suffix: two evaluations of the same
    # task within the same second must not share (and overwrite) files.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    directory = tempfile.mkdtemp(prefix=f"{task_id}_{timestamp}_")

    # Write the Verilog test file in the new directory
    source_name = f"{task_id}.sv"
    binary_name = f"{task_id}.vvp"
    file_path = os.path.join(directory, source_name)
    with open(file_path, "w") as f:
        f.write(verilog_test)

    # Prepare the compilation and simulation commands. Every interpolated
    # value is shell-quoted so unusual task ids / temp paths cannot break or
    # alter the command line.
    # NOTE(review): any stderr output is later treated as a compile error, so
    # warnings enabled by -Wall presumably count as failures -- confirm intended.
    compile_cmd = (
        "iverilog -Wall -Winfloop -Wno-timescale -g2012 -s tb "
        f"-o {shlex.quote(binary_name)} {shlex.quote(source_name)}"
    )
    simulation_cmd = f"vvp -n {shlex.quote(binary_name)}"
    full_cmd = f"cd {shlex.quote(directory)} && {compile_cmd} && {simulation_cmd}"

    try:
        # Execute the commands and capture the output
        process = subprocess.run(
            full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )

        # Decode the outputs from byte to string; undecodable bytes printed by
        # the simulation are replaced instead of aborting the evaluation.
        stdout = process.stdout.decode("utf-8", errors="replace")
        stderr = process.stderr.decode("utf-8", errors="replace")

        # Require at least one digit in each group so int() never sees "".
        match = re.search(r"Mismatches: ([0-9]+) in ([0-9]+) samples", stdout)
        mismatches, samples = 0, 0
        if "syntax error" in stderr:
            status = VerilogStatus.SYNTAX_ERROR
        elif len(stderr.strip()) > 0:
            status = VerilogStatus.COMPILE_ERROR
        elif match:
            mismatches, samples = (int(group) for group in match.groups())
            if mismatches == 0:
                status = VerilogStatus.SUCCESS
            else:
                status = VerilogStatus.RUNTIME_ERROR
        else:
            raise ValueError("Unexpected error")

        wave_path = os.path.join(directory, "wave.vcd")
        if os.path.exists(wave_path):
            df_err = process_vcd(wave_path)
        else:
            df_err = None

        if status == VerilogStatus.SUCCESS:
            pass_rate = 1.0
        elif status == VerilogStatus.RUNTIME_ERROR and samples > 0:
            # Fraction of samples that matched the reference.
            pass_rate = 1.0 - mismatches / samples
        else:
            # Errors, or a mismatch report with zero samples (avoids dividing by 0).
            pass_rate = 0.0

        out = VerilogExecution(
            status=status,
            stdout=stdout,
            stderr=stderr,
            passed=status == VerilogStatus.SUCCESS,
            pass_rate=pass_rate,
            result=None,
            df_err=df_err,
            code=completion,
        )
        return out
    except Exception as e:
        # Re-raise as ValueError, keeping the original exception chained.
        raise ValueError(f"Error while executing the code: {e}") from e


In [23]:
# Evaluate the kmap3 candidate with the evaluate_code defined in the cell
# above, which takes a single problem dict (not the whole `problems` mapping).
res =evaluate_code(task_id, completion, problems[task_id])

In [None]:
# Disabled draft: duplicate of stats_execute, kept for reference only.
# The whole cell is commented out; previously only the first lines were, which
# left indented `with` / `return` statements that cannot run at top level.
#
# def stats_execute(task_id, completion, timeout=10):
#     problem = task_id_problem_map[task_id]
#     split_tests = task_id_split_tests_map[task_id]
#     thread_problems = [{**problem, "test": test} for test in split_tests]
#     results = []
#     with ThreadPoolExecutor() as executor:
#         for result in executor.map(
#             lambda tp: check_correctness(tp, completion, timeout), thread_problems
#         ):
#             results.append(result["passed"])
#
#         return {
#             "task_id": task_id,
#             "pass_rate": sum(results) / len(results),
#         }
