In [1]:
# ! pip install anvil-uplink

In [None]:
import os
import json
import cv2
from uuid import uuid4
from ultralytics import YOLO
import anvil.server
import anvil.media

import tensorflow as tf
import keras
from keras import ops
from keras import layers
from pathlib import Path
import numpy as np

In [None]:
# Root folder under which each request gets its own working sub-directory.
temp_dir = 'temp'
Path(temp_dir).mkdir(parents=True, exist_ok=True)

In [None]:
def ctc_batch_cost(y_true, y_pred, input_length, label_length):
    """Compute the CTC loss for every sample of a batch.

    Args:
        y_true: Dense label tensor; converted to a sparse tensor with
            `ctc_label_dense_to_sparse` before the loss is evaluated.
        y_pred: Prediction tensor. Its first two axes are swapped and the log
            of the values is taken before it reaches the TensorFlow CTC loss.
        input_length: Per-sample prediction lengths with a trailing axis of
            size 1 (that axis is squeezed away here).
        label_length: Per-sample label lengths with a trailing axis of size 1
            (that axis is squeezed away here).

    Returns:
        The loss returned by `tf.compat.v1.nn.ctc_loss` with an extra axis
        inserted at position 1.
    """
    # Drop the trailing axis and make both length tensors int32.
    input_lengths = ops.cast(ops.squeeze(input_length, axis=-1), dtype="int32")
    label_lengths = ops.cast(ops.squeeze(label_length, axis=-1), dtype="int32")

    labels_sparse = ctc_label_dense_to_sparse(y_true, label_lengths)
    labels_sparse = ops.cast(labels_sparse, dtype="int32")

    # Adding epsilon keeps the log finite where a predicted value is zero.
    transposed = ops.transpose(y_pred, axes=[1, 0, 2])
    log_inputs = ops.log(transposed + keras.backend.epsilon())

    per_sample_loss = tf.compat.v1.nn.ctc_loss(
        inputs=log_inputs, labels=labels_sparse, sequence_length=input_lengths
    )
    return ops.expand_dims(per_sample_loss, 1)


def ctc_label_dense_to_sparse(labels, label_lengths):
    """Convert a dense label tensor into a `tf.SparseTensor`.

    For every row of `labels`, only the positions whose index is smaller than
    the matching entry of `label_lengths` are kept.

    Args:
        labels: Label tensor whose shape is read as two dimensions below
            (presumably `(batch, max_label_length)` — TODO confirm).
        label_lengths: One length per row of `labels`.

    Returns:
        A `tf.SparseTensor` with int64 indices, values gathered from `labels`,
        and a dense shape equal to the (int64) shape of `labels`.
    """
    label_shape = ops.shape(labels)
    # One-element tensors holding the two dimensions, used as tile/fill shapes.
    num_batches_tns = ops.stack([label_shape[0]])
    max_num_labels_tns = ops.stack([label_shape[1]])

    def range_less_than(old_input, current_input):
        """Return a boolean row that is True where the position < `current_input`."""
        # `old_input` (the scan accumulator) is only used for its width.
        return ops.expand_dims(ops.arange(ops.shape(old_input)[1]), 0) < tf.fill(
            max_num_labels_tns, current_input
        )

    # All-False initial accumulator with one row and one column per label slot.
    init = ops.cast(tf.fill([1, label_shape[1]], 0), dtype="bool")
    # Build one mask row per entry of `label_lengths`.
    dense_mask = tf.compat.v1.scan(
        range_less_than, label_lengths, initializer=init, parallel_iterations=1
    )
    # Drop the size-1 middle axis introduced by the accumulator's shape.
    dense_mask = dense_mask[:, 0, :]

    # Column index of every position, laid out with the same shape as `labels`.
    label_array = ops.reshape(
        ops.tile(ops.arange(0, label_shape[1]), num_batches_tns), label_shape
    )
    label_ind = tf.compat.v1.boolean_mask(label_array, dense_mask)

    # Row index of every position: built with the reversed shape, then transposed.
    batch_array = ops.transpose(
        ops.reshape(
            ops.tile(ops.arange(0, label_shape[0]), max_num_labels_tns),
            tf.reverse(label_shape, [0]),
        )
    )
    batch_ind = tf.compat.v1.boolean_mask(batch_array, dense_mask)
    # Pair up the kept (row, column) indices, one pair per kept label.
    indices = ops.transpose(
        ops.reshape(ops.concatenate([batch_ind, label_ind], axis=0), [2, -1])
    )

    # Label values located at the kept indices.
    vals_sparse = tf.compat.v1.gather_nd(labels, indices)

    return tf.SparseTensor(
        ops.cast(indices, dtype="int64"),
        vals_sparse,
        ops.cast(label_shape, dtype="int64"),
    )


