In [1]:
import re
import shlex

from langchain.prompts import ChatPromptTemplate
from tqdm import tqdm

import config
import code_improvement.code_execution as execution
from utils import prompt, tools

In [None]:
def determine_term_list():
    """Ask the LLM which terms from the user's request should be looked up.

    The prompt is built from ``prompt.concept_determination_prompt`` and
    ``config.user_input_ECG`` and sent through ``config.llm``.  The reply is
    treated as a comma-separated list.

    Returns:
        list[str]: the terms in reply order, with surrounding whitespace
        removed and empty entries dropped.
    """
    term_prompt = ChatPromptTemplate.from_messages(
        [("system", "You are a very helpful assistant."), ("user", "{input}")]
    )
    term_chain = term_prompt | config.llm

    response = term_chain.invoke(
        {"input": prompt.concept_determination_prompt.format(config.user_input_ECG)}
    )

    # Split on the bare comma and trim each piece: the reply may end with a
    # newline, use "," without a following space, or carry a trailing comma,
    # all of which would otherwise leak into (or create empty) search terms.
    term_list = [
        term.strip() for term in response.content.split(",") if term.strip()
    ]

    return term_list

In [None]:
def determine_URL_list(term_list):
    """Collect reference URLs for every term by querying a search agent.

    A history-less agent equipped with ``config.search`` is created once and
    invoked once per term.  Each reply is treated as a ", "-separated list of
    URLs.

    Args:
        term_list: iterable of terms to look up.

    Returns:
        list[str]: URLs in first-seen order, whitespace-trimmed, without
        empty entries and without duplicates.
    """
    URL_list = []
    seen_URLs = set()
    agent_executor = tools.create_agent(
        have_message_history=False, tool_list=[config.search], message_history=None
    )

    for term in term_list:
        print("Working on term: {} . . .".format(term))

        response = agent_executor.invoke(
            {
                "input": prompt.concept_searching_prompt.format(
                    config.user_input_ECG, term
                )
            }
        )

        # Keep the ", " separator (a URL may legitimately contain a bare
        # comma) but trim each piece, drop blanks left by a trailing
        # separator, and skip URLs already returned for an earlier term so
        # the same page is not fetched more than once later on.
        for candidate in response["output"].split(", "):
            URL = candidate.strip()
            if URL and URL not in seen_URLs:
                seen_URLs.add(URL)
                URL_list.append(URL)

    return URL_list

In [None]:
def generate_high_level_design(agent_executor):
    """Request the high-level algorithm design from the agent.

    Args:
        agent_executor: object exposing ``invoke(inputs, config=...)`` and
            returning a mapping with an ``"output"`` entry.

    Returns:
        The ``"output"`` entry of the agent's reply.
    """
    design_request = {
        "input": prompt.high_level_design_prompt.format(config.user_input_ECG)
    }
    agent_reply = agent_executor.invoke(
        design_request, config=config.session_configuration
    )
    return agent_reply["output"]

In [None]:
def generate_detailed_design(agent_executor):
    """Request the detailed design and split it into individual steps.

    The agent's reply is split on blank lines (``"\\n\\n"``); each resulting
    chunk is one step.

    Args:
        agent_executor: object exposing ``invoke(inputs, config=...)`` and
            returning a mapping with an ``"output"`` entry.

    Returns:
        list[str]: the non-empty steps in reply order, each stripped of
        leading and trailing whitespace.
    """
    response = agent_executor.invoke(
        {"input": prompt.detailed_design_prompt.format(config.user_input_ECG)},
        config=config.session_configuration,
    )

    # Extra blank lines in the reply (three or more consecutive newlines, or
    # leading/trailing blank lines) would otherwise produce empty steps or
    # steps that start with a newline, i.e. steps whose first line -- used as
    # the step title by the code-generation stage -- is empty.
    step_list = [
        step.strip() for step in response["output"].split("\n\n") if step.strip()
    ]

    return step_list

