<a href="https://colab.research.google.com/github/tsoonjin/lazybone/blob/master/path_system_forge_ocr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Global var
root_path = '/content/drive/MyDrive/projects/pathdao_experience/'
use_gpu = False

# Set current workspace
!cd $root_path

# Determine if GPU is present

!pip install GPUtil
import GPUtil
use_gpu = len(GPUtil.getAvailable()) > 0

# Install libssl for paddle
!wget http://nz2.archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2.19_amd64.deb
!sudo dpkg -i libssl1.1_1.1.1f-1ubuntu2.19_amd64.deb
%cd drive/MyDrive/projects/pathdao_experience/

In [None]:
# Package installation
# Installs the OCR stack with pip. Reads `use_gpu`, which must already be defined
# when this cell runs.

!pip install opencv-python
!pip install easyocr
!pip install paddleocr
# The CPU build of PaddlePaddle is installed unconditionally; the GPU build below is
# installed on top of it when a GPU was detected.
!pip install paddlepaddle
if use_gpu:
  # Print GPU / driver status before installing the GPU wheel.
  !nvidia-smi
  # GPU wheel pinned to 2.5.1.post120 and resolved via PaddlePaddle's own wheel index (-f).
  !pip install paddlepaddle-gpu==2.5.1.post120  -f https://www.paddlepaddle.org.cn/whl/linux/mkl/avx/stable.html
# Pillow is pinned to 9.5.0.
# NOTE(review): the reason for the pin is not recorded here — confirm before bumping.
!pip install Pillow==9.5.0
import paddle
# Ask Paddle to run its own installation check so a broken install surfaces here.
paddle.utils.run_check()

In [None]:
# Imports

from operator import index
from paddleocr import PaddleOCR,draw_ocr
from PIL import Image
from PIL.ExifTags import TAGS
from dateutil.parser import parse
from datetime import datetime, timedelta
import easyocr
import time
from google.colab.patches import cv2_imshow
import cv2
import pandas as pd
import pytz
import numpy as np
import os
import math
import sys
import re
import json
from zipfile import ZipFile

# Parameters

# strftime pattern for every date string the extractors return: month then day,
# both zero-padded and without a separator (e.g. '0805').
DATE_FORMAT = '%m%d'
# Current time in the Asia/Kuala_Lumpur timezone, shifted back by one day.
# NOTE(review): presumably each run processes the previous day's uploads — confirm.
now = datetime.now(pytz.timezone('Asia/Kuala_Lumpur')) - timedelta(days=1)
# The same shifted day rendered as YYYY-MM-DD.
date = now.strftime('%Y-%m-%d')

# Function declaration

def is_date(string, relax=False, fuzzy=False, dayfirst=True):
    """
    Try to interpret the last whitespace-separated token of ``string`` as a date.

    :param string: str, text to inspect
    :param relax: bool, when False the whole string must contain a four-digit
        run starting with 1 or 2 (or the literal 3000) before parsing is attempted
    :param fuzzy: bool, forwarded to ``dateutil.parser.parse``
    :param dayfirst: bool, forwarded to ``dateutil.parser.parse``
    :return: the value returned by ``parse`` on success; ``False`` when the
        string is rejected, is blank, or ``parse`` raises
        ``OverflowError``/``ValueError``
    """
    try:
        # Strict mode: cheap year-like pre-check on the full string.
        if not relax and re.search(r'[12][0-9]{3}|3000', string) is None:
            return False

        tokens = string.split()
        if not tokens:
            return False

        # Only the rightmost token is handed to the parser.
        return parse(tokens[-1], fuzzy=fuzzy, dayfirst=dayfirst)
    except (OverflowError, ValueError):
        return False

def show_image(filepath):
  """
  Display the image at ``filepath`` as grayscale in the notebook output.

  The previous version also decoded the file a second time and computed a
  normalised / thresholded copy that was never displayed; that dead work
  (which divided by the image maximum and so misbehaved on an all-black
  image) has been removed. What is shown is unchanged.

  :param filepath: str, path of the image to preview
  :raises FileNotFoundError: if OpenCV cannot read an image from ``filepath``
  """
  gray = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
  if gray is None:
    # cv2.imread signals an unreadable or missing file by returning None.
    raise FileNotFoundError(f"Unable to read image: {filepath}")
  cv2_imshow(gray)

