# 燃气表|电能表表底计数识别
主要分为2个任务：

1. 图片中数字区域检测
2. 数字区域识别

In [1]:
import torch
import numpy as np
import pandas as pd
import os
import sys
import cv2
import re
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.mixture import GaussianMixture
from sklearn.cluster import AgglomerativeClustering
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import asyncio
import glob
import shutil
from collections import Counter,defaultdict
sns.set_style("darkgrid")

## 数字区域检测
数据集准备与分析

In [None]:
# Root folder holding the raw meter images.
# Backslashes are escaped explicitly: the unescaped form relied on invalid
# escape sequences, which newer Python versions warn about. The resulting
# string value is unchanged.
data_path = 'D:\\数据集整理\\Video'

In [None]:
# Names of every entry directly under data_path (file names only, no directory part).
image_list = os.listdir(data_path)

### 存在需要剔除的反光图片，考虑使用高斯混合模型将正常图和反光图聚成2类

In [None]:
# Two hand-picked sample files used in the next cells for visual comparison:
# one labelled abnormal and one labelled normal.
abnormal_image_path = f'{data_path}\\474902489004576_20220214070001.png'
normal_image_path = f'{data_path}\\474902489004576_20220214080003.png'

In [None]:
# Read the file bytes with NumPy and decode them in memory.
# NOTE(review): presumably done instead of cv2.imread because the path
# contains non-ASCII characters - confirm.
normal_bytes = np.fromfile(normal_image_path, dtype=np.uint8)
image_normal = cv2.imdecode(normal_bytes, cv2.IMREAD_UNCHANGED)

# Show the image and block until a key is pressed.
cv2.imshow('main', image_normal)
cv2.waitKey(0)
cv2.destroyWindow('main')

In [None]:
# 观察数据分布
plt.figure(figsize=(15, 10))
sns.histplot(image_normal.flatten(), bins=50)

In [None]:
# Read the file bytes with NumPy and decode them in memory.
# NOTE(review): presumably done instead of cv2.imread because the path
# contains non-ASCII characters - confirm.
abnormal_bytes = np.fromfile(abnormal_image_path, dtype=np.uint8)
image_abnormal = cv2.imdecode(abnormal_bytes, cv2.IMREAD_UNCHANGED)

# Show the image and block until a key is pressed.
cv2.imshow('main', image_abnormal)
cv2.waitKey(0)
cv2.destroyWindow('main')

In [None]:
# 观察数据分布
plt.figure(figsize=(15, 10))
sns.histplot(image_abnormal.flatten(), bins=50)

In [None]:
def _iter_images(names):
    """Lazily yield ``(file_name, decoded_image)`` for every name in *names*.

    Files are resolved relative to the module-level ``data_path`` and decoded
    one at a time, so only one image is held in memory per step.
    """
    for name in names:
        raw = np.fromfile(f'{data_path}\\{name}', dtype=np.uint8)
        yield name, cv2.imdecode(raw, -1)


# Single-use generator: it is exhausted after one full pass.
image_gen = _iter_images(image_list)

In [None]:
# Build one intensity histogram per image as its clustering feature.
feature = {}
split = 50  # number of histogram bins
for name, image in image_gen:
    if image is None:
        # cv2.imdecode returns None for data it cannot decode; skip the file
        # rather than crash the whole pass.
        print(f'skip unreadable image: {name}')
        continue
    # Use a fixed value range so bin k covers the same intensities for every
    # image. Without `range`, np.histogram spans each image's own min..max
    # and the resulting features are not comparable across images.
    # NOTE(review): assumes 8-bit images (values 0..255) - confirm.
    feature[name] = np.histogram(image, bins=split, range=(0, 256))[0]

In [None]:
# File names in dict insertion order; the same order as feature.values().
label_arr = np.array(list(feature))
label_arr

In [None]:
# Stack the per-image histograms into one 2-D matrix (one row per image).
feature_arr = np.array([*feature.values()])
feature_arr

In [None]:
# Show the matrix shape: (number of images, number of histogram bins).
feature_arr.shape

In [None]:
# Option 1: agglomerative (hierarchical) clustering into two groups.
# NOTE: the k-means and Gaussian-mixture cells also assign `predict_res`;
# whichever of the three cells runs last decides the labels used afterwards.
agg = AgglomerativeClustering(n_clusters=2)
predict_res = agg.fit_predict(feature_arr)

