In [1]:
from edt_utils import get_rectangular_contours, py_blockproc, display_segments, detect_ref_pulse, print_line_dict,segment_to_df, remove_text
from edt_utils import process_line,get_values_from_img,measure_extract_pulse 
from matplotlib import pyplot as plt
from scipy.signal import find_peaks
from ss import pattern_match
from scipy import ndimage
from PIL import Image

In [2]:
import skimage as ski
import pandas as pd
import numpy as np
import cv2 as cv
import pytesseract
import operator
import scipy
import sys

In [3]:
image_name = 'images/ecg_test.png'
template_name = 'images/pul.png'
csv_name = 'ecg_csv.csv'

In [4]:
BORDER_GAP = 2 # gap around the border
layout = (3,4)

# indicates if there is a pulse
# pulse = 0 - pulse in all rows
# pulse = line_list - specifies which rows contain pulse signals 
pulse = -1   

# which line has the rhythm signal
rhythm = 4 

verbose = 0

mmpsec = 25 # 25 mm/seg
mmpmv = 10 # 10 mm/mV

pulse_width_mm = 5 # pulse width in mm
pulse_height_mm =10  # pulse height in mm
pulse_per_sec = pulse_width_mm / mmpsec
pulse_per_mv= pulse_height_mm / mmpmv
sample_frequency = 500
time_lead = 2.5 # duratiom of the segment in seconds
num_sampling_points = time_lead * sample_frequency

In [5]:
# lead signals layout
if layout[1]== 4 and layout[0]==3:
    lt_leads = [['I', 'aVR','V1','V4'],
                ['II','aVL','V2','V5'],
                ['III','aVF', 'V3','V6'],
                ['II']]
elif layout[1]==2:
    raise NotImplementedError ('Not implemented' )
elif layout[1]==1:
    raise NotImplementedError ('Not implemented' )
else:
    raise ValueError('columns must be 4, 2 or 1')

In [None]:
# Define pulse detection
if pulse == 0 :
    print("INFO: No pulse to be detected")
elif pulse == -1:
    print("INFO: pulse to be detected in all lines")
elif  isinstance(pulse, list): 
    print("INFO: pulse on lines: {}.".format(pulse)) 
    for p in pulse:
        lt_leads[p].append('Pulse')
elif isinstance(pulse, int): 
    print("INFO: pulse on line: {}.".format(pulse)) 
    lt_leads[pulse].append('Pulse')
else:
    raise ValueError('pulse should  be 0, an int or a list')

In [None]:
# Define rhythm
if rhythm == 0:
    print("INFO: No rhythm lead") 
else:
    print("INFO: Rhythm lead in row #{}.".format(rhythm)) 

In [8]:
config_dict = {}
config_dict['layout'] = layout   # tuple with the layout
config_dict['rhythm'] = rhythm # which row has the rhythm signal
config_dict['verbose'] = verbose # 
config_dict['pulse'] = pulse # which lines have pulse

config_dict['pulse_width_mm']  = pulse_width_mm
config_dict['pulse_height_mm'] = pulse_height_mm
config_dict['pulse_per_mv'] = pulse_per_mv
config_dict['pulse_per_sec'] = pulse_per_sec
config_dict['num_sampling_points'] = num_sampling_points

In [9]:
#load the image 
#image_name = 'images/ecg_test.png'  # select image
image = cv.imread(image_name)

# sanity check
if image is None:
    print('Cannot open image: ' + image_name)
    sys.exit(0)
if verbose > 2:
    print("INFO: Image Shape {}.".format(image.shape))
    plt.imshow(image, cmap="gray")
    plt.show()

In [10]:
#Filter color to remove the grid
lower = (0, 0, 0) # black color
upper = (100, 100, 100) # dark gray
mask = cv.inRange(image, lower, upper)
result = image.copy()
result[mask != 255] = (255, 255, 255) # if it is not very dark set it to white

#Convert to gray scale
image_gray = cv.cvtColor(result, cv.COLOR_BGR2GRAY )

if verbose > 2:
    print("INFO: gray scale image Shape {}.".format(image_gray.shape))
    plt.imshow(image_gray, cmap="gray")
    plt.show()

In [11]:
# use thresholding to transform the image into a binary one
ret, th1 = cv.threshold(image_gray, 127, 255, cv.THRESH_OTSU)

if verbose > 2:
    print("INFO: Binary image Shape {}.".format(th1.shape))
    plt.imshow(th1, cmap="gray")
    plt.show()

In [12]:
foreground  = 255 - th1
contours, _ = cv.findContours(foreground, cv.RETR_LIST, cv.CHAIN_APPROX_SIMPLE)
rectangular_contours = get_rectangular_contours(contours)

if verbose > 1:
    plt.imshow(foreground, cmap="gray")
    plt.show()

In [13]:
contour_image = image_gray.copy()

# find the biggest countour (c) by the area
c = max(contours, key = cv.contourArea)
x_border, y_border, w_border, h_border = cv.boundingRect(c)
# draw the biggest contour (c) in green
cv.rectangle(contour_image, (x_border,y_border), (x_border + w_border, y_border + h_border), (0, 255, 0), 10)

if verbose > 1:
    plt.imshow(contour_image, cmap="gray")
    #TODO: add title
    plt.show()

In [14]:
# ECG image extracted from the main image
foreground  = 255 - th1[y_border+BORDER_GAP : y_border+h_border-BORDER_GAP,
                        x_border+BORDER_GAP : x_border+w_border-BORDER_GAP]