def extract_truth(filepath):
  """
  Parse the expected values encoded in a screenshot file name.

  The name is expected to look like ``<date>_<voter>_<voter_part> - <player>...``
  (for example ``"0805_25543_20504 - Ben Joint.jpg"``). Extra ``_`` fields and
  extra ``" - "`` segments are ignored. ``player`` is the raw second segment,
  so it keeps the file extension when there is no third segment.

  :param filepath: str, file name to parse (it is also printed)
  :return: dict with keys ``original_filepath``, ``date``, ``voter``,
      ``voter_part`` and ``player``; every value is ``'N/A'`` when the name
      does not follow the expected pattern
  """
  print(filepath)

  not_available = {
      "original_filepath": 'N/A',
      "date": 'N/A',
      "voter": 'N/A',
      "voter_part": 'N/A',
      "player": 'N/A'
  }

  # str.split always returns at least one element, so the old `len(...) > 0`
  # guard never failed and malformed names raised ValueError during unpacking.
  segments = filepath.split(" - ")
  if len(segments) < 2:
    return not_available

  truth_fields = segments[0].split("_")
  if len(truth_fields) < 3:
    return not_available

  date, voter, voter_part = truth_fields[:3]
  return {
    "original_filepath": filepath,
    "date": date,
    "voter": voter,
    "voter_part": voter_part,
    "player": segments[1]
  }

def is_buggy(record):
  """
  Report whether ``record`` still has an unresolved field.

  :param record: dict with at least the keys ``voter``, ``voter_part`` and ``date``
  :return: True if any of those three values equals ``'N/A'``, else False
  """
  for field in ("voter", "voter_part", "date"):
    if record[field] == 'N/A':
      return True
  return False


def extract_voter(reader, filepath, show_img=False):
  """
  Read the voter id from the top fifth of a screenshot.

  The crop is passed to ``reader.readtext``; the first detection whose text
  contains both 'forge' and 'game' wins. The value returned is whatever
  follows the last 'game' in that text, minus its first character.
  NOTE(review): the dropped character is presumably the '/' of a URL — confirm.

  :param reader: OCR reader exposing ``readtext`` (an EasyOCR reader in this notebook)
  :param filepath: str, path of the screenshot
  :param show_img: bool, print every detection and its score
  :return: str, extracted voter id, or 'N/A' when nothing matches
  """
  image = cv2.imread(filepath, cv2.IMREAD_COLOR)
  height, width, _channels = image.shape
  top_strip = image[0:round(height / 5), 0:width]
  detections = reader.readtext(top_strip, workers=2)

  for _bbox, raw_text, confidence in detections:
    candidate = raw_text.strip()
    if show_img:
      print("Extract voter", candidate, confidence)
    if 'forge' not in candidate or 'game' not in candidate:
      continue
    return candidate.split("game")[-1][1:]
  return 'N/A'