In [None]:
def generate_code_segment(agent_executor, step_list):
    """Generate one code segment for every step of the detailed design.

    Each step is expected to carry its title on the first line and its
    description on the following lines.  Steps without a description and
    steps for which prompt construction or the agent call fails are skipped
    (with a message) so that one bad step does not abort the whole run.

    Args:
        agent_executor: object exposing ``invoke(inputs, config=...)`` and
            returning a mapping with an ``"output"`` entry.
        step_list: iterable of step strings.

    Returns:
        list: the results of ``tools.sanitize_output`` for the steps that
        succeeded, in step order.
    """
    code_list = []

    for step_description in step_list:
        # Title = first line, body = everything after the first newline.
        step_title, separator, step_body = step_description.partition("\n")

        print("Now, we are generating code segment for step {}".format(step_title))

        if not separator:
            # A single-line step has no description to generate code from.
            print(
                "Skipping step '{}': no description follows the title.".format(
                    step_title
                )
            )
            continue

        try:
            response = agent_executor.invoke(
                {
                    "input": prompt.code_segment_prompt.format(
                        config.user_input_ECG,
                        config.programming_language,
                        step_title,
                        step_body,
                        config.programming_language,
                        step_title,
                    )
                },
                config=config.session_configuration,
            )
        except Exception as error:
            # Only ordinary errors are tolerated; KeyboardInterrupt and
            # SystemExit must still be able to stop the notebook.
            print(
                "Skipping step '{}' because code generation failed: {!r}".format(
                    step_title, error
                )
            )
            continue

        code_segment = tools.sanitize_output(response["output"])
        code_list.append(code_segment)

    return code_list

In [None]:
def generate_final_code(agent_executor):
    """Ask the agent to merge the generated segments into one program.

    Args:
        agent_executor: object exposing ``invoke(inputs, config=...)`` and
            returning a mapping with an ``"output"`` entry.

    Returns:
        The agent's ``"output"`` after it has been passed through
        ``tools.sanitize_output``.
    """
    combination_request = {
        "input": prompt.code_combination_prompt.format(config.user_input_ECG)
    }
    agent_reply = agent_executor.invoke(
        combination_request, config=config.session_configuration
    )
    raw_code = agent_reply["output"]
    return tools.sanitize_output(raw_code)

In [None]:
def execute_code(code, data_path):
    """Write ``code`` to ``generated_code.py`` and run it on ``data_path``.

    Args:
        code: source text of the program to run.
        data_path: dataset location, passed to the program via ``-i``.

    Returns:
        tuple: ``(status, program_output)`` exactly as returned by
        ``execution.run``.
    """
    # Generated code may contain non-ASCII characters (comments, docstrings);
    # write it as UTF-8, which is also what Python assumes when it reads the
    # script back, instead of relying on the platform's default encoding.
    with open("generated_code.py", "w", encoding="utf-8") as file:
        file.write(code)

    # The path is user-supplied: quote it so that spaces or shell
    # metacharacters cannot split or alter the command line.  Paths made of
    # safe characters are left unchanged by shlex.quote.
    command = "python3 generated_code.py -i {}".format(shlex.quote(data_path))
    status, program_output = execution.run(command, True)

    return status, program_output

In [None]:
# First, determine the list of terms to be searched: determine_term_list()
# asks the LLM for them based on the user's request, and the result is
# echoed so the user can see what will be looked up.
term_list = determine_term_list()
print(
    "According to your requirements, we decide to search for the definition of the following terms:",
    term_list,
)

According to your requirements, we decide to search for the definition of the following terms: ['MIT-BIH Arrhythmia Database', 'ECG data', 'R-peaks', 'detection accuracy']


In [12]:
# Query the search agent once per term and list every URL it returned.
URL_list = determine_URL_list(term_list)
print("Here is the URL list we crawled:")
for URL in URL_list:
    print(URL)

Working on term: MIT-BIH Arrhythmia Database . . .


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `tavily_search_results_json` with `{'query': 'MIT-BIH Arrhythmia Database definition'}`


