<a href="https://colab.research.google.com/github/namuunbayar/MachineLearning/blob/main/ocr_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import os
import sys
import pandas as pd
import numpy as np
import cv2
import io
import re
import base64
from google.cloud import vision
import json
from django.utils import timezone
import pathlib
from google.cloud.vision import AnnotateImageResponse
import argparse
from enum import Enum
from PIL import Image, ImageDraw
from operator import itemgetter

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] ="./gas-meter-ocr-356007-981ec8805f98.json"

In [4]:
!pip install google-cloud-vision
!pip install django

Collecting django
  Downloading Django-4.2.7-py3-none-any.whl (8.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.0/8.0 MB[0m [31m41.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting asgiref<4,>=3.6.0 (from django)
  Downloading asgiref-3.7.2-py3-none-any.whl (24 kB)
Installing collected packages: asgiref, django
Successfully installed asgiref-3.7.2 django-4.2.7


In [6]:
# Save a response as JSON
def save_as_json(response, filename):
    """Serialize ``response`` and write it to ``filename`` as UTF-8 text.

    The value returned by ``AnnotateImageResponse.to_json`` is itself passed
    through the ``json`` encoder before being written; ``load_from_json``
    undoes both steps, so files must be read back with that function.
    """
    serialized = AnnotateImageResponse.to_json(response)
    encoded = json.dumps(serialized, ensure_ascii=False, indent=2)
    with open(filename, 'wt', encoding='utf-8') as out_file:
        out_file.write(encoded)

# Read JSON back into an AnnotateImageResponse
def load_from_json(filename):
    """Rebuild an ``AnnotateImageResponse`` from a file written by ``save_as_json``.

    The file is decoded with the ``json`` module and the decoded value is
    handed to ``AnnotateImageResponse.from_json``.
    """
    with open(filename, mode='r', encoding='utf-8') as in_file:
        decoded = json.loads(in_file.read())
    return AnnotateImageResponse.from_json(decoded)

In [7]:
# Call the Vision API (document text detection) and save the response as JSON
def get_resp(file_path, output_dir = 'output/', hint='ja'):
    """Run ``document_text_detection`` on an image file and save the response.

    Args:
        file_path: Path of the image to annotate (``str`` or ``os.PathLike``).
        output_dir: Directory that receives the JSON file. A trailing
            separator is optional; the directory is created when missing.
        hint: Language hint forwarded as ``image_context['language_hints']``.

    Returns:
        The response returned by ``client.document_text_detection``.

    The response is written with ``save_as_json`` to
    ``os.path.join(output_dir, '<image stem>_<YYYYMMDD>.json')``.
    """
    from datetime import datetime

    # Instantiates a client
    client = vision.ImageAnnotatorClient()

    # Loads the image into memory
    with open(os.path.abspath(file_path), 'rb') as image_file:
        content = image_file.read()

    image = vision.Image(content=content)

    # Performs document text detection on the image file
    response = client.document_text_detection(
        image=image,
        image_context={'language_hints': [hint]},
    )

    basename = os.path.splitext(os.path.basename(file_path))[0]
    # Naive local time; no time zone is applied.
    date_suffix = datetime.now().strftime('_%Y%m%d')

    # Join instead of concatenating so that 'crop_out' and 'crop_out/' both
    # put the file inside the directory rather than producing 'crop_out<name>'.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    resp_file = os.path.join(output_dir, basename + date_suffix + '.json')

    save_as_json(response, resp_file)

    return response

In [8]:
# Call the Vision API (text detection) and save the response as JSON
def get_resp_text_detection(file_path, output_dir = 'output/'):
    """Run ``text_detection`` on an image file and save the response.

    Args:
        file_path: Path of the image to annotate (``str`` or ``os.PathLike``).
        output_dir: Directory that receives the JSON file. A trailing
            separator is optional; the directory is created when missing.

    Returns:
        The response returned by ``client.text_detection``.

    The language hint is fixed to ``'ja'``. The response is written with
    ``save_as_json`` to
    ``os.path.join(output_dir, '<image stem>_<YYYYMMDD>.json')``.
    """
    from datetime import datetime

    # Instantiates a client
    client = vision.ImageAnnotatorClient()

    # Loads the image into memory
    with open(os.path.abspath(file_path), 'rb') as image_file:
        content = image_file.read()

    image = vision.Image(content=content)

    # Performs text detection on the image file
    response = client.text_detection(
        image=image,
        image_context={'language_hints': ['ja']},
    )

    basename = os.path.splitext(os.path.basename(file_path))[0]
    # Naive local time; no time zone is applied.
    date_suffix = datetime.now().strftime('_%Y%m%d')

    # Join instead of concatenating so that a directory given without a
    # trailing separator still receives the file.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    resp_file = os.path.join(output_dir, basename + date_suffix + '.json')

    save_as_json(response, resp_file)

    return response

In [9]:
def draw_boxes(image, texts, color):
    """Outline the bounding box of every entry of ``texts`` on ``image``.

    Each entry is indexed with ``"bounding_box"`` and the first four
    ``vertices`` of that box are drawn as a polygon with no fill and an
    outline of ``color``. The image is drawn on in place and returned.
    """
    canvas = ImageDraw.Draw(image)

    for entry in texts:
        outline_xy = []
        for corner in range(4):
            vertex = entry["bounding_box"].vertices[corner]
            outline_xy.append(vertex.x)
            outline_xy.append(vertex.y)
        canvas.polygon(outline_xy, None, color)

    return image

In [10]:
# Search a list of text + bounding-box entries for ``pattern``
def find_text(texts, pattern):
    """Return one result per entry of ``texts`` whose text matches ``pattern``.

    ``re.findall`` is run with ``re.MULTILINE`` on ``entry["text"]``. For
    every entry with at least one match, a new dict is produced holding the
    first element returned by ``findall`` under ``'text'`` and the entry's
    ``"bounding_box"`` under ``'bounding_box'``. The input entries are not
    modified.
    """
    searched = (
        (entry, re.findall(pattern, entry["text"], flags=re.MULTILINE))
        for entry in texts
    )
    return [
        {'text': found[0], 'bounding_box': entry["bounding_box"]}
        for entry, found in searched
        if found
    ]

In [11]:
# Squared distance between the first vertices of two bounding boxes
def calculate_distance(bound_1, bound_2):
    """Return the squared Euclidean distance between ``vertices[0]`` of both boxes.

    No square root is taken, so the result is only meaningful for comparing
    distances with each other.
    """
    delta_x = bound_1.vertices[0].x - bound_2.vertices[0].x
    delta_y = bound_1.vertices[0].y - bound_2.vertices[0].y
    return delta_x**2 + delta_y**2

In [12]:
# Find entries matching ``pattern`` and order them by a distance score
def search_with_key(texts, key, pattern):
    """Return the ``pattern`` matches in ``texts`` sorted by ``"distance"``.

    Both ``key`` and ``pattern`` are looked up with ``find_text``.

    * When ``key`` matched at least once, each candidate's ``"distance"`` is
      ``calculate_distance`` between its box and the box of the first key
      match.
    * Otherwise a candidate whose text is purely alphabetic or purely
      decimal gets ``99999999`` and every other candidate gets ``-1``.

    The candidates are returned in ascending order of ``"distance"``.
    """
    anchors = find_text(texts, key)
    candidates = find_text(texts, pattern)

    if anchors:
        for cand in candidates:
            cand["distance"] = calculate_distance(cand["bounding_box"], anchors[0]["bounding_box"])
    else:
        for cand in candidates:
            plain = cand["text"].isalpha() or cand["text"].isdecimal()
            cand["distance"] = 99999999 if plain else -1

    candidates.sort(key=lambda cand: cand["distance"])
    return candidates

In [13]:
# Collect paragraph-level text and bounding boxes from an API response
# Each item: {'text': paragraph text, 'bounding_box': paragraph bounding box}
def get_paragraph_texts(response):
    """Return one ``{'text', 'bounding_box'}`` dict per paragraph of ``response``.

    Walks ``response.full_text_annotation`` through pages, blocks and
    paragraphs. A paragraph's text is the concatenation, without separators,
    of the ``text`` of every symbol of every word in it.
    """
    document = response.full_text_annotation

    return [
        {
            'text': ''.join(
                symbol.text
                for word in paragraph.words
                for symbol in word.symbols
            ),
            'bounding_box': paragraph.bounding_box,
        }
        for page in document.pages
        for block in page.blocks
        for paragraph in block.paragraphs
    ]

In [14]:
def read_value(response):
    """Extract up to the last four digits from the full detected text.

    Args:
        response: Object whose ``text_annotations[0].description`` holds the
            whole detected text.

    Returns:
        The last four digits found in the text (fewer when the text holds
        fewer than four digits), or ``'0000'`` when there is no annotation
        or no digit at all.
    """
    default_value = '0000'

    annotations = response.text_annotations
    # Nothing was detected: indexing [0] would raise IndexError.
    if not annotations:
        return default_value

    text = annotations[0].description
    # Map letters commonly confused with digits: the non-ASCII look-alike
    # 'Ο' and the Latin 'O' become '0', 'I' becomes '1'; an 'E0' pair
    # (including one produced from 'EO') is dropped.
    text = text.replace('Ο','0').replace('O','0').replace('E0','').replace('I','1')

    digits = re.sub(r"[^0-9]", "", text)

    return digits[-4:] if digits else default_value

### 実験

In [15]:
# Directory holding the saved response files, searched recursively for *.json
data_dir = '0222_bushu_train_responses/'
json_list = [path for path in pathlib.Path(data_dir).rglob('*.json')]

In [16]:
# Directory holding the responses for cropped images, searched recursively for *.json
crop_dir = 'crop_out/'
cropped_list = [path for path in pathlib.Path(crop_dir).rglob('*.json')]

In [17]:
# Print the text of every saved response, one paragraph per line
print("===============================")
for j in json_list:
    resp = load_from_json(j)
    para_texts = get_paragraph_texts(resp)

    # Path.name is the file name on every OS; splitting str(j) on '\\'
    # only works when the path uses Windows separators.
    name = j.name

    print("====="+ name +"=====")
    print("===============================")

    for paragraph in para_texts:
        print(paragraph["text"])

    print("===============================")



In [18]:
# Replace only the non-ASCII look-alike 'Ο'; the Latin 'O' is left untouched
tex = 'Ο Ο ΟO Ο'
tmp = '0'.join(tex.split('Ο'))
print(tmp)

0 0 0O 0


In [19]:
# Print the whole detected text of every cropped-image response, followed by
# the value read_value extracts from it
print("===============================")
for j in cropped_list:
    resp = load_from_json(j)

    # Path.name is the file name on every OS; splitting str(j) on '\\'
    # only works when the path uses Windows separators.
    nm = j.name.replace('.json','').replace('2023','').replace('crop_out_','')

    print("====="+ nm +"=====")
    print("===============================")

    print(resp.text_annotations[0].description)
    print('↓↓↓↓')
    # A bare expression inside a loop displays nothing, so print the result.
    print(read_value(resp))

    print("===============================")



In [20]:
# Directory holding the source images, searched recursively for *.jpg
input_dir = '0222_bushu_train_images'
image_list = [path for path in pathlib.Path(input_dir).rglob('*.jpg')]

In [21]:
# Directory holding the cropped images, searched recursively for *.jpg
crop_in = 'crop_in'
crop_list = [path for path in pathlib.Path(crop_in).rglob('*.jpg')]

In [22]:
# Stretch the Y channel of every cropped image to the 0-255 range.
# NOTE(review): every iteration writes to the same 'output_image.jpg', so only
# the result for the last image of crop_list stays on disk; confirm intended.
for i,c in enumerate(crop_list):
    img = cv2.imread(str(c))

    # Convert from BGR to the YCrCb color space
    img_ycrcb = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb)

    # Normalize the Y channel to the 0-255 range
    y, cr, cb = cv2.split(img_ycrcb)
    y_norm = cv2.normalize(y, None, 0, 255, cv2.NORM_MINMAX)

    # Merge the normalized Y channel back and convert to BGR again
    img_norm = cv2.merge((y_norm, cr, cb))
    img_norm_bgr = cv2.cvtColor(img_norm, cv2.COLOR_YCrCb2BGR)

    # Save the converted image
    cv2.imwrite('output_image.jpg', img_norm_bgr)

In [23]:
# APIに通し結果を格納
# for image in image_list:
#     get_resp(image)

In [24]:
# Send every cropped image through the API and store the responses
for image in crop_list:
    get_resp(image, output_dir='crop_out', hint='en-digits')

In [None]:
# Collect the saved response files again, searching output_dir recursively for *.json
output_dir = '0222_bushu_train_responses/'
json_list = [path for path in pathlib.Path(output_dir).rglob('*.json')]

In [None]:
# Load the first saved response as a sample (IndexError when json_list is empty)
sample_resp = load_from_json(json_list[0])

In [None]:
# sample_resp

In [None]:
# Print every text annotation of the sample response with its bounding-polygon vertices
texts = sample_resp.text_annotations
print('Texts:')

for text in texts:
    print(f'\n"{text.description}"')

    vertices = [f'({vertex.x},{vertex.y})' for vertex in text.bounding_poly.vertices]

    print('bounds: ' + ','.join(vertices))

In [None]:
# Build the YOLO model from the local weights file "best.pt"
from ultralytics import YOLO

model = YOLO("best.pt")

In [None]:
# Run the model on 'bushu_train' (presumably a directory of images; TODO confirm)
results = model('bushu_train')

In [None]:
# Crop every detection of class VALUE_CLASS_ID out of its source image and
# save it as <CROP_DIR>/<image base name><confidence in percent>.jpg
VALUE_CLASS_ID = 2  # presumably the "Value" class of best.pt; TODO confirm against the model's class list
CROP_DIR = 'crop_in'

# cv2.imwrite does not create missing directories, so make sure it exists.
os.makedirs(CROP_DIR, exist_ok=True)

for result in results:
    # os.path.basename understands the separators of the running OS, unlike
    # splitting on '\\'. Everything after the first '.' is dropped.
    base_name = os.path.basename(str(result.path)).split('.')[0]
    img = cv2.imread(result.path)
    boxes = result.boxes.xyxy.to('cpu').numpy().astype(int)
    confidences = result.boxes.conf.to('cpu').numpy().astype(float)
    labels = result.boxes.cls.to('cpu').numpy().astype(int)

    for box, conf, label in zip(boxes, confidences, labels):
        if label == VALUE_CLASS_ID:
            x_min, y_min, x_max, y_max = box
            img_crop = img[y_min:y_max, x_min:x_max]
            crop_name = base_name + str(round(conf*100,2)) + '.jpg'
            cv2.imwrite(os.path.join(CROP_DIR, crop_name), img_crop)


In [None]:
class Meter:
    """Regex-based field extraction from the OCR text of a meter.

    Create an instance with the text, optionally call ``remove_space``, then
    call the ``get_*`` methods. Each of them appends its unique matches, in
    the order found, to the corresponding list attribute
    (``management_num``, ``model_num``, ``exp_date``, ``pointer_val``).
    """

    # Patterns tried in order by get_management_num.
    bushu_num = ['[0-9]{2}-[0-9]{3}-[0-9]{3}', '^[0-9]{2}[.・:-]*[0-9]{3}[.・:-]*[0-9]{3}$','^[0-9]{1,2}[.・:-]*[0-9]{2,3}[.・:-]*[0-9]{2,3}$']
    # Patterns tried in order by get_model_num. The hyphen sits last in the
    # separator class of the first pattern so that it is a literal '-'
    # (written as '.-。' it formed a range that matched digits and letters
    # but not '-').
    bushu_model = ['[NJ][ ]*[B8][ ]*[0-9][.。・:-][0-9]','[NJ][ ]*[B8][ ]*[0-9]{1,2}','[NJ][ ]*D[ ]*[SB8][ ]*[0-9]{1,2}']
    # Patterns tried in order by get_exp_date. They contain groups, so
    # re.findall yields tuples whose first item is the whole match.
    rgx_exp_date = ['(20[1-3][0-9][ ]{0,2}[.・ー年 -][ ]{0,2}((10)|(11)|(12)|[1-9]|0[1-9]))','([1-3][0-9][.年]((10)|(11)|(12)|[1-9]|0[1-9]))', '([1-3][0-9][・ー-]((10)|(11)|(12)|[1-9]|0[1-9]))']
    # Runs of at least four digits/spaces, used by get_value.
    rgx_pointer_val = '[0-9 ]{4,}'

    def __init__(self, text):
        """Store ``text`` and start with empty result lists."""
        self.text = text
        self.pointer_val = []
        self.management_num = []
        self.model_num = []
        self.exp_date = []

    def remove_space(self):
        """Remove every ASCII space from ``self.text``."""
        self.text = self.text.replace(' ','')

    def get_management_num(self):
        """Append unseen matches of each ``bushu_num`` pattern to ``management_num``."""
        for pattern in self.bushu_num:
            for res in re.findall(pattern, self.text, flags=re.MULTILINE):
                if res not in self.management_num:
                    self.management_num.append(res)

    def get_model_num(self):
        """Append unseen matches of each ``bushu_model`` pattern to ``model_num``.

        ASCII spaces are stripped from a match before it is compared and stored.
        """
        for pattern in self.bushu_model:
            for res in re.findall(pattern, self.text, flags=re.MULTILINE):
                res_pref = res.replace(' ','')
                if res_pref not in self.model_num:
                    self.model_num.append(res_pref)

    def get_exp_date(self):
        """Append unseen matches of each ``rgx_exp_date`` pattern to ``exp_date``.

        Every stored item is the group tuple returned by ``re.findall``.
        """
        for pattern in self.rgx_exp_date:
            for res in re.findall(pattern, self.text, flags=re.MULTILINE):
                if res not in self.exp_date:
                    self.exp_date.append(res)

    def get_value(self):
        """Append unseen 4- or 5-character tokens to ``pointer_val``.

        Tokens come from splitting, on whitespace, each run matched by
        ``rgx_pointer_val``.
        """
        for run in re.findall(self.rgx_pointer_val, self.text, flags=re.MULTILINE):
            for token in run.split():
                if 4 <= len(token) <= 5 and token not in self.pointer_val:
                    self.pointer_val.append(token)

In [None]:
def read_meter(response):
    """Extract meter fields from the full detected text of ``response``.

    Args:
        response: Object whose ``text_annotations[0].description`` holds the
            whole detected text.

    Returns:
        A dict with the keys ``'number'``, ``'model'``, ``'year'``,
        ``'month'`` and ``'value'``. A field stays ``''`` when nothing was
        found for it; ``'value'`` is never filled in by this function. When
        ``response`` has no text annotation at all, every field is ``''``.
    """
    meter_info = {'number':'', 'model':'', 'year':'', 'month':'', 'value':''}
    # meter_info = {'number':'test', 'model':'NB6', 'year':'30', 'month':'12', 'value':'1024'}

    annotations = response.text_annotations
    # Nothing was detected: indexing [0] would raise IndexError.
    if not annotations:
        return meter_info

    meter_obj = Meter(annotations[0].description)
    meter_obj.remove_space()

    meter_obj.get_management_num()
    if meter_obj.management_num:
        meter_info["number"] = meter_obj.management_num[0]

    meter_obj.get_model_num()
    if meter_obj.model_num:
        meter_info["model"] = meter_obj.model_num[0]

    meter_obj.get_exp_date()
    if meter_obj.exp_date:
        # exp_date items are regex group tuples; item 0 is the whole match.
        expire = re.split('[.・ー年-]',meter_obj.exp_date[0][0])
        meter_info["year"] = expire[0]
        if len(expire) > 1:
            meter_info["month"] = expire[1]

    return meter_info

In [None]:
# Parse every saved response with read_meter and print its "month" field
for j in json_list:
    response = load_from_json(j)
    meter_info = read_meter(response)

    print(meter_info["month"])

In [None]:
# Send a single cropped image through get_resp with output_dir 'crop_out' and the default language hint
get_resp('crop_in/2023_0222_15292388.1.jpg','crop_out')

In [None]:
# Run the model on a single test image
image_path = 'test.png'

# The model is built again here from the weights file "best.pt"
model = YOLO("best.pt")
results = model(image_path)

Ultralytics YOLOv8.0.47  Python-3.10.5 torch-1.13.1+cpu CPU
Model summary (fused): 168 layers, 3006233 parameters, 0 gradients, 8.1 GFLOPs

image 1/1 C:\Users\aorgil\Desktop\meter_exchange\test.png: 512x640 2 Meters, 2 Plates, 2 Values, 417.8ms
Speed: 5.0ms preprocess, 417.8ms inference, 9.6ms postprocess per image at shape (1, 3, 640, 640)


In [None]:
# Work on the first entry of results
result = results[0]

# Boxes (xyxy), confidences and class ids moved to the CPU as NumPy arrays
boxes = result.boxes.xyxy.to('cpu').numpy().astype(int)
confidences = result.boxes.conf.to('cpu').numpy().astype(float)
labels = result.boxes.cls.to('cpu').numpy().astype(int)

# Indices that order the confidences from highest to lowest
sorted_indices = np.argsort(confidences)[::-1]

# Reorder boxes, labels and confidences with those indices
boxes_sorted = boxes[sorted_indices]
labels_sorted = labels[sorted_indices]
confidences_sorted = confidences[sorted_indices]

In [None]:
# Display the confidences in descending order
confidences_sorted

array([    0.96159,     0.96019,     0.94777,     0.94315,     0.92057,     0.84678])

In [None]:
# labels_sortedから指定したlabelに対応する要素のインデックスを取得
target_indices = np.where(labels_sorted == 2)[0]

# boxes_sortedから対応する要素を取得
value_boxes = boxes_sorted[target_indices]

target_indices = np.where(labels_sorted == 1)[0]
plate_boxes = boxes_sorted[target_indices]

In [None]:
# Show the selected boxes: plate boxes first, then value boxes
print(plate_boxes, value_boxes, sep='\n')

[[123 405 334 508]
 [552 430 822 560]]
[[575 182 698 236]
 [142 167 234 210]]


In [None]:
# Cut every value box out of the test image and save it as value_<index>.jpg
image = cv2.imread(image_path)
for i, box in enumerate(value_boxes):
    x_min, y_min, x_max, y_max = box
    image_crop = image[y_min:y_max, x_min:x_max]
    cv2.imwrite(f"value_{i}.jpg", image_crop)

In [None]:
# Preview a bulk rename of the files in crop_in
import os

folder_path = 'crop_in'  # folder to scan

# For every file: drop '2023_' from the stem, keep its first 11 characters,
# re-attach the extension and print the resulting name.
for filename in os.listdir(folder_path):
    stem, extension = os.path.splitext(filename)
    base = stem.replace('2023_','')[:11]
    new_filename = base + extension
    print(new_filename)
    # The actual rename is left disabled:
    #os.rename(os.path.join(folder_path, filename), os.path.join(folder_path, new_filename))


0220_160224.jpg
0220_160308.jpg
0220_160409.jpg
0220_160452.jpg
0220_160532.jpg
0220_161624.jpg
0220_161911.jpg
0220_164625.jpg
0222_101709.jpg
0222_101723.jpg
0222_102009.jpg
0222_140611.jpg
0222_150525.jpg
0222_150707.jpg
0222_150824.jpg
0222_150847.jpg
0222_150926.jpg
0222_151031.jpg
0222_151120.jpg
0222_151219.jpg
0222_151243.jpg
0222_151342.jpg
0222_151414.jpg
0222_151446.jpg
0222_151514.jpg
0222_151551.jpg
0222_151619.jpg
0222_151656.jpg
0222_151810.jpg
0222_151906.jpg
0222_152004.jpg
0222_152025.jpg
0222_152108.jpg
0222_152505.jpg
0222_152522.jpg
0222_152532.jpg
0222_152611.jpg
0222_152624.jpg
0222_152647.jpg
0222_152707.jpg
0222_152724.jpg
0222_152830.jpg
0222_152923.jpg
0222_153356.jpg
0224_100033.jpg
0226_114328.jpg
0226_114458.jpg
0227_094833.jpg
0227_094857.jpg
0227_095052.jpg