def extract_voter_part(reader, filepath, show_img=False, attempt=2, threshold=240):
  """
  Read the voter-part number from the top-left ninth of a screenshot.

  Channel 0 of the image is binarised at ``threshold`` and the top-left
  (height/3 x width/3) corner is passed to ``reader.readtext``. Detections
  are visited from highest to lowest score:

  * text containing '#' returns whatever sits between the first and second
    '#' (if non-empty) immediately;
  * text starting with '4' or '1' is kept as a fallback with that first
    character dropped. NOTE(review): presumably '#' is sometimes misread as
    '4' or '1' — confirm.

  If nothing is found and ``attempt`` is above 1, the function calls itself
  with ``attempt - 1`` and ``threshold - 10``.

  :param reader: OCR reader exposing ``readtext`` (an EasyOCR reader in this notebook)
  :param filepath: str, path of the screenshot
  :param show_img: bool, print every detection and its score
  :param attempt: int, remaining tries
  :param threshold: int, binarisation threshold for this try
  :return: str, voter part, or 'N/A'
  """
  # NOTE(review): both erosions use a 1x1 kernel, which looks like a no-op — confirm.
  color = cv2.erode(cv2.imread(filepath, cv2.IMREAD_COLOR), np.ones((1, 1), np.uint8))
  first_channel = cv2.imread(filepath)[..., 0]
  _, binary = cv2.threshold(first_channel, threshold, 255, cv2.THRESH_BINARY)
  kernel = cv2.getStructuringElement(cv2.MORPH_ERODE, (1, 1))
  binary = cv2.erode(binary, kernel)

  height, width = color.shape[:2]
  corner = binary[0:round(height / 3), 0:round(width / 3)]
  detections = reader.readtext(corner, workers=2)
  detections.sort(key=lambda item: item[2], reverse=True)

  fallbacks = []
  for _bbox, raw_text, confidence in detections:
    compact = raw_text.strip().replace(" ", "")
    if show_img:
      print("Extract voter part", compact, confidence)
    if '#' in compact:
      after_hash = compact.split("#")[1]
      if after_hash:
        return after_hash
    if compact.startswith(('4', '1')) and len(compact) > 1:
      fallbacks.append(compact[1:])

  best = fallbacks[0] if fallbacks else 'N/A'
  if best == 'N/A' and attempt > 1:
    return extract_voter_part(reader, filepath, show_img, attempt=attempt - 1, threshold=threshold - 10)
  return best

def extract_img_metadata(filepath, show_img=False):
  """
  Open an image, read its EXIF data and optionally print it.

  The image is opened even when ``show_img`` is False, so an unreadable file
  still raises. It is now opened through a context manager so the file
  handle is released when the function returns.

  :param filepath: str, path of the image
  :param show_img: bool, print the raw EXIF mapping followed by one
      ``name: value`` line per tag
  :return: None
  """
  with Image.open(filepath) as image:
    exifdata = image.getexif()
    if not show_img:
      return

    print(exifdata)
    for tagid in exifdata:
      # Use the human-readable tag name when PIL knows it, else the numeric id.
      tagname = TAGS.get(tagid, tagid)
      value = exifdata.get(tagid)
      print(f"{tagname:25}: {value}")


def extract_date(reader, filepath, show_img=False, attempt=2):
  """
  Read a date from the bottom strip (last 50 pixel rows) of a screenshot.

  The strip is OCRed once. Detections are ordered by descending score;
  single-token detections are tried first, then the individual tokens of
  multi-word detections. A token is accepted when it is longer than six
  characters, ``is_date`` can parse it, and it contains none of "AM", "PM",
  ":" or "." (which mark clock times rather than dates).

  Passes run while ``attempt`` counts down to 1. Only the pass where the
  counter equals 1 calls ``is_date`` with ``relax=True``.

  Changes from the previous version: the crop start is clamped at row 0
  (``h - 50`` used to go negative for images shorter than 50 px and select
  the wrong rows), the retry reuses the OCR output instead of re-reading and
  re-OCRing the same crop, and debug output for multi-word tokens shows the
  score of the detection they came from rather than a stale value.

  :param reader: OCR reader exposing ``readtext`` (an EasyOCR reader in this notebook)
  :param filepath: str, path of the screenshot
  :param show_img: bool, print every candidate token and its score
  :param attempt: int, number of passes over the candidates
  :return: str, date formatted with ``DATE_FORMAT``, or 'N/A'
  """
  img = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
  h, w, *rest = img.shape
  restrictions = ["AM", "PM", ":", "."]

  strip_top = max(h - 50, 0)
  text_ = reader.readtext(img[strip_top:h, 0:w], workers=2)
  text_.sort(key=lambda x: x[2], reverse=True)

  # Split detections into single tokens (checked first) and the tokens of
  # multi-word detections (checked afterwards), each paired with its score.
  singles = []
  leftovers = []
  for bbox, text, score in text_:
    tokens = text.strip().split(" ")
    if len(tokens) > 1:
      leftovers.extend((token, score) for token in tokens)
    else:
      singles.append((tokens[0], score))
  candidates = singles + leftovers

  remaining = attempt
  while True:
    relax = remaining == 1
    for token, score in candidates:
      if show_img:
        print("Extract date", token, score)
      text_date = len(token) > 6 and is_date(token, relax=relax)
      if text_date and not any(marker in token for marker in restrictions):
        return text_date.strftime(DATE_FORMAT)
    if remaining <= 1:
      return 'N/A'
    remaining -= 1