class CTCLayer(layers.Layer):
    """Layer that registers the CTC loss on the model and passes predictions through.

    The layer has no weights: `call` computes the CTC loss between the labels
    and the predictions, attaches it with `add_loss`, and returns the
    predictions unchanged.
    """

    def __init__(self, name=None, **kwargs):
        """Create the layer.

        Args:
            name: Optional layer name.
            **kwargs: Remaining base-layer arguments (for example the
                `trainable` and `dtype` entries Keras passes back when the
                layer is rebuilt from a saved config).
        """
        # Forward the extra arguments to the base class instead of silently
        # discarding them, so a layer restored by `load_model` keeps the
        # settings it was saved with.
        super().__init__(name=name, **kwargs)
        self.loss_fn = ctc_batch_cost

    def call(self, y_true, y_pred):
        """Add the CTC loss for this batch and return `y_pred` unchanged.

        Args:
            y_true: Label tensor; axis 0 is used as the batch size and axis 1
                as the label length.
            y_pred: Prediction tensor; axis 1 is used as the input length.

        Returns:
            `y_pred`, untouched.
        """
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        batch_len = ops.cast(ops.shape(y_true)[0], dtype="int64")
        input_length = ops.cast(ops.shape(y_pred)[1], dtype="int64")
        label_length = ops.cast(ops.shape(y_true)[1], dtype="int64")

        # Every sample is given the same (full) input and label length,
        # as a column of shape (batch, 1).
        input_length = input_length * ops.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * ops.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred

# Width and height passed to the resize step applied to each row image
# before it is given to the OCR model.
img_width, img_height = 500, 50

# Provisional cap on the decoded text length; it is reassigned further down
# from the longest label found in the dataset.
max_length = 10

def ctc_decode(y_pred, input_length, greedy=True, beam_width=100, top_paths=1):
    """Decode network predictions with TensorFlow's CTC decoders.

    Args:
        y_pred: Prediction tensor; its first two axes are swapped and the log
            of the values is taken before decoding.
        input_length: Per-sample sequence lengths (cast to int32).
        greedy: Use `tf.nn.ctc_greedy_decoder` when True, otherwise
            `tf.compat.v1.nn.ctc_beam_search_decoder`.
        beam_width: Beam width, only used by the beam search decoder.
        top_paths: Number of paths requested, only used by the beam search
            decoder.

    Returns:
        A tuple `(decoded_dense, log_prob)`: a list with one dense tensor per
        decoded path (unused positions filled with -1) and the second value
        returned by the decoder.
    """
    shape = ops.shape(y_pred)
    batch_size = shape[0]
    step_count = shape[1]

    # Adding epsilon keeps the log finite where a predicted value is zero.
    log_inputs = ops.log(
        ops.transpose(y_pred, axes=[1, 0, 2]) + keras.backend.epsilon()
    )
    sequence_lengths = ops.cast(input_length, dtype="int32")

    if not greedy:
        decoded, log_prob = tf.compat.v1.nn.ctc_beam_search_decoder(
            inputs=log_inputs,
            sequence_length=sequence_lengths,
            beam_width=beam_width,
            top_paths=top_paths,
        )
    else:
        decoded, log_prob = tf.nn.ctc_greedy_decoder(
            inputs=log_inputs, sequence_length=sequence_lengths
        )

    # Re-wrap each sparse result with a (samples, steps) dense shape and
    # densify it, padding with -1.
    dense_shape = (batch_size, step_count)
    decoded_dense = [
        tf.sparse.to_dense(
            sp_input=tf.SparseTensor(path.indices, path.values, dense_shape),
            default_value=-1,
        )
        for path in decoded
    ]
    return (decoded_dense, log_prob)

def decode_batch_predictions(pred):
    """Turn a batch of network outputs into a list of decoded strings.

    Args:
        pred: Batch of predictions; `pred.shape[0]` is used as the number of
            samples and `pred.shape[1]` as the sequence length of each one.

    Returns:
        A list with one UTF-8 decoded string per sample.
    """
    # Every sample is decoded over the full number of prediction steps.
    sequence_lengths = np.ones(pred.shape[0]) * pred.shape[1]

    # Use greedy search. For complex tasks, you can use beam search
    decoded_paths, _ = ctc_decode(pred, input_length=sequence_lengths, greedy=True)
    label_ids = decoded_paths[0][:, :max_length]

    # Map each row of ids back to characters and join them into one string.
    return [
        tf.strings.reduce_join(num_to_char(ids)).numpy().decode("utf-8")
        for ids in label_ids
    ]



# Dataset folder; only its file names are read here, to rebuild the
# character vocabulary used by the OCR model's lookup layers.
data_dir = Path('../data/processed/sudoku_ocr_dataset/')

# Sorted paths (as strings) of every .jpg file in the dataset folder.
images = sorted(str(image_path) for image_path in data_dir.glob("*.jpg"))

# A label is the file name up to ".jpg", cut at the first underscore.
labels = [
    img.rsplit(os.path.sep, 1)[-1].partition(".jpg")[0].partition('_')[0]
    for img in images
]

# Sorted list of the distinct characters that occur in the labels.
characters = sorted({char for label in labels for char in label})