[0m[36;1m[1;3m[{'url': 'http://academy.beecardia.com/physiobank/database/mitdb', 'content': "Twenty-three recordings were chosen at random from a set of 4000 24-hour ambulatory ECG recordings collected from a mixed population of inpatients (about 60%) and outpatients (about 40%) at Boston's Beth Israel Hospital; the remaining 25 recordings were selected from the same set to include less common but clinically significant arrhythmias that would not be well-represented in a small random sample. MIT-BIH Arrhythmia (48 records)\nThe MIT-BIH Arrhythmia Database contains 48 fully annotated half-hour excerpts of two-channel ambulatory ECG recordings, obtained from 47 subjects studied by the BIH Arrhythmia Laboratory between 1975 and 1979. Several of these records were selected beca

In [None]:
# Load the collected pages and build the main agent used by all later cells.
# Unlike the agent inside determine_URL_list, this one keeps a message
# history (config.message_history) and gets a second tool built from the
# loaded documents in addition to the search tool.
document_list = tools.load_new_webpage(URL_list)
agent_with_chat_history = tools.create_agent(
    have_message_history=True,
    tool_list=[config.search, tools.create_context_retrieval_tool(document_list)],
    message_history=config.message_history,
)

We will further search information from these websites:
http://academy.beecardia.com/physiobank/database/mitdb
https://physionet.org/content/mitdb/1.0.0/
https://archive.physionet.org/physiobank/database/mitdb/
https://en.wikipedia.org/wiki/Electrocardiography
https://link.springer.com/article/10.1007/s11831-023-09916-x
https://www.sciencedirect.com/topics/computer-science/detection-accuracy


In [11]:
# Stage 1: ask the history-aware agent for the high-level algorithm outline.
high_level_design = generate_high_level_design(agent_with_chat_history)
print("We first provide the algorithm design outline:\n{}".format(high_level_design))



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `tavily_search_results_json` with `{'query': 'algorithm for R-peak detection in ECG using MIT-BIH Arrhythmia Database'}`


[0m[36;1m[1;3m[{'url': 'https://physionet.org/content/mitdb/1.0.0/', 'content': "License (for files):\nOpen Data Commons Attribution License v1.0\nDOI (version 1.0.0):\nhttps://doi.org/10.13026/C2F305\nTopics:\narrhythmia\necg\nFiles\nTotal uncompressed size: 104.3 MB.\n Release Info\nRelated Databases\nAdditional references\nQuestions and Comments\nAccess Policy:\nAnyone can access the files, as long as they conform to the terms of the specified license.\n Database\nOpen Access\nMIT-BIH Arrhythmia Database\nGeorge Moody\n,\nRoger Mark\nPublished: Feb. 24, 2005. Twenty-three recordings were chosen at random from a set of 4000 24-hour ambulatory ECG recordings collected from a mixed population of inpatients (about 60%) and outpatients (about 40%) at Boston's Beth Israel Hospital; the remainin

In [12]:
# Stage 2: expand the outline into a list of detailed design steps; the
# request goes through the same agent and session as the previous cell.
detailed_design_step_list = generate_detailed_design(agent_with_chat_history)

print("Based on the high level design, we provide the detailed design as:")
for element in detailed_design_step_list:
    print(element)



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `tavily_search_results_json` with `{'query': 'how to read MIT-BIH Arrhythmia Database ECG records'}`


[0m[36;1m[1;3m[{'url': 'http://academy.beecardia.com/physiobank/database/mitdb', 'content': "Twenty-three recordings were chosen at random from a set of 4000 24-hour ambulatory ECG recordings collected from a mixed population of inpatients (about 60%) and outpatients (about 40%) at Boston's Beth Israel Hospital; the remaining 25 recordings were selected from the same set to include less common but clinically significant arrhythmias that would not be well-represented in a small random sample. MIT-BIH Arrhythmia (48 records)\nThe MIT-BIH Arrhythmia Database contains 48 fully annotated half-hour excerpts of two-channel ambulatory ECG recordings, obtained from 47 subjects studied by the BIH Arrhythmia Laboratory between 1975 and 1979. Several of these records were selected because features of the rhythm, QRS morpho

In [24]:
# Stage 3: generate one code segment per detailed design step.
code_segment_list = generate_code_segment(
    agent_with_chat_history, detailed_design_step_list
)

Now, we are generating code segment for step Step 1: [Load ECG Records]


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m```python
import os
import wfdb
import numpy as np

def load_ecg_records(directory_path):
    """
    Load all ECG records from a specified directory containing MIT-BIH Arrhythmia Database files.

    Parameters:
    - directory_path: A string representing the path to the directory containing the ECG records.

    Returns:
    - A dictionary where each key is the record name and the value is a tuple containing the signal array and the corresponding annotation object.

    Explanations of the code:
    - Uses wfdb.rdrecord to read the signal files (.dat) and wfdb.rdann to read the annotation files (.atr).
    - The MIT-BIH database uses a .hea file for header information, which is automatically read by rdrecord.
    - Iterates over all records in the specified directory and loads each one, assuming the directory contains only relevant files.
    - wfdb.rdr

In [26]:
# Stage 4: ask the agent to combine the generated segments into one program.
# The segments are not passed explicitly here; the request is sent through
# the same history-aware agent and session as the previous cells.
final_code = generate_final_code(agent_with_chat_history)



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m```python
import os
import wfdb
import numpy as np
from scipy.signal import butter, filtfilt, find_peaks, resample
import argparse

def load_ecg_records(directory_path):
    ecg_records = {}
    for filename in os.listdir(directory_path):
        if filename.endswith('.hea'):
            record_name = os.path.splitext(filename)[0]
            record = wfdb.rdrecord(os.path.join(directory_path, record_name))
            annotation = wfdb.rdann(os.path.join(directory_path, record_name), 'atr')
            ecg_signal = record.p_signal[:,0]
            ecg_records[record_name] = (ecg_signal, record.fs, annotation)
    return ecg_records

def preprocess_ecg_signal(ecg_signal, fs, desired_fs=360):
    if fs != desired_fs:
        num_samples = int(len(ecg_signal) * desired_fs / fs)
        ecg_signal = resample(ecg_signal, num_samples)
    lowcut = 0.5
    highcut = 100
    nyquist = 0.5 * desired_fs
    low = lowcut / nyquist
    

In [27]:
# Ask for the dataset location.  Surrounding whitespace (easy to pick up when
# pasting a path) is removed because the value is later placed on a command
# line and used as a filesystem path.
# Example: /path/to/mit-bih-arrhythmia-database-1.0.0
dataset_path = input("Please input the path to your dataset: ").strip()

In [None]:
print("Next, we will proceed to the supervised code generation and modification stage.")

# Upper bound on consecutive repair rounds for one epoch.  Some failures
# cannot be fixed by rewriting the code (wrong dataset path, missing package),
# and without a bound the repair loop below would call the LLM forever.
# An optional `config.max_fix_attempts` overrides the default.
max_fix_attempts = getattr(config, "max_fix_attempts", 10)

for epoch in range(config.epoch_number):
    # Run the current candidate program on the dataset.
    status, program_output = execute_code(final_code, dataset_path)

    # Repair phase: while the program exits with a non-zero status, feed its
    # output back to the agent and retry with the corrected code.
    fix_attempt = 0
    while status != 0:
        if fix_attempt >= max_fix_attempts:
            raise RuntimeError(
                "Generated code still fails after {} repair attempts in epoch {}. "
                "Last program output:\n{}".format(
                    max_fix_attempts, epoch, program_output
                )
            )
        fix_attempt += 1

        response = agent_with_chat_history.invoke(
            {
                "input": prompt.correct_grammar_prompt.format(
                    config.user_input_ECG, program_output
                )
            },
            config=config.session_configuration,
        )
        final_code = tools.sanitize_output(response["output"])

        status, program_output = execute_code(final_code, dataset_path)

    # Improvement phase: show the agent the output of the successful run, ask
    # for an improved design, then regenerate the detailed design, the code
    # segments and the combined program for the next epoch.
    response = agent_with_chat_history.invoke(
        {
            "input": prompt.improve_code_prompt.format(
                config.user_input_ECG, program_output
            )
        },
        config=config.session_configuration,
    )
    high_level_design = response["output"]
    detailed_design_step_list = generate_detailed_design(agent_with_chat_history)
    code_segment_list = generate_code_segment(
        agent_with_chat_history, detailed_design_step_list
    )
    final_code = generate_final_code(agent_with_chat_history)

Next, we will proceed to the supervised code generation and modification stage.
[1;32m************** START **************[0m
[1;35m('Case 203', 'R-peak indices: [    99    212    321 ... 649781 649873 649977]', 'Detection accuracy: 0.9819819819819819')[0m
[1;35m('Case 124', 'R-peak indices: [    50    159    237 ... 649798 649870 649977]', 'Detection accuracy: 0.9981640146878825')[0m
[1;35m('Case 230', 'R-peak indices: [    81    166    280 ... 649808 649904 649977]', 'Detection accuracy: 0.9768856447688564')[0m
[1;35m('Case 223', 'R-peak indices: [    31    125    199 ... 649709 649850 649977]', 'Detection accuracy: 0.9981082103670071')[0m
[1;35m('Case 220', 'R-peak indices: [    27    120    209 ... 649703 649833 649910]', 'Detection accuracy: 0.9975833736104398')[0m
[1;35m('Case 101', 'R-peak indices: [    88    177    256 ... 649752 649867 649977]', 'Detection accuracy: 0.9989327641408752')[0m
[1;35m('Case 100', 'R-peak indices: [    73    186    285 ... 649830 64990

In [None]:
# Search the message history for every generated comprehensive program.
# The history is read as consecutive (question, answer) pairs; an answer is
# kept when its question contains the code-combination instruction below.
combined_code_list = []

# Take one snapshot of the history and pair even-indexed messages with the
# message that follows them.  zip() stops at the last complete pair, so a
# trailing question without an answer is ignored instead of raising
# IndexError.
messages = list(config.message_history.messages)

for question_message, answer_message in zip(messages[0::2], messages[1::2]):
    question = question_message.content

    if (
        "Based on the user's problem and the generated code segment for each of the module you designed, please combine all of them and compile a comprehensive and cohesive full version of code with detailed comments."
        in question
    ):
        combined_code = tools.sanitize_output(answer_message.content)
        combined_code_list.append(combined_code)

In [None]:
# Run every collected program version on the dataset and keep its output.
# The exit status is captured but not used for filtering here.
program_output_list = []

for code_version in combined_code_list:
    status, program_output = execute_code(code_version, dataset_path)
    program_output_list.append(program_output)

[1;32m************** START **************[0m
[1;35m('Case 203', 'R-peak indices: [    99    212    321 ... 649781 649873 649977]', 'Detection accuracy: 0.9819819819819819')[0m
[1;35m('Case 124', 'R-peak indices: [    50    159    237 ... 649798 649870 649977]', 'Detection accuracy: 0.9981640146878825')[0m
[1;35m('Case 230', 'R-peak indices: [    81    166    280 ... 649808 649904 649977]', 'Detection accuracy: 0.9768856447688564')[0m
[1;35m('Case 223', 'R-peak indices: [    31    125    199 ... 649709 649850 649977]', 'Detection accuracy: 0.9981082103670071')[0m
[1;35m('Case 220', 'R-peak indices: [    27    120    209 ... 649703 649833 649910]', 'Detection accuracy: 0.9975833736104398')[0m
[1;35m('Case 101', 'R-peak indices: [    88    177    256 ... 649752 649867 649977]', 'Detection accuracy: 0.9989327641408752')[0m
[1;35m('Case 100', 'R-peak indices: [    73    186    285 ... 649830 649903 649977]', 'Detection accuracy: 0.9995602462620933')[0m
[1;35m('Case 207', 'R

In [None]:
# Concatenate the outputs of all versions, each wrapped in a numbered
# <version_N> ... </version_N> block (numbering starts at 1).
running_result = ""

for version_number, version_output in enumerate(program_output_list, start=1):
    running_result = "{}\n\n<version_{}>\n{}\n</version_{}>\n\n".format(
        running_result, version_number, version_output, version_number
    )

# Send the combined outputs to the agent with the final-decision prompt.
decision_request = {"input": prompt.final_decision_prompt.format(running_result)}
response = agent_with_chat_history.invoke(
    decision_request, config=config.session_configuration
)



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m1[0m

[1m> Finished chain.[0m


In [None]:
# The reply is expected to name one version.  A bare int() fails on replies
# such as "Version 2" or "2.", so accept any reply that contains exactly one
# run of digits and reject anything else rather than guess.
raw_choice = str(response["output"])
choice_match = re.fullmatch(r"\D*(\d+)\D*", raw_choice)
if choice_match is None:
    raise ValueError(
        "Could not read a single version number from the reply: {!r}".format(
            raw_choice
        )
    )

version_index = int(choice_match.group(1))

# Versions are numbered from 1.  Without this check an answer of 0 would
# silently select the last version through negative indexing.
if not 1 <= version_index <= len(combined_code_list):
    raise ValueError(
        "Version {} was selected, but only versions 1..{} exist.".format(
            version_index, len(combined_code_list)
        )
    )

final_code = combined_code_list[version_index - 1]

print("Here is the final version of code to solve your problem.")
print("============================================")
print(final_code)
print("============================================")

Here is the final version of code to solve your problem.
import os
import wfdb
import numpy as np
from scipy.signal import butter, filtfilt, find_peaks, resample
import argparse

def load_ecg_records(directory_path):
    ecg_records = {}
    for filename in os.listdir(directory_path):
        if filename.endswith('.hea'):
            record_name = os.path.splitext(filename)[0]
            record = wfdb.rdrecord(os.path.join(directory_path, record_name))
            annotation = wfdb.rdann(os.path.join(directory_path, record_name), 'atr')
            ecg_signal = record.p_signal[:,0]
            ecg_records[record_name] = (ecg_signal, record.fs, annotation)
    return ecg_records

def preprocess_ecg_signal(ecg_signal, fs, desired_fs=360):
    if fs != desired_fs:
        num_samples = int(len(ecg_signal) * desired_fs / fs)
        ecg_signal = resample(ecg_signal, num_samples)
    lowcut = 0.5
    highcut = 100
    nyquist = 0.5 * desired_fs
    low = lowcut / nyquist
    high = highcut

In [None]:
# Ask the agent to write user documentation for the selected final code.
# NOTE(review): the attribute is spelled `user_domentation_prompt` (not
# "documentation"); presumably this matches the name defined in utils.prompt
# -- confirm before renaming anything.
response = agent_with_chat_history.invoke(
    {"input": prompt.user_domentation_prompt.format(config.user_input_ECG, final_code)},
    config=config.session_configuration,
)



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m# User Documentation for ECG R-Peak Detection

## Introduction

### User's Problem
Electrocardiogram (ECG) data is crucial for diagnosing heart-related conditions, and accurate detection of R-peaks in ECG records is a fundamental step in the analysis of heart rhythm. An R-peak represents the point of maximum electrical activity during a heart beat in an ECG trace, corresponding to the ventricular depolarization.

The user needs to process ECG data from the 'MIT-BIH Arrhythmia Database,' with the goal of detecting all R-peaks and evaluating the detection accuracy for each individual record.

### Code Overview
The provided code is a Python script that automates the process of loading, processing, detecting R-peaks, and evaluating the accuracy of the detection against the annotated peaks in each ECG record from the MIT-BIH Arrhythmia Database. The script includes functions for reading and preprocessing the ECG signals, detecting