# def extract_date(reader, filepath, show_img=False, attempt=2):
#   img = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
#   h, w, *rest = img.shape
#   restrictions = ["AM", "PM", ":", "."]

#   text_ = reader.readtext(img[h-50:h, 0:w], workers=2)
#   text_.sort(key=lambda x: x[2], reverse=True)

#   # text_ = reader.readtext(img, workers=2)

#   for t_, t in enumerate(text_):
#     bbox, text, score = t
#     text = text.strip().split(" ")[-1].replace(" ", "")
#     if show_img:
#       print("Extract date", text, score)
#     text_date = len(text) > 6 and is_date(text, relax=attempt==1)
#     if (text_date and not any([char in text for char in restrictions])):
#     # if (text_date and 'AM' not in text and 'PM' not in text and ':' not in text):
#       return text_date.strftime(DATE_FORMAT)
#   # extract_img_metadata(filepath, show_img)
#   if attempt > 1:
#     return extract_date(reader, filepath, show_img, attempt=attempt - 1)
#   return 'N/A'


# Check whether d2 falls within one day (inclusive) on either side of d1
def is_date_similar(d1, d2):
  """
  :param d1: reference datetime
  :param d2: datetime to compare against ``d1``
  :return: True when ``d1 - 1 day <= d2 <= d1 + 1 day``, else False
  """
  tolerance = timedelta(days=1)
  if not (d1 - tolerance <= d2):
    return False
  return d2 <= d1 + tolerance