# Longest label length; replaces the provisional value set earlier.
max_length = max(len(label) for label in labels)

# Mapping characters to integers
char_to_num = layers.StringLookup(vocabulary=list(characters), mask_token=None)

# Mapping integers back to original characters
num_to_char = layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    mask_token=None,
    invert=True,
)

In [None]:
# Resolve both trained models from the same relative folder instead of a
# machine-specific absolute path, so the notebook runs from any checkout.
# NOTE(review): assumes the YOLO weights sit under ../models next to
# sudoku_ocr.keras, as the previous absolute path
# (...\sudoku-solver\models\localization_yolo_model\weights\best.pt) suggests — confirm.
models_dir = Path('../models')

# Detector used to find the Sudoku grid in an uploaded picture.
localization_model = YOLO(
    str(models_dir / 'localization_yolo_model' / 'weights' / 'best.pt')
)

# Full training model; CTCLayer must be supplied so Keras can rebuild it.
model = keras.models.load_model(
    str(models_dir / 'sudoku_ocr.keras'), custom_objects={'CTCLayer': CTCLayer}
)

# Inference model: first model input -> output of the "dense2" layer,
# leaving out the label input and the CTC loss layer.
prediction_model = keras.models.Model(
    model.input[0], model.get_layer(name="dense2").output
)

In [None]:
# The uplink key is a credential, so it is read from the environment rather
# than being stored in the notebook.
# NOTE(review): the key that was previously committed in this cell should be
# rotated in the Anvil app settings.
anvil_uplink_key = os.environ.get("ANVIL_UPLINK_KEY")
if not anvil_uplink_key:
    raise RuntimeError(
        "Set the ANVIL_UPLINK_KEY environment variable to the Anvil uplink key "
        "before running this cell."
    )
anvil.server.connect(anvil_uplink_key)

In [None]:
def _safe_upload_name(name):
    """Return a plain file name (no directory part) for a client-supplied name."""
    base_name = os.path.basename(name or '')
    # Empty, "." and ".." cannot be used as a file name inside the request folder.
    if base_name in ('', '.', '..'):
        return 'upload.jpg'
    return base_name


def _load_row_for_ocr(file_path):
    """Read a row image from disk and shape it as a one-sample batch for the OCR model."""
    img = tf.io.read_file(file_path)
    # NOTE(review): the row files are written as .jpg but decoded with
    # decode_png, as in the original pipeline; the TensorFlow docs state this
    # op also decodes JPEG data — confirm it matches the training preprocessing.
    img = tf.io.decode_png(img, channels=1)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = ops.image.resize(img, [img_height, img_width])
    # Swap the first two axes of the resized image.
    img = ops.transpose(img, axes=[1, 0, 2])
    # Add a leading batch axis of size 1.
    return tf.expand_dims(img, 0)


@anvil.server.callable
def download_image(file):
    """Locate the Sudoku grid in an uploaded image and read each of its nine rows.

    The upload is saved in a fresh per-request folder under `temp_dir`, the
    first box reported by the localization model is cropped out, the crop is
    cut into nine horizontal strips, and each strip is run through the OCR
    prediction model.

    Args:
        file: Uploaded media object; its `name` attribute and `get_bytes()`
            method are used.

    Returns:
        A dict holding, for every row number i from 1 to 9, `image_i` (media
        object built from the saved row image) and `result_i` (decoded text).

    Raises:
        IndexError: If the localization model reports no detection.
    """
    request_dir = os.path.join(temp_dir, str(uuid4()))
    os.makedirs(request_dir, exist_ok=True)

    # The name comes from the client: keep only its final path component so
    # the upload cannot be written outside the request folder.
    image_path = os.path.join(request_dir, _safe_upload_name(file.name))
    with open(image_path, 'wb') as fh:
        fh.write(file.get_bytes())

    detections = json.loads(localization_model(image_path)[0].to_json())
    if not detections:
        # Same exception type as indexing the empty list, with an explanation.
        raise IndexError('No Sudoku grid was detected in the uploaded image')
    bbox_dict = detections[0]['box']

    # Crop the picture to the first detected box.
    image = cv2.imread(image_path)
    image = image[
        int(bbox_dict['y1']): int(bbox_dict['y2']),
        int(bbox_dict['x1']): int(bbox_dict['x2']),
    ]

    # Height of one of the nine horizontal strips (fraction truncated).
    row_height = int(image.shape[0] / 9)

    response = {}
    for idx in range(9):
        file_path = os.path.join(request_dir, f'image_{idx}.jpg')
        row_image = image[idx * row_height: (idx + 1) * row_height, :]
        cv2.imwrite(file_path, row_image)
        response[f'image_{idx+1}'] = anvil.media.from_file(file_path)

        preds = prediction_model.predict(_load_row_for_ocr(file_path))
        pred_texts = decode_batch_predictions(preds)
        response[f'result_{idx+1}'] = pred_texts[0]

    return response

In [8]:
# anvil.server.wait_forever()