In [None]:
# Option 2: k-means with two clusters.
# NOTE: overwrites `predict_res`; no random_state is set, so labels may
# differ between runs.
km = KMeans(n_clusters=2)
predict_res = km.fit_predict(feature_arr)

In [None]:
# Option 3: Gaussian mixture model with two components.
# NOTE: overwrites `predict_res`; no random_state is set, so labels may
# differ between runs.
gm = GaussianMixture(n_components=2)
predict_res = gm.fit_predict(feature_arr)

In [None]:
# Decide which cluster id means "normal": the cluster whose histograms have
# a larger mean count in the upper half of the bins than in the lower half.
# Only cluster 0 is inspected; if it fails the test, cluster 1 is taken.
length = feature_arr.shape[1]
half = length // 2
cluster_0 = feature_arr[predict_res == 0]
feature_0_left_mean = cluster_0[:, :half].mean()
feature_0_right_mean = cluster_0[:, half:].mean()
normal_number = 1 if feature_0_right_mean < feature_0_left_mean else 0

In [None]:
# Show which cluster id (0 or 1) was chosen as the normal class.
normal_number

In [None]:
# Split the file names by predicted cluster using boolean masks.
is_normal = predict_res == normal_number
is_abnormal = predict_res == 1 - normal_number
label_normal = label_arr[is_normal]
label_abnormal = label_arr[is_abnormal]

In [None]:
# Output folders for the two clusters; created only when missing.
abnormal_path = 'D:\\数据集整理\\abnormal_data_50_agg2'
normal_path = 'D:\\数据集整理\\normal_data_50_agg2'
for _folder in (abnormal_path, normal_path):
    if not os.path.exists(_folder):
        os.makedirs(_folder)

In [None]:
# Re-encode every normal image as PNG into normal_path, keeping its file name.
# Bytes are read and written through NumPy buffers rather than OpenCV file I/O.
for file_name in label_normal:
    raw = np.fromfile(f'{data_path}\\{file_name}', dtype=np.uint8)
    decoded = cv2.imdecode(raw, -1)
    png_buffer = cv2.imencode('.png', decoded)[1]
    png_buffer.tofile(f'{normal_path}\\{file_name}')

In [None]:
# Re-encode every abnormal image as PNG into abnormal_path, keeping its file name.
# Bytes are read and written through NumPy buffers rather than OpenCV file I/O.
for file_name in label_abnormal:
    raw = np.fromfile(f'{data_path}\\{file_name}', dtype=np.uint8)
    decoded = cv2.imdecode(raw, -1)
    png_buffer = cv2.imencode('.png', decoded)[1]
    png_buffer.tofile(f'{abnormal_path}\\{file_name}')

In [None]:
# Two requirements-file paths.
# NOTE(review): neither name is referenced again in the visible cells -
# possibly a leftover; confirm before removing.
source = 'D:\\demo\\yolov5\\requirements2.txt'
destination = 'D:\\demo\\yolov5\\requirements_rgb.txt'

## 生成目标检测数据集
共3927张图片。每块表按时间顺序取最新的 (train_sets + val_sets) // 2 张（并非随机抽取），再按固定间隔划分为训练集和验证集。

In [None]:
import cv2

In [None]:
# Folder of images kept after the clustering step, and the requested
# training / validation set sizes.
use_data_path = 'D:\\数据集整理\\normal_data_50_agg2'
train_sets = 2000
val_sets = 200

In [None]:
# List the candidate image names and preview the first ten.
use_image_list = os.listdir(use_data_path)
use_image_list[:10]

In [None]:
# Number of candidate images.
len(use_image_list)

In [None]:
# 每块表取train_set // 2张图片
image_splits = defaultdict(list)
for ui in use_image_list:
    temp_l = ui.split('_')[0]
    image_splits[temp_l].append(ui)