if verbose > 1:
    plt.imshow(foreground, cmap = "gray")
    plt.show()

In [15]:
#template_name = 'images/pul.png'
template = cv.imread(template_name, cv.IMREAD_GRAYSCALE)

# sanity check
if image is None:
    print('Cannot open the template: ' + template_name)
    new_template = None
else:
    #load template to find the pulse
    _, new_template = cv.threshold(template, 127, 255, cv.THRESH_OTSU)
    new_template = (new_template != 255) * np.uint8(255)
    
if verbose > 1:
    plt.imshow(new_template, cmap = "gray")
    plt.show()

In [16]:
# Extract the individual leads (lines)
temp = py_blockproc(foreground, (1,foreground.shape[1]), func=0)
median_temp = np.median(temp.flatten())
peak_indices, peak_dict = find_peaks(temp.flatten(), height=median_temp, distance=20)
peak_heights = peak_dict['peak_heights']

highest_peak_index = peak_indices[np.argsort(peak_heights)]

if verbose > 0 :
    plt.plot(temp.flatten())
    # get the leads and the rhythm
    plt.plot(highest_peak_index[-(layout[0]+1):], temp[highest_peak_index[-(layout[0]+1):]], "x")
    plt.plot(median_temp * np.ones_like(temp), "--", color="gray")
    plt.show()

In [17]:
# Calculate the distance between selected peaks
ordered_hp_index = sorted(highest_peak_index[-(layout[0]+1):])

peak_dist = [np.abs(t - s) for s, t in zip(ordered_hp_index, ordered_hp_index[1:])]
max_dist = max(peak_dist) * 7 // 10 #TODO: add as input

# Cut the image according to the number of rows in the layout
# slices_x is a list of tuples
slices_x = [(max(0, s-max_dist), min(foreground.shape[0], s+max_dist),None) for s in ordered_hp_index]
slices_y = [(0, foreground.shape[1], None) for s in ordered_hp_index]

if verbose > 1 :
    print("INFO: slices: {}". format(slices_x))

In [None]:
# Create a list to store the processed lines
proc_line_list = []

h, w = foreground.shape
blank_image = np.zeros(shape=(h, w), dtype=np.uint8)

structure = np.array([[1, 1, 1],
                      [1, 1, 1],
                      [1, 1, 1]], np.uint8)

# Extract and process the leads row-wise.
for i, slx in enumerate(slices_x): 
    line = foreground[slice(*slx), slice(*(0, foreground.shape[1], None))]
    offset = slx # reference to locate the segment in the image
    plt.imshow(line, cmap="gray")
    plt.show()

    labeled_line, nb = ndimage.label(line, structure=structure)

    if verbose > 1:
        print("INFO: Number of segments {} on line {}.".format(nb, i))
        display_segments('Labeled line', labeled_line)

    if (pulse == -1) or (i in pulse) :   # Check if the pulse is present
        line_signal = (labeled_line != 0) * np.uint8(255)
        #line_signal = np.where(labeled_line == 0, 0, 255)

        # plt.imshow(line_signal, cmap = "gray")

        #Try to detect the pulse
        line_copy = line_signal.copy()
        #line_copy = line_copy.astype("uint8")

        template_width, template_height = template.shape
        line_copy_width, line_copy_height = line_copy.shape
        _, _, xt, yt = get_values_from_img(new_template)
        wt, ht = measure_extract_pulse(xt, yt, verbose=0)
        config_dict['hpulse'] = ht #default values
        config_dict['wpulse'] = wt

        # Pulse detection by template
        detected, location, similarity_value, x, y, wpulse, hpulse = detect_ref_pulse(line_copy, new_template)
        print("INFO: line {}: best similarity value = {} in {}".format(i, similarity_value, y))

        # if verbose > 1:
        #     if detected:
        #         print('INFO: pulse detected by template in line {} in {}'.format(i, y))
        #         plt.imshow(line_copy[x:x+hpulse+1, y:y+wpulse+1], cmap ="gray")
        #         plt.show()
        #     else:
        #         print('INFO: pulse NOT detected by template in line {}'.format(i))
    else:
        print("INFO: line {} has no pulse to detect".format(i))
        wpulse = np.nan
        hpulse = np.nan

    # TODO: add info in config_dict  
    config_dict['wpulse'] = wpulse
    config_dict['hpulse'] = hpulse

    # Process line
    line_dict = process_line(i, labeled_line, offset, lt_leads[i], config_dict, config_dict['verbose'])
    proc_line_list.append(line_dict)

In [None]:
#TODO remove the rhythm form the list of lines
if config_dict['rhythm'] != 0:
    config_dict['rhythm'] = 0
    proc_line_list.pop(rhythm - 1)

# convert do a dataframe
ecg_df= segment_to_df(proc_line_list, pulse_per_sec, pulse_per_mv, num_sampling_points)
ecg_df.to_csv(csv_name)

t = np.linspace(0.0, 2.5, len(ecg_df))

for k in range(ecg_df.shape[1]):
    fig, ax = plt.subplots(figsize=(15,5))
    ax.plot(t, ecg_df.iloc[:,k], label=ecg_df.columns[k])
    ax.legend(loc="upper left")
    plt.suptitle(ecg_df.columns[k])
    plt.xticks(np.linspace(t.min(), t.max(), 26))
    plt.yticks(np.linspace(ecg_df.iloc[:,k].min(), ecg_df.iloc[:,k].max(), 10))
    plt.show()

df = ecg_df