Импортируем все, что понадобится

In [123]:
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

import pytesseract

# import some common libraries
import numpy as np
import os, json, cv2, random, pickle, glob
from collections import defaultdict

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.utils.visualizer import Visualizer
from detectron2.utils.visualizer import ColorMode

import pandas as pd
from tqdm.notebook import tqdm

Кучка необходимых параметров

In [165]:
# params
# тут можно поскладировать распознанные картинки
dir_for_recognized_bounds = 'bounds_recognition_res'
# тут будут складироваться картинки графа + обрезанных нод/ребер + распознанный текст + результат в виде списка смежности
dir_for_recognized_graphs = 'graph_recognition_res'

# тут должны лежать графы в приличном виде, на которых погоняем алгоритм
validation_data_dir = 'graph_img_val'
# тяжело, но хардкод до .exe 
path_to_tesseract_exe = os.path.join('D:\Soft\Anaconda\Library', 'bin', 'tesseract.exe')
pytesseract.pytesseract.tesseract_cmd = path_to_tesseract_exe

# улучшалка для работы tesseract (распознавалка текстовых меток)
custom_config = r'--oem 3 --psm 6 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ'

# Флаги, чтобы рисовать всякое ненужное, появляющееся в процессе
draw_images_mode = False
write_recognized_text = False

In [16]:
# Здесь читаем все .json в директории с данными = графы, которые можно будет распознавать.
# Только для валидационных/тестовых данных
def get_dict_list(dir_name: str):
    file_list = glob.glob(os.path.join(dir_name, '*.json'))
    
    dict_list = []
    for file in file_list:
        with open(file) as f:
            json_file = json.load(f)
            
            filename = json_file['filename'].split("\\")[-1]
            json_file['filename'] = os.path.join(dir_name, filename)
            json_file['file_name'] = json_file.pop('filename')
            
            dict_list.append(json_file)
            
    return dict_list

In [148]:
from detectron2.engine import DefaultTrainer

# делаем конфиг
cfg = get_cfg()
# Дефолтный конфиг
cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))

# Отдаем веса дообученной модели
cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")
# Чтобы не ругался на отсутствие cuda
cfg.MODEL.DEVICE='cpu'
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 3  # edge type 1, edge type 2 and node
# Порог, по которому отбираются распознанные объекты
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5

# Моделька-предсказатель
predictor = DefaultPredictor(cfg)