def generate_pred(ocr_instances, dir_path, filename, debug=False, show_img=False):
  """
  Run both OCR engines on one screenshot and build its prediction record.

  PaddleOCR reads the whole image first. EasyOCR-based helpers
  (``extract_date``, ``extract_voter_part``, ``extract_voter``) then fill in
  whatever is still 'N/A'. In debug mode they are also invoked when the
  PaddleOCR value disagrees with the value encoded in the file name.

  Fixes relative to the previous version:
  * the image path is built from the ``filename`` argument (it used to read
    the notebook-global ``img_path``);
  * with ``debug=False`` there is no ground truth, so scoring is skipped
    instead of raising ``KeyError``;
  * a PaddleOCR page with no detections (``None``/empty) is skipped;
  * the date score parses both dates day-first, so e.g. 0812 and 0813 are no
    longer read as December 8 vs. August 13.

  :param ocr_instances: dict holding the EasyOCR reader under "reader" and
      the PaddleOCR instance under "ocr"
  :param dir_path: str, directory containing the image
  :param filename: str, image file name inside ``dir_path``
  :param debug: bool, parse expected values from ``filename`` with
      ``extract_truth`` and score the prediction against them
  :param show_img: bool, display the image and print OCR output
  :return: dict with ``filename``, ``voter``, ``voter_part``, ``date``,
      ``has_voted``, ``to_check``, ``accuracy``, ``voter_score``,
      ``voter_part_score`` and ``date_score``. Without ground truth the three
      scores are ``None`` and ``accuracy`` stays 'N/A'.
  """
  easyocr_reader = ocr_instances["reader"]
  paddleocr_reader = ocr_instances["ocr"]
  truth = extract_truth(filename) if debug else {}
  full_path = f"{dir_path}/{filename}"
  if show_img:
    show_image(full_path)

  pred = {
    "filename": filename,
    "voter": "N/A",
    "voter_part": "N/A",
    "date": "N/A",
    "has_voted": False,
    "to_check": True,
    "accuracy": "N/A"
  }

  def _to_datetime(mmdd):
    # DATE_FORMAT strings are month-then-day; rebuild them as DD-MM-YYYY and
    # parse day-first so the day and month cannot be swapped.
    return parse(f"{mmdd[2:4]}-{mmdd[:2]}-{now.strftime('%Y')}", dayfirst=True)

  result = paddleocr_reader.ocr(full_path)
  potential_voter_part = []
  potential_date = []

  for res in result or []:
    if not res:
      # NOTE(review): PaddleOCR appears to yield None for a page without any
      # detected text — confirm against the installed version.
      continue
    # Each line is used as [box_points, (text, score)]; visit best scores first.
    res.sort(key=lambda x: x[1][1], reverse=True)
    for line in res:
        text = line[1][0]
        text = text.strip().replace(" ", "")
        if show_img:
          print(line[1])
        text_date = len(text) > 6 and is_date(text)
        if (text == 'VOTED'):
          pred['has_voted'] = True
        if ('forge' in text and 'game/' in text):
          pred["voter"] = text.split("/")[-1]
        elif (pred['voter_part'] == 'N/A' and len(text) > 1 and text.startswith('#')):
          pred['voter_part'] = text.split("#")[-1]
        elif text_date and 'PM' not in text and 'AM' not in text and ':' not in text:
          pred['date'] = text_date.strftime(DATE_FORMAT)
        elif(pred['voter_part'] == 'N/A' and len(text) > 1 and (text.startswith('1') or text.startswith('4')) and line[0][0][1] < 100):
          # Text near the top (y < 100) starting with 1 or 4 is kept as a
          # fallback voter part with that leading character removed.
          potential_voter_part.append([line[0][0], text[1:]])
        else:
          relaxed_date = is_date(text, relax=True)
          if relaxed_date:
            potential_date.append(relaxed_date.strftime(DATE_FORMAT))

  if pred['date'] == 'N/A':
    pred['date'] = extract_date(easyocr_reader, full_path, show_img)

  if pred['voter_part'] == 'N/A' or (debug and pred['voter_part'] != truth['voter_part']):
    # Prefer the EasyOCR reading; otherwise fall back to the top-most PaddleOCR candidate.
    potential_voter_part.sort(key=lambda x: x[0][1])
    fallback_voter_part = potential_voter_part[0][1] if len(potential_voter_part) > 0 else 'N/A'
    extracted_voter_part = extract_voter_part(easyocr_reader, full_path, show_img)
    print(extracted_voter_part)
    pred['voter_part'] = extracted_voter_part if extracted_voter_part != 'N/A' else fallback_voter_part

  if pred['voter'] == 'N/A' or (debug and pred['voter'] != truth['voter']):
      pred['voter'] = extract_voter(easyocr_reader, full_path, show_img)

  if pred['date'] == 'N/A':
    pred['date'] = potential_date[0] if len(potential_date) > 0 else 'N/A'

  if truth:
    pred["voter_score"] = 1 if truth["voter"] == pred["voter"] else 0
    pred["voter_part_score"] = 1 if truth["voter_part"] == pred["voter_part"] else 0
    dates_known = pred["date"] != 'N/A' and truth["date"] != 'N/A'
    pred["date_score"] = 1 if dates_known and is_date_similar(_to_datetime(truth['date']), _to_datetime(pred['date'])) else 0
    pred["accuracy"] = (pred["voter_score"] + pred["voter_part_score"] + pred["date_score"]) / 3
  else:
    # No ground truth to compare with: leave accuracy at 'N/A' and mark the
    # scores as unknown rather than as wrong (0).
    pred["voter_score"] = None
    pred["voter_part_score"] = None
    pred["date_score"] = None
  pred["to_check"] = is_buggy(pred)

  return pred

In [None]:
from zipfile import ZipFile
from datetime import datetime, timedelta
import os
import pytz
import time

now = datetime.now(pytz.timezone('Asia/Kuala_Lumpur')) - timedelta(days=1)
dir_path = f"upload/PC"
selectors = ["filename", "voter", "voter_part", "date", "has_voted", "to_check", "accuracy"]
is_zip_file_exists = os.path.isfile(f"upload/{now.strftime('%d-%m-%Y')}.zip")