In [None]:
# 每块表的图片按照日期顺序排列
for k in image_splits:
    v = np.array(image_splits[k])
    temp = [int(x.split('_')[1].split('.')[0]) for x in v]
    sort_idx = np.argsort(temp).flatten()
    image_splits[k] = v[sort_idx][-(train_sets + val_sets)//2:]

In [None]:
# Join the per-meter name arrays into one flat array.
# np.concatenate is used instead of np.array(...).flatten() because the
# latter cannot build a rectangular array when meters contribute different
# numbers of images.
all_images = (
    np.concatenate([np.asarray(v) for v in image_splits.values()])
    if image_splits
    else np.array([])
)

In [None]:
# Total number of selected images, shown as a shape tuple.
all_images.shape

In [None]:
# np.random.seed(666)
# shuffle_idx = np.random.permutation(len(use_image_list))
# shuffle_idx

In [None]:
# Validation indices are taken at a fixed stride over the selected images;
# every remaining index goes to training.
total_images = train_sets + val_sets
val_idx = np.arange(0, total_images, train_sets/val_sets + 1, dtype=int)
val_lookup = set(val_idx.tolist())
train_idx = np.array([i for i in range(total_images) if i not in val_lookup])
val_images = all_images[val_idx]
train_images = all_images[train_idx]

In [None]:
# Print the size and the first five names of each split.
# The validation line now previews val_images; it used to repeat the
# training preview.
print(len(train_images), train_images[:5])
print(len(val_images), val_images[:5])

In [None]:
# 将图片转换为jpg格式，并存储到指定位置
output_path = 'D:\\demo\\GasMeterData\\images\\train'
if not os.path.exists(output_path):
    os.makedirs(output_path)

number = 0
for il in train_images:
    temp_path = f'{use_data_path}\\{il}'
    temp = cv2.imdecode(np.fromfile(temp_path, dtype=np.uint8), -1)
    cv2.imwrite(f'{output_path}\\img{number}.jpg', temp)
    number += 1

In [None]:
# Convert the validation images to JPEG. `number` is deliberately not reset:
# it continues from whatever value the previous export left behind.
output_path = 'D:\\demo\\GasMeterData\\images\\val'
if not os.path.exists(output_path):
    os.makedirs(output_path)

for file_name in val_images:
    raw = np.fromfile(f'{use_data_path}\\{file_name}', dtype=np.uint8)
    decoded = cv2.imdecode(raw, -1)
    cv2.imwrite(f'{output_path}\\img{number}.jpg', decoded)
    number += 1

In [None]:
# Copy the training images into sibling folders train1, train2, ...,
# starting a new folder after every 200 images.
current_path = 'D:\\demo\\GasMeterData\\images\\train'
current_images = os.listdir(current_path)
number = 1
for position, file_name in enumerate(current_images, start=1):
    picture = cv2.imread(f'{current_path}\\{file_name}')
    temp_folder_path = f'{current_path}{number}'
    if not os.path.exists(temp_folder_path):
        os.makedirs(temp_folder_path)
    cv2.imwrite(f'{temp_folder_path}\\{file_name}', picture)
    # Move on to the next part folder once the current one holds 200 images.
    if position % 200 == 0:
        number += 1

In [None]:
# 剔除未标注的数据
read_path = 'D:\\demo\\GasMeterData\\labels\\train'
txt_list = glob.glob(read_path + '\\*')
use_list = []
for path in txt_list:
    with open(path, 'r') as f:
        res = f.read()
        if res.strip('\n'):
            use_list.append(path)

In [None]:
# 剔除未标注的图片
image_list = [x.split('\\')[-1].split('.')[0] for x in use_list]
image_path = 'D:\\demo\\GasMeterData\\images\\train'
all_image_list = os.listdir(image_path)
for i in all_image_list:
    if i.split('.')[0] not in image_list:
        os.remove(f'{image_path}\\{i}')
# 剔除标签(前面忘了...)
for i in txt_list:
    if i not in use_list:
        os.remove(i)

In [None]:
# image size: 1080 * 1920 * 3
# 转换标签相对大小

In [None]:
# Folder of cropped images; read by the label-check and copy cells below.
crop_data_path = 'D:\\demo\\GasMeterData_pre\\crop_data'

In [None]:
# Re-express each label box, given relative to the full raw frame, relative
# to the plate box read from the matching file in plate_label_path.
txt_path = 'D:\\demo\\GasMeterData\\labels\\train'
out_path = 'D:\\demo\\GasMeterData\\labels\\train2'
plate_label_path = 'D:\\demo\\GasMeterData_pre\\crop_label'
raw_h, raw_w = 1080, 1920  # raw frame size in pixels

# Nothing else creates the output folder, so make sure it exists.
os.makedirs(out_path, exist_ok=True)

for i in use_list:
    res = i.split('\\')[-1].split('.')[0]
    # Read the plate box first (first line: class, x, y, w, h as fractions
    # of the raw frame) so a missing or broken input is detected before the
    # output file is created or truncated.
    with open(f'{plate_label_path}\\{res}.txt', 'r') as f:
        plate_xywh = list(map(float, f.readline().split()[1:]))
    plate_x, plate_w = plate_xywh[0] * raw_w, plate_xywh[2] * raw_w
    plate_y, plate_h = plate_xywh[1] * raw_h, plate_xywh[3] * raw_h
    # Move the plate reference point from its centre to its top-left corner.
    plate_zx = plate_x - plate_w / 2
    plate_zy = plate_y - plate_h / 2
    # Both handles are context-managed so they are closed even when a line
    # fails to parse.
    with open(f'{txt_path}\\{res}.txt', 'r') as f, \
            open(f'{out_path}\\{res}.txt', 'w') as fr:
        for line in f:
            lxywh = line.split()
            if not lxywh:
                # Tolerate blank lines, e.g. a trailing newline at end of file.
                continue
            x, y, w, h = map(float, lxywh[1:])
            crop_x = (x * raw_w - plate_zx) / plate_w
            crop_y = (y * raw_h - plate_zy) / plate_h
            crop_w = w * raw_w / plate_w
            crop_h = h * raw_h / plate_h
            # Write the converted box to the new label file.
            fr.write(f'{lxywh[0]} {crop_x:.6f} {crop_y:.6f} {crop_w:.6f} {crop_h:.6f}\n')

In [None]:
# Visual check of the converted labels: draw every box from img0.txt on the
# cropped img0.jpg and print the class ids in file order.
test_image = f'{crop_data_path}\\img0.jpg'
test_txt = f'{out_path}\\img0.txt'
image_arr = cv2.imread(test_image)
print(image_arr.shape)
with open(test_txt, 'r') as f:
    image_txts = f.readlines()

# Boxes are fractions of the cropped image, so scale by its own size.
raw_h, raw_w = image_arr.shape[0], image_arr.shape[1]
class_ids = []
for row in image_txts:
    lxywh = row.strip('\n').split(' ')
    class_ids.append(lxywh[0])
    x, y, w, h = map(float, lxywh[1:])
    # Centre/size -> integer pixel corners.
    top_left = (round((x - w / 2) * raw_w), round((y - h / 2) * raw_h))
    bottom_right = (round((x + w / 2) * raw_w), round((y + h / 2) * raw_h))
    cv2.rectangle(image_arr, top_left, bottom_right, (255, 0, 0), 2)

# Each id is followed by a single space, including the last one.
s = ''.join(f'{class_id} ' for class_id in class_ids)
print(s)
cv2.imshow('main', image_arr)
cv2.waitKey(0)
cv2.destroyWindow('main')

In [None]:
# Copy the cropped image of every labelled sample into the export folder.
oo = 'D:\\demo\\GasMeterData_pre\\train_from_cvat_crop_image\\image'
use_i = []
for label_file in use_list:
    stem = label_file.split('\\')[-1].split('.')[0]
    cropped_image = f'{crop_data_path}\\{stem}.jpg'
    shutil.copyfile(cropped_image, f'{oo}\\{stem}.jpg')

In [None]:
# Cluster the predicted boxes so that detections on the same text row end up
# together. These are the locations of the prediction label files.
data_path_base = 'D:\\demo\\GasMeterAp'
pre_txt_path = f'{data_path_base}\\labels\\val'

In [None]:
# List every prediction label file under pre_txt_path and display the list.
pre_txt_list = glob.glob(pre_txt_path + '/*')
pre_txt_list

In [None]:
# For every prediction file, group the detected boxes into rows with k-means
# and store, per row, the class ids ordered left to right.
out_res = {}
for label_file in pre_txt_list:
    res, features, labels = [], [], []
    with open(label_file, 'r') as f:
        for li in f:
            if not li.strip():
                # Ignore blank lines.
                continue
            lxywh = list(map(float, li.split()))
            # Feature = (x centre, (y centre + h / 2) * 5); the vertical term
            # is scaled by 5 so it outweighs the horizontal position.
            features.append([lxywh[1], (lxywh[2] + lxywh[4] / 2) * 5])
            labels.append(int(lxywh[0]))
    features, labels = np.array(features), np.array(labels)
    n_samples = len(features)
    if n_samples == 0:
        # Nothing detected in this file.
        out_res[label_file] = res
        continue

    # Pick the cluster count with the best silhouette score. Fall back to a
    # single cluster when no candidate count can be evaluated.
    choose_clusters, choose_ss = 1, -1
    pre = np.zeros(n_samples, dtype=int)
    # silhouette_score needs 2 <= n_labels <= n_samples - 1, and KMeans cannot
    # fit more clusters than samples, so cap the search range.
    for c in range(2, min(10, n_samples - 1) + 1):
        candidate = KMeans(n_clusters=c).fit_predict(features)
        n_found = len(np.unique(candidate))
        if not 2 <= n_found <= n_samples - 1:
            # Duplicate points can collapse clusters; such a result cannot
            # be scored.
            continue
        ss = silhouette_score(features, candidate)
        if ss > choose_ss:
            choose_ss = ss
            choose_clusters = c
            # Keep the labelling that was scored. Refitting an unseeded
            # KMeans afterwards could return a different clustering.
            pre = candidate
    print(f'{label_file} choose clusters: {choose_clusters}')

    # Gather each cluster's class ids, sorted by x centre (left to right).
    for i in range(choose_clusters):
        ln_idx = np.argwhere(pre == i).flatten()
        ln_sort_idx = np.argsort(features[ln_idx, 0].flatten())
        res.append(labels[ln_idx][ln_sort_idx])
    out_res[label_file] = res

In [None]:
# Show the per-file result: a list of class-id arrays, one per cluster.
out_res

# 标注数据格式转换

In [19]:
# Source folder passed to cvat_to_yolo (images and .txt label files).
# NOTE: rebinds the `data_path` name used by the earlier cells.
data_path = 'D:\\demo\\GasMeterData_pre\\at_v1\\obj_train_data'

In [20]:
# Destination root passed to cvat_to_yolo.
# NOTE: rebinds the `out_path` name used by the earlier cells.
out_path = 'D:\\demo\\GasMeterNumber'

In [21]:
def _image_sort_key(name):
    """Return a sort key that orders ``img<N>.<ext>`` names by the number N.

    Names whose stem is not ``img`` followed by digits sort after all
    numbered names, in alphabetical order.
    """
    stem = os.path.splitext(name)[0]
    digits = stem[3:] if stem.startswith('img') else stem
    if digits.isdigit():
        return (0, int(digits), name)
    return (1, 0, name)


def cvat_to_yolo(data_path, out_path, train_split=0.8):
    """Split a flat image/label folder into train and val sub-folders.

    Every file in *data_path* that does not have a ``.txt`` extension is
    treated as an image, and its label is expected next to it under the same
    stem with a ``.txt`` extension. Images are ordered by the number in their
    name; the first ``train_split`` share goes to training and all the rest
    (the highest numbers) to validation.

    Files are copied into ``images/train``, ``images/val``, ``labels/train``
    and ``labels/val`` under *out_path*; missing folders are created.

    Args:
        data_path: folder holding the images and their ``.txt`` label files.
        out_path: root folder of the generated dataset.
        train_split: fraction of the images assigned to the training set.

    Raises:
        FileNotFoundError: if an image has no matching label file.
    """
    doc_list = os.listdir(data_path)
    images = [
        x for x in doc_list
        if os.path.splitext(x)[1].lower() != '.txt'
    ]
    # Sort numerically: a plain string sort would put 'img10' before 'img2'
    # and break the "highest numbers go to validation" rule.
    ordered = sorted(images, key=_image_sort_key)
    t_split = int(len(ordered) * train_split)
    # The two slices share the boundary index so no image is dropped.
    train_images, val_images = ordered[:t_split], ordered[t_split:]
    # Swap only the extension; a substring replace would also rewrite 'jpg'
    # inside the stem and would miss non-jpg images.
    train_labels = [os.path.splitext(x)[0] + '.txt' for x in train_images]
    val_labels = [os.path.splitext(x)[0] + '.txt' for x in val_images]

    targets = [
        (train_images, os.path.join(out_path, 'images', 'train')),
        (val_images, os.path.join(out_path, 'images', 'val')),
        (train_labels, os.path.join(out_path, 'labels', 'train')),
        (val_labels, os.path.join(out_path, 'labels', 'val')),
    ]
    for names, folder in targets:
        os.makedirs(folder, exist_ok=True)
        for name in names:
            shutil.copyfile(os.path.join(data_path, name),
                            os.path.join(folder, name))

In [22]:
# Run the conversion with the default 80/20 train/val split.
cvat_to_yolo(data_path, out_path)