[32m[12/23 16:31:55 d2.checkpoint.c2_model_loading]: [0mFollowing weights matched with model:
| Names in Model                                  | Names in Checkpoint                                                                                  | Shapes                                          |
|:------------------------------------------------|:-----------------------------------------------------------------------------------------------------|:------------------------------------------------|
| backbone.bottom_up.res2.0.conv1.*               | backbone.bottom_up.res2.0.conv1.{norm.bias,norm.running_mean,norm.running_var,norm.weight,weight}    | (64,) (64,) (64,) (64,) (64,64,1,1)             |
| backbone.bottom_up.res2.0.conv2.*               | backbone.bottom_up.res2.0.conv2.{norm.bias,norm.running_mean,norm.running_var,norm.weight,weight}    | (64,) (64,) (64,) (64,) (64,64,3,3)             |
| backbone.bottom_up.res2.0.conv3.*               | backbone.bottom_up.res2.0.conv3.

### Boundbox Recognition Visualization 

In [156]:
# Примеры можно поглядеть в директории из параметра dir_for_recognized_bounds

dataset_dicts = get_dict_list(validation_data_dir)

# Берем случайных ребят из сета
for d in random.sample(dataset_dicts, 20):  
    
    # Достаем название файла с изображением графа
    im = cv2.imread(d["file_name"])
    
    # Ищем объекты
    outputs = predictor(im)
    
    # Это для картинки с распознанными баундами
    v = Visualizer(im[:, :, ::-1], scale=0.5)
    out = v.draw_instance_predictions(outputs["instances"].to("cpu"))
    
    img_name = d['file_name'].split('\\')[-1]
    
    # Рисуем, если надо, исходную картинку и картинку с распознанными обхектами
    if draw_images_mode:
        cv2.imwrite(os.path.join(dir_for_recognized_bounds, img_name), im)
        cv2.imwrite(os.path.join(dir_for_recognized_bounds, img_name), out.get_image())
    
    # Можно вывести классы распознанных объектов и координаты их баундбоксов
    # print(outputs["instances"].pred_classes)
    # print(outputs["instances"].pred_boxes)

#### Общие проблемы:
- Не распознается буква I - выбран не очень хороший шрифт для генерации (((
- Не всегда хорошо ребра распознаются + типы
- Стоит использовать разный порог для нод и ребер, тк ноды распознаются очень хорошо почти всегда

### Nodes Text Detection and Graph 

In [73]:
# функция для нахождения нод, которые связывает ребро
def get_edge_nodes(cropped, bound_int, cnt, nodes_dict, graph_data):
    edge_on_main = edge_on_main_diag(cropped, cnt, graph_data)  # main - слева-направо и сверху-вниз, углы нужны те, которые в баунде
    
    if not edge_on_main:
        shift_x = int(abs(bound_int[2] - bound_int[0]))
        shift_y = int(abs(bound_int[3] - bound_int[1]))
        
        x_1, y_1, x_2, y_2 = bound_int[0] + shift_x, bound_int[1], bound_int[2] - shift_x, bound_int[3]
    else:
        x_1, y_1, x_2, y_2 = bound_int[0], bound_int[1], bound_int[2], bound_int[3]
    
    # расстояние для x1, y1 баунда ребра до ближайшей ноды
    min_dist_1 = 1e10
    closest_node_1 = 0
    # расстояние для x2, y2 баунда ребра до ближайшей ноды
    min_dist_2 = 1e10
    closes_node_2 = 0
    
    for node in nodes_dict:
        node_center_x = nodes_dict[node]['center_x']
        node_center_y = nodes_dict[node]['center_y']
        
        dist_1 = np.sqrt((node_center_x - x_1)**2 + (node_center_y - y_1)**2)
        dist_2 = np.sqrt((node_center_x - x_2)**2 + (node_center_y - y_2)**2)
        
        if dist_1 < min_dist_1:
            min_dist_1 = dist_1
            closest_node_1 = node
        if dist_2 < min_dist_2:
            min_dist_2 = dist_2
            closest_node_2 = node
            
    return closest_node_1, closest_node_2
        
    
# Функция для проверки, на какой из диагоналей лежит ребро
def edge_on_main_diag(cropped, cnt, graph_data):
    
    len_x = len(cropped[0])
    len_y = len(cropped)
    
    k = len_y / len_x
    
    main_intensity = 0
    sub_intensity = 0

    shift = 30
    
    cropped_img = cropped.copy()
    cropped_img_2 = cropped.copy()
    for i in range(0, len_y):
        x_diag = int(i / k)
        
        for j in range(x_diag - shift, x_diag + shift):
            if 0 <= j < len_x:
                # cnt_main += 1
                main_intensity += sum(cropped[i][j])
                cropped_img[i][j] = (255, 0, 0)

        for j in range(len_x - x_diag - shift, len_x - x_diag + shift):
            if 0 <= j < len_x:
                # cnt_sub += 1
                sub_intensity += sum(cropped[i][j])
                cropped_img_2[i][j] = (255, 0, 0)
                
    if draw_images_mode:
        img_name_wo_ext = get_img_name(graph_data)
        
        cv2.imwrite(os.path.join(dir_for_recognized_graphs, 
                                 img_name_wo_ext + '_cropped_edge' + str(cnt) + '_main_colored.png'), cropped_img)
        cv2.imwrite(os.path.join(dir_for_recognized_graphs, 
                                 img_name_wo_ext + '_cropped_edge' + str(cnt) + '_sub_colored.png'), cropped_img_2)
    
    return main_intensity < sub_intensity


# Функция для вытаскивания названия картинки без расширения
def get_img_name(graph_data):
    img_name = graph_data['file_name'].split('\\')[-1]
    img_name_wo_ext = img_name.split('.')[0]
    
    return img_name_wo_ext


# Функция, которая по картинке отдает метки классов распознанных обхектов, баунды и их скоры
def get_prediction(graph_data, predictor, im):

    outputs = predictor(im)
    
    v = Visualizer(im[:, :, ::-1], 
                   scale=0.5
    )
    out = v.draw_instance_predictions(outputs["instances"].to("cpu"))
    
    img_name_wo_ext = get_img_name(graph_data)
    
    if draw_images_mode:
        cv2.imwrite(os.path.join(dir_for_recognized_graphs, img_name_wo_ext + '_clear.png'), im)
        cv2.imwrite(os.path.join(dir_for_recognized_graphs, img_name_wo_ext + '_recognized.png'), out.get_image())
    
    classes = outputs["instances"].pred_classes
    bounds = outputs["instances"].pred_boxes
    scores = outputs["instances"].scores
    
    bounds_labeled = list(zip(classes, bounds, scores))
    
    return bounds_labeled


# По результатам экспериментов, 99%, что нераспознанная вторая буква это I
def fix_text_label(text):
    # убираем перевод строки
    text = text.split('\n')[0]
            
    # допустим он потерял букву I - все остальные распознаются ок (ну и плюс мы не потеряем ноду)
    if len(text) == 1:
        text = text + 'I'
        
    return text


# Функция, которая нахоидт все ноды на картинке, распознает их текстовые метки и отдает словарик с нужными данными
def get_nodes_dict(graph_data, bounds_labeled, im, cnt=0):
    
    nodes_dict = dict()
    
    if draw_images_mode: cnt = 0  # счетчик для названий, под окторыми будем вырисовывать ноды/ребра
        
    img_name_wo_ext = get_img_name(graph_data)
    
    if write_recognized_text:
        # Сюда прилетят все распознанные текстовые метки
        file = open(os.path.join(dir_for_recognized_graphs, img_name_wo_ext + '_recognized_text.txt'), "a")

    # идем по всем нодам и распознаем их метки
    for bound in bounds_labeled:
        
        # если не нода или если уверенность ниже 95%
        if bound[0] != 0 or bound[2] < 0.95:
            continue
            
        # Исходно все значения float, а мы работаем с пикселями
        bound_int = [int(val) for val in bound[1]]  # x1, y1, x2, y2
        
        im2 = im.copy()
        
        # Сужаем границы баундбокса тк иначе ничего не распознать
        shift_x = int(abs(bound_int[2] - bound_int[0])*0.25)
        shift_y = int(abs(bound_int[3] - bound_int[1])*0.25)
        
        cropped = im2[bound_int[1] + shift_y:bound_int[3] - shift_y, 
                      bound_int[0] + shift_x:bound_int[2] - shift_x]
        
        if draw_images_mode:
            # Рисуем кропленную ноду, над которой будем стараться
            cv2.imwrite(os.path.join(dir_for_recognized_graphs, 
                                     img_name_wo_ext + '_cropped_node' + str(cnt) + '.png'), 
                        cropped) 
            cnt += 1
        
        # Распознавание текста
        text = pytesseract.image_to_string(cropped, config=custom_config)
        
        # убираем перевод строки и всякое такое
        text = fix_text_label(text)
        
        # текст не распознался - а угадывать метку ноды наверное не прикол, но вообще сильновероятно, что нода там была...
        if text:
            
            if write_recognized_text:
                file.write(text + '\n')

            nodes_dict[text] = {'bbox': bound_int, 
                                'score': bound[2],
                                'center_y': int((bound_int[3] + bound_int[1])*0.5), 
                                'center_x': int((bound_int[2] + bound_int[0])*0.5)}
    
    if write_recognized_text:
        file.close()
    
    return nodes_dict


# Если нода нашлась, но она не попала в связный граф, то я все равно хочу видеть ее в итоговом списке смежности
def add_not_connected_nodes(nodes_dict, adj_list):
    for node in nodes_dict:
        if node not in adj_list:
            adj_list[node] = []
    
    return adj_list


# Функция клепает список смежности по распознанным данным
def create_adj_list(graph_data, bounds_labeled, im, nodes_dict, cnt=0):
    
    adj_list = defaultdict(dict)
    # score_dict = defaultdict(dict)
    img_name_wo_ext = get_img_name(graph_data)
    im2 = im.copy()
    
    for bound in bounds_labeled:
        
        if bound[0] == 0:
            continue
        
        bound_int = [int(val) for val in bound[1]]  # x1, y1, x2, y2
        
        bound_score = bound[2]
        bound_class = int(bound[0])
        bound_type = bound_class - 1  # Типы ребер 0 и 1 превратились в 1 и 2 классы обхектов для нейронки, 
                                      # теперь веду их обратно из класса в тип
        
        cropped = im2[bound_int[1]:bound_int[3], bound_int[0]:bound_int[2]]
        
        if draw_images_mode:
            # записываем картинку с кропленным ребром, над которым сейчас будем стараться
            cv2.imwrite(os.path.join(dir_for_recognized_graphs, 
                                     img_name_wo_ext + '_cropped_edge' + str(cnt) + '.png'), cropped) 
        
        # Ищем ноды, которые связывает наше ребро
        node_1, node_2 = get_edge_nodes(cropped, bound_int, cnt, nodes_dict, graph_data)
        
        # Ребро уже в базе
        if node_1 in adj_list and node_2 in adj_list[node_1]:
            # Хотела сначала чекать score ребра, но на практике выяснилась простая идея:
            # Если ребро распозналось более 1 раза и хотя бы одно из них распозналось как двойное, 
            # то это скорее всего двойное ребро, как ни крути
            if adj_list[node_1][node_2]['type'] == 0 or bound_type == 1:  # Если было двойное ребро или новое ребро одиночное
                continue
            
        # И записываем их в список смежности
        adj_list[node_1][node_2] = {'weigth': '1', 'type': bound_type}
        adj_list[node_2][node_1] = {'weigth': '1', 'type': bound_type}
        
        # Отдельно запоминаем score тк структура выходного словаря утверждена, а значение хранить хочется
        # score_dict[node_1][node_2] = {'score': bound_score}
        # score_dict[node_2][node_1] = {'score': bound_score}
        
        if draw_images_mode:
            cnt += 1
            
    return add_not_connected_nodes(nodes_dict, adj_list)


# Распознавалка main
def recognize_graph(graph_data, predictor):
    
    im = cv2.imread(graph_data["file_name"])
    
    bounds_labeled = get_prediction(graph_data, predictor, im)
    nodes_dict = get_nodes_dict(graph_data, bounds_labeled, im)
    adj_list = create_adj_list(graph_data, bounds_labeled, im, nodes_dict)
    
    return adj_list
    

## Running with random graph from val dataset

In [161]:
# Берем все данные из валидационного сета - вообще тестового, но они у себя зовут его валидационным так и оставим
dataset_dicts = get_dict_list(validation_data_dir)

for graph_data in random.sample(dataset_dicts, 1): 
    
    adj_list = recognize_graph(graph_data, predictor)
    img_name_wo_ext = get_img_name(graph_data)
    
    with open(os.path.join(dir_for_recognized_graphs, img_name_wo_ext + '_recognized_graph.pickle'), 'wb') as handle:
        pickle.dump(adj_list, handle, protocol=pickle.HIGHEST_PROTOCOL)

## Running with image name

In [164]:
path_to_image = os.path.join('D:\Projects\Anaconda\Signals_ml_22\graph-recognizer\graph_img_val', 'graph_89.png')

graph_data = {'file_name': path_to_image}
adj_list = recognize_graph(graph_data, predictor)
img_name_wo_ext = get_img_name(graph_data)
    
with open(os.path.join(dir_for_recognized_graphs, img_name_wo_ext + '_recognized_graph.pickle'), 'wb') as handle:
    pickle.dump(adj_list, handle, protocol=pickle.HIGHEST_PROTOCOL)

## Simple recognition cheker

In [125]:
def get_adj_list(dir_name: str, graph_data, recogn = False):
    img_name_wo_ext = get_img_name(graph_data)
    
    if recogn:
        img_name_wo_ext += '_recognized_graph'
    with open(os.path.join(dir_name, img_name_wo_ext + '.pickle'), 'rb') as handle:
        adj_list = pickle.load(handle)
        
    return adj_list


def check_diff(graph_data, pred_adj_list=None):
    src_adj_list = get_adj_list(validation_data_dir, graph_data)
    
    if not pred_adj_list:
        pred_adj_list = get_adj_list(dir_for_recognized_graphs, graph_data, recogn = True)
    
    matched_edges = 0
    matched_types = 0
    matched_nodes = 0
    
    wrong_type = 0
    lost_edge = 0
    lost_node = 0
    
    src_edges_num = 0
    src_nodes_num = 0
    
    for node in src_adj_list:
        src_nodes_num += 1
        if node in pred_adj_list: 
            matched_nodes += 1
            
            for connected_node in src_adj_list[node]:
                
                src_edges_num += 1

                src_type = src_adj_list[node][connected_node]['type']

                if connected_node in pred_adj_list[node]:
                    
                    pred_type = pred_adj_list[node][connected_node]['type']

                    matched_edges += 1
                    if pred_type == src_type:
                        matched_types += 1
                    else:
                        wrong_type += 1
                else:
                    lost_edge += 1
        else:
            lost_node += 1
    
    res = {
        # количество нод, которые были распознаны среди исходных
        'matched_nodes': matched_nodes,
        # всего нод было в графе
        'all_nodes_num': src_nodes_num,
        # частота верно распознанных нод
        'recognized_nodes_freq': matched_nodes / src_nodes_num,
        # кол-во ненайденных нод среди исходных
        'lost_nodes': lost_node // 2,
        # частота ненахождения ноды
        'lost_nodes_freq': lost_node / src_nodes_num,
        
        # кол-во найденныз ребер среди исходных
        'matched_edges': matched_edges // 2,
        # всего было ребер исходно
        'all_edges_num': src_edges_num // 2,
        # частота верного распознавания ребер
        'recognized_edges_freq': matched_edges / src_edges_num,
        
        # кол-во потерянных ребер
        'lost_edges': lost_edge // 2,
        # частота потери ребер
        'lost_edges_freq': lost_edge / src_edges_num,
        
        # кол-во верно распознанных типов ребер среди верно распознанных ребер
        'matched_edge_types': matched_types // 2,
        # частота верно распознанных типов ребер среди верно расп. ребер
        'matched_edge_types_freq': matched_types / matched_edges,
        
        # кол-во неверно распознанных типов ребер среди верно распознанных ребер
        'wrong_edge_types': wrong_type // 2,
        # частота неверно распознанных типов ребер среди верно расп. ребер
        'wrong_edge_types_freq': wrong_type / matched_edges,
    }
    
    return res

## Checker res example for graph_105 

Результаты распознавания находятся в директории репо graph_recognition_res/graph_105/

In [167]:
path_to_image = os.path.join('D:\Projects\Anaconda\Signals_ml_22\graph-recognizer\graph_img_val', 'graph_105.png')

graph_data = {'file_name': path_to_image}
adj_list = recognize_graph(graph_data, predictor)
img_name_wo_ext = get_img_name(graph_data)
    
with open(os.path.join(dir_for_recognized_graphs, img_name_wo_ext + '_recognized_graph.pickle'), 'wb') as handle:
    pickle.dump(adj_list, handle, protocol=pickle.HIGHEST_PROTOCOL)
    
# Первый аргумент - словарик с названием картинки, он будет искать в той же директории .pickle со списком смежности исходного графа
# Второй аргумент - либо пустой (тогда он ищет в своей директории с результатами записанный .pickle), либо расп. список смежности
check_diff(graph_data, adj_list)

{'matched_nodes': 9,
 'all_nodes_num': 9,
 'recognized_nodes_freq': 1.0,
 'lost_nodes': 0,
 'lost_nodes_freq': 0.0,
 'matched_edges': 8,
 'all_edges_num': 8,
 'recognized_edges_freq': 1.0,
 'lost_edges': 0,
 'lost_edges_freq': 0.0,
 'matched_edge_types': 8,
 'matched_edge_types_freq': 1.0,
 'wrong_edge_types': 0,
 'wrong_edge_types_freq': 0.0}

## Get statistics

In [126]:
# Берем все данные из валидационного сета - вообще тестового, но они у себя зовут его валидационным так и оставим
dataset_dicts = get_dict_list(validation_data_dir)

df = pd.DataFrame()

for graph_data in tqdm(dataset_dicts): 
    
    adj_list = recognize_graph(graph_data, predictor)
    img_name_wo_ext = get_img_name(graph_data)
    
    metrics = check_diff(graph_data, adj_list)
    df = df.append(metrics, ignore_index=True)
    
df.to_csv('metrics.csv', sep=';')

HBox(children=(FloatProgress(value=0.0, max=108.0), HTML(value='')))




In [131]:
df.head()

Unnamed: 0,all_edges_num,all_nodes_num,lost_edges,lost_edges_freq,lost_nodes,lost_nodes_freq,matched_edge_types,matched_edge_types_freq,matched_edges,matched_nodes,recognized_edges_freq,recognized_nodes_freq,wrong_edge_types,wrong_edge_types_freq
0,1.0,2.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0,2.0,1.0,1.0,0.0,0.0
1,10.0,11.0,2.0,0.2,0.0,0.0,8.0,1.0,8.0,11.0,0.8,1.0,0.0,0.0
2,7.0,8.0,1.0,0.142857,0.0,0.0,6.0,1.0,6.0,8.0,0.857143,1.0,0.0,0.0
3,7.0,8.0,0.0,0.0,0.0,0.0,7.0,1.0,7.0,8.0,1.0,1.0,0.0,0.0
4,6.0,7.0,1.0,0.166667,0.0,0.0,5.0,1.0,5.0,7.0,0.833333,1.0,0.0,0.0


In [140]:
stat = df[['recognized_nodes_freq', 'recognized_edges_freq', 
    'lost_nodes_freq', 'lost_edges_freq', 
    'matched_edge_types_freq', 'wrong_edge_types_freq'
   ]].mean()

In [143]:
print('Средняя частота верно распознанных нод: ', stat['recognized_nodes_freq'])

print('Средняя частота потери нод: ', stat['lost_nodes_freq'])

print('Средняя частота верного распознавания ребер: ', stat['recognized_edges_freq'])

print('Средняя частота потери ребер: ', stat['lost_edges_freq'])

print('Средняя частота верно распознанных типов ребер среди верно расп. ребер: ', stat['matched_edge_types_freq'])
        
print('Средняя частота неверно распознанных типов ребер среди верно расп. ребер: ', stat['wrong_edge_types_freq'])

Средняя частота верно распознанных нод:  0.9956349206349208
Средняя частота потери нод:  0.004365079365079365
Средняя частота верного распознавания ребер:  0.9435060567005008
Средняя частота потери ребер:  0.05649394329949885
Средняя частота верно распознанных типов ребер среди верно расп. ребер:  0.9472001763668426
Средняя частота неверно распознанных типов ребер среди верно расп. ребер:  0.05279982363315696