if is_zip_file_exists and not os.path.isdir(f"upload/PC"):
  zip_path = f"upload/{now.strftime('%d-%m-%Y')}.zip"
  !unzip $zip_path -d upload/

with ZipFile(f"upload/{now.strftime('%d-%m-%Y')}.zip", mode="r") as zip:
  ori_files = zip.namelist()
  files  =  [x for x in os.listdir(f"{dir_path}") if x.split('.')[-1] not in ['xlsx', 'txt', 'gsheet', 'csv', 'json', 'ipynb_checkpoints']]
  total_files = len(ori_files)
  detected_files = len(files)
  print(f"Files: {detected_files}/{total_files}")
  while total_files != detected_files:
    time.sleep(10)

In [None]:
# dir_path = f"upload/{now.strftime('%d-%m-%Y')}"
dir_path = "upload/PC"

# Columns (and their order) written to the report CSV.
selectors = ["filename", "voter", "voter_part", "date", "has_voted", "to_check", "accuracy"]

if not os.path.isdir(dir_path):
  print(f"No upload folder: {dir_path}")
  sys.exit(1)

# Running totals for the whole run. `processed_idx` is the index of the last
# processed file, so an interrupted run can resume where it stopped.
metadata = {
    "faulty": 0,
    "processed_idx": -1,
    "voter_N/A": 0,
    "voter_part_N/A": 0,
    "date_N/A": 0,
    "voter_wrong": 0,
    "voter_part_wrong": 0,
    "date_wrong": 0,
    "total_score": 0,
    "accuracy": 0,
    "total_N/A": 0
}

is_metadata_exists = os.path.isfile(f"{dir_path}/metadata.json")

if is_metadata_exists:
  # Context manager so the handle is closed (the previous bare open() leaked it).
  with open(f'{dir_path}/metadata.json') as metadata_file:
    metadata = json.load(metadata_file)

ocr = PaddleOCR(cls=False, det=False, lang='en', use_gpu=use_gpu) # need to run only once to download and load model into memory
reader = easyocr.Reader(
    ["en"],
    gpu=use_gpu,
    quantize=True
)

start_idx = metadata['processed_idx'] + 1

# os.listdir returns entries in arbitrary order; sort them so that resuming by
# index (`processed_idx`) refers to the same files on every run.
files = sorted(x for x in os.listdir(f"{dir_path}") if x.split('.')[-1] not in ['xlsx', 'txt', 'gsheet', 'csv', 'json', 'ipynb_checkpoints'])
end_idx = len(files)


samples = files
sample_range = [0, len(files)] if start_idx - 1 < 0 else [start_idx, end_idx]
samples = samples[sample_range[0]: sample_range[1]]
# Only names containing '_' carry the <date>_<voter>_<voter_part> prefix.
samples = [x for x in samples if '_' in x]
# NOTE(review): hard-coded single-sample override, paired with `debug = True`
# below. Remove both to process the whole folder.
samples = [
   "0805_25543_20504 - Ben Joint.jpg"
]
total_len = len(samples)
test = True
# When True, per-record CSV output and the final report export are skipped.
debug = True


"""""
START
"""""
st = time.time()


for i, img_path in enumerate(samples):
  print(f"{i+start_idx+1}/{total_len + start_idx}: {img_path}")
  record_idx = start_idx + i
  # Decide the write mode before the try block. It used to be assigned only on
  # the non-debug success path, so the except handler hit a NameError whenever
  # the first failure happened before any successful write.
  csv_mode = "a" if record_idx > 0 else 'w'
  write_header = not record_idx > 0
  report_csv = f"{dir_path}/path_forge_report_{now.strftime('%d-%m-%Y')}.csv"
  try:
    # Keyword form of the former positional call: `test` drives ground-truth
    # scoring (debug=) and `debug` drives image / OCR printing (show_img=).
    record = generate_pred({"reader": reader, "ocr": ocr}, dir_path, img_path, debug=test, show_img=debug)

    print("Result: ", record)

    if not debug and record:
      # Build the new totals on a copy so a failure half-way leaves `metadata` untouched.
      updated = dict(metadata)
      updated["processed_idx"] = record_idx
      for field in ("voter", "voter_part", "date"):
        updated[f"{field}_N/A"] += int(record[field] == 'N/A')
        updated[f"{field}_wrong"] += int(record[f"{field}_score"] == 0)
      updated["total_score"] += record['accuracy']
      updated["accuracy"] = updated['total_score'] / (record_idx + 1) * 100
      updated["total_N/A"] = updated["voter_N/A"] + updated["voter_part_N/A"] + updated["date_N/A"]
      metadata = updated

      df = pd.DataFrame.from_dict([record])
      output_df = df[[*selectors]]
      output_df.to_csv(report_csv, mode=csv_mode, header=write_header, index=False)
      # One extra CSV per field collects the records whose score for that field is 0.
      for field in ("voter", "voter_part", "date"):
        wrong_csv = f"{dir_path}/{field}_wrong_{now.strftime('%d-%m-%Y')}.csv"
        df[df[f"{field}_score"] == 0].to_csv(wrong_csv, mode=csv_mode, header=write_header, index=False)

    with open(f"{dir_path}/metadata.json", "w") as json_file:
      json.dump(metadata, json_file)
    print("Metadata", json.dumps(metadata, indent=4))
  except Exception as e:
    # Best effort: record the file as faulty and carry on, but say why it failed.
    print(f"Failed to process {img_path}: {e!r}")
    record = {
      "filename": img_path,
      "voter": "N/A",
      "voter_part": "N/A",
      "date": "N/A",
      "has_voted": False,
      "to_check": True,
      "accuracy": "N/A"
    }
    metadata = {
      **metadata,
      "faulty": metadata["faulty"] + 1,
      "processed_idx": record_idx,
    }
    df = pd.DataFrame.from_dict([record])
    output_df = df[[*selectors]]
    output_df.to_csv(report_csv, mode=csv_mode, header=write_header, index=False)
    with open(f"{dir_path}/metadata.json", "w") as json_file:
      json.dump(metadata, json_file)
    print("Metadata: FAULTY", json.dumps(metadata, indent=4))



if not debug:
  # Re-read the per-record CSV as text and export it as an Excel report whose
  # file name carries the summary figures.
  output_df = pd.read_csv(f"{dir_path}/path_forge_report_{now.strftime('%d-%m-%Y')}.csv", dtype=str)
  inaccurate_df = output_df[output_df["accuracy"].astype(float) < 1]
  # With dtype=str the column holds the text "True"/"False", so comparing with
  # the boolean True (as before) never matched any row.
  failed_df = output_df[output_df["to_check"] == "True"]
  total_rows = len(output_df.index)
  # Guard the division so an empty report does not raise ZeroDivisionError.
  accuracy = math.floor((total_rows - metadata['total_N/A']) / total_rows * 100) if total_rows else 0
  output_df.to_excel(f"{dir_path}/path_forge_report_{now.strftime('%d-%m-%Y')}_{accuracy}%_{metadata['total_N/A']}_{len(inaccurate_df.index)}.xlsx")

"""
END
"""

print(f"Done: {time.time() - st}")



In [None]:
if not debug:
  # Standalone export: reload the day's report CSV as text and write it out as
  # an Excel file named <accuracy>%_<total N/A>_<total wrong>.
  report_csv = f"{dir_path}/path_forge_report_{now.strftime('%d-%m-%Y')}.csv"
  output_df = pd.read_csv(report_csv, dtype=str)

  # Rows whose accuracy is below 1 count as failed.
  below_perfect = output_df["accuracy"].astype(float) < 1
  failed_df = output_df[below_perfect]

  total_rows = len(output_df.index)
  accuracy = math.floor((total_rows - len(failed_df.index)) / total_rows * 100)
  output_df.to_excel(f"{dir_path}/path_forge_report_{now.strftime('%d-%m-%Y')}_{accuracy}%_{metadata['total_N/A']}_{metadata['voter_wrong'] + metadata['voter_part_wrong'] + metadata['date_wrong']}.xlsx")