<a href="https://colab.research.google.com/github/maxim-chn/deep-learning/blob/maxim/src/preprocessor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Table of Contents


*   [Dependencies Management](#dependencies-management)
*   [Project Config Init](#project-config-init)
*   [Connection Setup to Google Cloud Storage](#connection-setup-to-google-cloud-storage)
*   [Custom API Definition](#custom-api-definition)
    *   [Image API](#image-api)
    *   [BucketOfImages API](#bucketofimages-api)
    *   [Task API](#task-api)
*   [Main Execution Path](#main-execution-path)

## Dependencies Management

In [None]:
# install dependencies

In [None]:
# Standard library
from io import BytesIO
import json
import multiprocessing
import os
from time import sleep, time

# Third-party packages
import matplotlib.pyplot as plt
import numpy as np
import requests
from PIL import Image

# Google Cloud / Colab
from google.cloud import storage
from google.colab import auth

[Back to Table of Contents](#table-of-contents)

## Project Config Init
Here we load the project configuration: the locations of the buckets for unpreprocessed and preprocessed images, machine limits such as the number of tasks that run in parallel, and the runtime parameters that select the type of preprocessing (`FLOW`).

In [None]:
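FLOW = "noise_framed" # Possibilities: "classic", "noise_framed"
VERSION = "maxim" # Git ref (branch) from which project.json is fetched
CONFIG_URL = f"https://raw.githubusercontent.com/maxim-chn/deep-learning/{VERSION}/config/project.json"
PREPROCESSED_IMAGES_TARGET = 100000 + 1000 # Stop uploading once this many preprocessed images exist
UNPREPROCESSED_IMAGES_LIMIT = PREPROCESSED_IMAGES_TARGET // 2 # Upper bound on source images to read
START_TIME = time() # Reference point for elapsed-time progress messages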

In [None]:
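# A timeout keeps the cell from hanging indefinitely if GitHub does not respond.
response = requests.get(CONFIG_URL, timeout=30)
if response.status_code != 200:
  raise RuntimeError(f"Failed to download project config: HTTP {response.status_code}")
project_config = response.json()
# Run one preprocessing task per CPU core.
project_config["machine"]["tasks_number"] = multiprocessing.cpu_count()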
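
The rest of the notebook indexes `project_config` with a fixed set of keys. As an optional safeguard (an assumption, not part of the original pipeline), the downloaded config could be checked right after download, before the next cell flattens `buckets.preprocessed`, so a malformed file fails with a clear list of missing keys instead of a `KeyError` inside a task. The key paths below are derived from how this notebook reads the config; `required_keys` and `missing_config_keys` are hypothetical helpers.

```python
from typing import Any, Dict, List

def required_keys(flow: str) -> List[str]:
  """Dotted key paths this notebook reads from project.json (hypothetical helper)."""
  return [
    "id",
    "buckets.unpreprocessed.name",
    "buckets.preprocessed.name",
    f"buckets.preprocessed.{flow}.test.name",
    f"buckets.preprocessed.{flow}.train.name",
    "machine.batch_size",
    "machine.std_output_threshold",
    "model_input.width",
    "model_input.height",
    "model_input.type",
    "model_input.color_channels",
    "model_input.noise_mean",
    "model_input.noise_std",
    "model_input.te_image_to_tr_images",
  ]

def missing_config_keys(config: Dict[str, Any], flow: str) -> List[str]:
  """Return every required dotted path that is absent from `config`."""
  missing = []
  for path in required_keys(flow):
    node: Any = config
    for part in path.split("."):
      if not isinstance(node, dict) or part not in node:
        missing.append(path)
        break
      node = node[part]
  return missing
```

In the notebook this would be used as `assert not missing_config_keys(project_config, FLOW)`; an empty list means every key the pipeline relies on is present.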

In [None]:
BUCKETS = project_config["buckets"]
BUCKETS["preprocessed"] = {
    "name": BUCKETS["preprocessed"]["name"],
    "test": BUCKETS["preprocessed"][FLOW]["test"],
    "train": BUCKETS["preprocessed"][FLOW]["train"]
}
MACHINE = project_config["machine"]
MODEL_INPUT = project_config["model_input"]
PROJECT_ID = project_config["id"]

[Back to Table of Contents](#table-of-contents)

## Connection Setup to Google Cloud Storage
Our goal is to gain access to *Google Cloud Storage*, specifically the project's [Cloud Storage buckets](https://console.cloud.google.com/storage/browser?hl=en&project=confident-trail-426114-e6).

We interact with Cloud Storage through the [google-cloud-storage module](https://cloud.google.com/python/docs/reference/storage/latest); every call goes through an instance of its [Client](https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.client.Client).

In [None]:
auth.authenticate_user()
client = storage.Client(project=PROJECT_ID)
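
To exercise the download and upload helpers without credentials, one option is an in-memory stand-in for the few `google-cloud-storage` calls this notebook makes: `Client.bucket`, `Client.get_bucket`, `Bucket.list_blobs`, `Bucket.blob`, `Blob.upload_from_string` and `Blob.download_as_bytes`. The `Fake*` classes below are hypothetical test doubles, not part of the library. Unlike the real `get_bucket`, which raises `NotFound` for a missing bucket, this sketch creates buckets on demand.

```python
from typing import Dict, Iterator, Optional

class FakeBlob:
  """Hypothetical stand-in for google.cloud.storage.Blob."""
  def __init__(self, bucket: "FakeBucket", name: str):
    self.bucket = bucket
    self.name = name

  def upload_from_string(self, data: bytes, content_type: Optional[str] = None) -> None:
    # content_type is accepted for signature compatibility but not stored.
    self.bucket._objects[self.name] = bytes(data)

  def download_as_bytes(self) -> bytes:
    return self.bucket._objects[self.name]

class FakeBucket:
  """Hypothetical stand-in for google.cloud.storage.Bucket; objects live in a dict."""
  def __init__(self, name: str):
    self.name = name
    self._objects: Dict[str, bytes] = {}

  def blob(self, blob_name: str) -> FakeBlob:
    return FakeBlob(self, blob_name)

  def list_blobs(self) -> Iterator[FakeBlob]:
    # Yield blobs in lexicographic name order.
    for blob_name in sorted(self._objects):
      yield FakeBlob(self, blob_name)

class FakeClient:
  """Hypothetical stand-in for google.cloud.storage.Client."""
  def __init__(self):
    self._buckets: Dict[str, FakeBucket] = {}

  def bucket(self, bucket_name: str) -> FakeBucket:
    return self._buckets.setdefault(bucket_name, FakeBucket(bucket_name))

  def get_bucket(self, bucket_name: str) -> FakeBucket:
    return self.bucket(bucket_name)
```

Passing a `FakeClient()` wherever the notebook passes `client` would let `get_batch_of_images` and `persist_batch_of_images` run offline against a few sample images.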

[Back to Table of Contents](#table-of-contents)

## Custom API Definition
These are utility classes, functions and parameters defined specifically for preprocessing.

### Image API

In [None]:
from typing import List, Optional

class ProcessedImage:
  """Result of preprocessing one source image: zero or more training tiles and an optional test image."""

  def __init__(self, counter: int, tr_data: Optional[List[bytes]] = None, te_data: Optional[bytes] = None):
    self._counter = counter
    self._name = f"img_{counter}"
    # Default to a fresh list per instance instead of a shared mutable default argument.
    self._init_tr_data(tr_data or [])
    self._init_te_data(te_data)

  @property
  def name(self) -> str:
    return self._name

  @property
  def te_data(self) -> Optional[dict]:
    return self._te_data

  @property
  def tr_data(self) -> list:
    return self._tr_data

  def _init_te_data(self, data: Optional[bytes]) -> None:
    # The test image is stored as {"name": "img_<counter>_te", "data": <bytes>}, or None.
    self._te_data = None if data is None else {"name": f"{self.name}_te", "data": data}

  def _init_tr_data(self, data: List[bytes]) -> None:
    # Training tiles are named img_<counter>_tr_0, img_<counter>_tr_1, ...
    self._tr_data = [{"name": f"{self.name}_tr_{i}", "data": tile} for i, tile in enumerate(data)]

class UnprocessedImage:
  images_counted = 0

  def __init__(self, name: str, data: bytes):
    self._data = data
    self._name = name
    self._update_counter()

  @property
  def counter(self) -> int:
    return self._counter

  @property
  def data(self) -> bytes:
    return self._data

  @property
  def name(self) -> str:
    dot_idx = self._name.find('.')
    if dot_idx != -1:
      return self._name[:dot_idx]
    return self._name

  def _update_counter(self):
    self._counter = UnprocessedImage.images_counted
    UnprocessedImage.images_counted += 1
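
Because `UnprocessedImage` objects are created in the main process as blobs are downloaded, each source image gets a unique `counter` before any task runs, and that counter drives both the train/test split in `process_image` and the object names in `ProcessedImage`. The sketch below restates both rules as plain functions; `is_test_image` and `object_names` are hypothetical helpers, and `te_image_to_tr_images` stands for the config value of the same name.

```python
from typing import Dict, List

def is_test_image(counter: int, te_image_to_tr_images: int) -> bool:
  """Split rule used in process_image: counters 0, N, 2N, ... go to the test set."""
  return counter % te_image_to_tr_images == 0

def object_names(counter: int, num_tr_tiles: int, has_te: bool) -> Dict[str, List[str]]:
  """Names ProcessedImage assigns to the outputs of one source image."""
  base = f"img_{counter}"
  return {
    "train": [f"{base}_tr_{i}" for i in range(num_tr_tiles)],
    "test": [f"{base}_te"] if has_te else [],
  }
```

When persisted, each name becomes `<train or test folder>/<name>.png` inside the preprocessed bucket.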

In [None]:
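def convert_bytes_to_image(data_bytes: bytes) -> Image.Image:
  # Decode encoded image bytes (PNG, JPEG, ...) into a PIL image.
  return Image.open(BytesIO(data_bytes))

def convert_image_to_bytes(image: Image.Image) -> bytes:
  # Encode as PNG; the upload helpers store objects with a .png extension.
  result = BytesIO()
  image.save(result, format="PNG")
  return result.getvalue()

def convert_image_to_grayscale(image: Image.Image) -> Image.Image:
  # "L" is Pillow's 8-bit single-channel (grayscale) mode.
  return image.convert("L")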

### BucketOfImages API

In [None]:
def get_batch_of_images(client: storage.Client, num_of_blobs: int) -> list:
  if not hasattr(get_batch_of_images, 'blobs'):
    bucket = client.get_bucket(BUCKETS["unpreprocessed"]["name"])
    get_batch_of_images.blobs = list(bucket.list_blobs())
    get_batch_of_images.num_of_blobs = len(get_batch_of_images.blobs)
    get_batch_of_images.last_read_index = 0
  if get_batch_of_images.last_read_index >= UNPREPROCESSED_IMAGES_LIMIT - 1:
    print(f"Reached unpreprocessed images limit: {UNPREPROCESSED_IMAGES_LIMIT}")
    return []
  result = []
  while num_of_blobs > 0:
    if get_batch_of_images.last_read_index >= get_batch_of_images.num_of_blobs:
      break
    blob = get_batch_of_images.blobs[get_batch_of_images.last_read_index]
    result.append(UnprocessedImage(blob.name, blob.download_as_bytes()))
    get_batch_of_images.last_read_index += 1
    num_of_blobs -= 1
  if get_batch_of_images.last_read_index % MACHINE["std_output_threshold"] == 0:
    print(f"Unpreprocessed image #{get_batch_of_images.last_read_index} was read.")
  return result

def get_batches_of_images(client: storage.Client, num_of_batches: int, num_of_blobs: int) -> list:
  result = []
  while num_of_batches > 0:
    next_batch = get_batch_of_images(client, num_of_blobs)
    if len(next_batch) == 0:
      break
    result.append(next_batch)
    num_of_batches -= 1
  return result

def upload_blob(bucket: storage.Bucket, path: str, data: bytes) -> None:
  blob = bucket.blob(path)
  blob.upload_from_string(data, content_type="image/%s" % MODEL_INPUT["type"])

def persist_processed_image(bucket: storage.Bucket, processed_image: ProcessedImage) -> bool:
  # Function attributes keep the upload counter and the next progress threshold across calls.
  if not hasattr(persist_processed_image, 'uploaded_count'):
    persist_processed_image.uploaded_count = 0
    persist_processed_image.std_threshold = MACHINE["std_output_threshold"]
  bucket_tr_name = BUCKETS["preprocessed"]["train"]["name"]
  for tr_obj in processed_image.tr_data:
    upload_blob(bucket, "%s/%s.png" % (bucket_tr_name, tr_obj["name"]), tr_obj["data"])
    persist_processed_image.uploaded_count += 1
  if persist_processed_image.uploaded_count > persist_processed_image.std_threshold:
    # Elapsed time is now minus start, so the reported minutes are positive.
    elapsed_min = (time() - START_TIME) // 60
    print("Image %s was processed. Time elapsed: %s min" % (persist_processed_image.uploaded_count, elapsed_min))
    persist_processed_image.std_threshold += MACHINE["std_output_threshold"]
  if persist_processed_image.uploaded_count > PREPROCESSED_IMAGES_TARGET:
    print(f"{persist_processed_image.uploaded_count} images were processed. Stopping")
    return False
  if processed_image.te_data is None:
    return True
  bucket_te_name = BUCKETS["preprocessed"]["test"]["name"]
  upload_blob(bucket, "%s/%s.png" % (bucket_te_name, processed_image.te_data["name"]), processed_image.te_data["data"])
  persist_processed_image.uploaded_count += 1
  return True

def persist_batch_of_images(client: storage.Client, processed_batch: list) -> bool:
  # Returns False once the upload target is reached, which stops the main loop.
  bucket = client.bucket(BUCKETS["preprocessed"]["name"])
  for processed_image in processed_batch:
    if not persist_processed_image(bucket, processed_image):
      return False
  return True
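
`get_batch_of_images` keeps its read cursor in function attributes, so its state survives between calls but is hard to reset or test. The same batching can be expressed as a generator over the list of blobs; `iter_rounds` below is a hypothetical helper. It applies the read limit exactly, whereas the notebook checks `UNPREPROCESSED_IMAGES_LIMIT` only at the start of each batch, so the last batch may read slightly past it.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def iter_rounds(items: Sequence[T], tasks_number: int, batch_size: int, limit: int) -> Iterator[List[List[T]]]:
  """Yield rounds of up to `tasks_number` batches with up to `batch_size` items each,
  never reading past the first `limit` items."""
  items = items[:limit]
  round_size = tasks_number * batch_size
  for start in range(0, len(items), round_size):
    chunk = items[start:start + round_size]
    # One round corresponds to one iteration of the main loop: one batch per task.
    yield [list(chunk[i:i + batch_size]) for i in range(0, len(chunk), batch_size)]
```

Each yielded round matches what `get_batches_of_images(client, MACHINE["tasks_number"], MACHINE["batch_size"])` returns, but with blob objects instead of downloaded images.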

### Task API
A `Task` in the context of this Notebook is a piece of logic that performs the following operations on a batch of images:

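*   Resize (test images are shrunk to fit the model input)
*   Crop (training images are cut into model-input-sized tiles)
*   Framing in Gaussian noise (training tiles in the `noise_framed` flow)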
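
The crop step is mostly box arithmetic. The sketch below mirrors `crop_image_horizontally` and `crop_image_vertically` without Pillow, returning crop boxes in PIL's `(left, upper, right, lower)` order; `tile_spans` and `tile_boxes` are hypothetical helpers. With `precise_crop=True` only full tiles are kept; otherwise the shorter remainder at the right and bottom edges is kept too.

```python
from typing import List, Tuple

Box = Tuple[int, int, int, int]  # (left, upper, right, lower), PIL's crop-box order

def tile_spans(length: int, tile: int, precise_crop: bool) -> List[Tuple[int, int]]:
  """Split [0, length) into consecutive spans of size `tile`."""
  if length <= tile:
    return [(0, length)]
  full_tiles = length // tile
  spans = [(i * tile, (i + 1) * tile) for i in range(full_tiles)]
  if not precise_crop and full_tiles * tile < length:
    spans.append((full_tiles * tile, length))
  return spans

def tile_boxes(width: int, height: int, tile_w: int, tile_h: int, precise_crop: bool) -> List[Box]:
  """Row-major crop boxes: the vertical split is the outer loop, as in the notebook."""
  return [(left, upper, right, lower)
          for upper, lower in tile_spans(height, tile_h, precise_crop)
          for left, right in tile_spans(width, tile_w, precise_crop)]
```

For example, with a hypothetical 256 x 256 model input, a 600 x 300 image yields two full tiles in precise mode and six tiles (including partial edges) otherwise.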

The need for `Task` comes from the [Global Interpreter Lock (GIL)](https://docs.python.org/3/glossary.html#term-global-interpreter-lock): in CPython only one thread executes Python bytecode at a time, so threads cannot parallelize CPU-bound work such as decoding, cropping and encoding images. Hence, we create a `Task` for each batch of unpreprocessed images.

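The [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) module runs each task in a separate process with its own interpreter and its own GIL, so the tasks execute in parallel across CPU cores (`MACHINE["tasks_number"]` is set to `multiprocessing.cpu_count()`).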

Our primary limitation is *bottleneck* operations, such as authentication and the client connection to the Google Cloud buckets. We keep them out of `Task`: the main process downloads and uploads all images, and each task only receives image bytes. This avoids debugging complicated issues that are unrelated to the project.
In return, execution is more expensive in RAM (every batch and its results are held in memory) and in time (all network I/O runs sequentially in the main process). For now, this tradeoff raises no objections.
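
The coordination between tasks and the main loop follows a sentinel protocol: each worker puts its results on a shared queue and then a single `None`, and the parent stops reading once it has seen one `None` per worker. Below is a minimal sketch of that pattern under the stated assumptions; `sentinel_worker`, `collect_results` and `run_in_processes` are hypothetical names. The `finally` clause guarantees the sentinel even when processing raises, and the parent blocks on `get()` instead of polling with `sleep()`.

```python
import multiprocessing
from typing import Any, Callable, List, Sequence

def sentinel_worker(output_queue, batch: Sequence[Any], transform: Callable[[Any], Any]) -> None:
  """Put one result per item whose transform is not None, then always one None sentinel.
  `output_queue` is any object with put(), e.g. multiprocessing.Queue or queue.Queue."""
  try:
    for item in batch:
      result = transform(item)
      if result is not None:
        output_queue.put(result)
  finally:
    # Sent even if transform raises, so the consumer never waits forever.
    output_queue.put(None)

def collect_results(output_queue, num_workers: int) -> List[Any]:
  """Block on the queue until every worker has sent its sentinel."""
  results: List[Any] = []
  finished = 0
  while finished < num_workers:
    item = output_queue.get()  # blocking get, no busy-waiting
    if item is None:
      finished += 1
    else:
      results.append(item)
  return results

def run_in_processes(batches: Sequence[Sequence[Any]], transform: Callable[[Any], Any]) -> List[Any]:
  """One process per batch. With the "spawn" start method, transform must be a top-level function."""
  output_queue = multiprocessing.Queue()
  processes = [multiprocessing.Process(target=sentinel_worker, args=(output_queue, batch, transform))
               for batch in batches]
  for process in processes:
    process.start()
  # Drain the queue before join(): a child with unflushed queue items cannot exit.
  results = collect_results(output_queue, len(processes))
  for process in processes:
    process.join()
  return results
```

Note that a worker killed from outside (for example by the out-of-memory killer) never sends its sentinel; this sketch, like the notebook, would then wait indefinitely.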

In [None]:
def crop_image_horizontally(image: Image, precise_crop: bool = False) -> list:
  result = []
  width, height = image.size
  width_ratio = width / MODEL_INPUT["width"]
  if width_ratio <= 1:
    return [image]
  limit = int(width_ratio) if precise_crop else int(width_ratio) + 1
  for i in range(limit):
    left_x = i * MODEL_INPUT["width"]
    if left_x >= width:
      break
    right_x = (i + 1) * MODEL_INPUT["width"]
    if right_x > width:
      right_x = width
    result.append(image.crop((left_x, 0, right_x, height)))
  return result

def crop_image_vertically(image: Image, precise_crop: bool = False) -> list:
  result = []
  width, height = image.size
  height_ratio = height / MODEL_INPUT["height"]
  if height_ratio <= 1:
    return [image]
  limit = int(height_ratio) if precise_crop else int(height_ratio) + 1
  for i in range(limit):
    # PIL's origin is the top-left corner (y grows downward); crop boxes are (left, upper, right, lower).
    upper_y = i * MODEL_INPUT["height"]
    if upper_y >= height:
      break
    lower_y = min((i + 1) * MODEL_INPUT["height"], height)
    result.append(image.crop((0, upper_y, width, lower_y)))
  return result

def process_image_classic(image: Image, image_counter: int, is_tr: bool) -> ProcessedImage:
  width, height = image.size
  # Images smaller than the model input in either dimension are skipped (returns None).
  if width < MODEL_INPUT["width"] or height < MODEL_INPUT["height"]:
    return None
  if not is_tr:
    # Test images: shrink in place, preserving aspect ratio, to fit within the model input.
    if width > MODEL_INPUT["width"] or height > MODEL_INPUT["height"]:
      image.thumbnail((MODEL_INPUT["width"], MODEL_INPUT["height"]))
    return ProcessedImage(image_counter, te_data=convert_image_to_bytes(image))
  # Training images: cut into full model-input-sized tiles (partial edge tiles are dropped).
  # An image that already matches the input size yields a single tile.
  cropped_images = []
  for vertically_cropped_image in crop_image_vertically(image, True):
    cropped_images.extend(crop_image_horizontally(vertically_cropped_image, True))
  return ProcessedImage(image_counter, tr_data=[convert_image_to_bytes(cropped_image) for cropped_image in cropped_images])

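def frame_image_in_noise(image: Image) -> Image:
  width, height = image.size
  noise = np.random.normal(loc=MODEL_INPUT["noise_mean"], scale=MODEL_INPUT["noise_std"], size=(MODEL_INPUT["height"], MODEL_INPUT["width"], MODEL_INPUT["color_channels"]))
  # Clip to the valid pixel range before casting; astype(np.uint8) does not clamp out-of-range values.
  noise = np.clip(noise, 0, 255).astype(np.uint8)
  # Squeeze a single-channel (H, W, 1) array to (H, W) so Image.fromarray yields a mode "L" image.
  framed_image = Image.fromarray(noise.squeeze(axis=2) if noise.shape[2] == 1 else noise)
  # Center the image on the noise canvas; paste() converts it to the canvas mode if the modes differ.
  top_left_x = (MODEL_INPUT["width"] - width) // 2
  top_left_y = (MODEL_INPUT["height"] - height) // 2
  framed_image.paste(image, (top_left_x, top_left_y))
  return framed_image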

def process_image_noise_framed(image: Image, image_counter: int) -> ProcessedImage:
  width, height = image.size
  # Frame directly only if the image fits in both dimensions; otherwise tile it first.
  if width <= MODEL_INPUT["width"] and height <= MODEL_INPUT["height"]:
    framed_image = frame_image_in_noise(image)
    return ProcessedImage(image_counter, tr_data=[convert_image_to_bytes(framed_image)])
  cropped_images = []
  for vertically_cropped_image in crop_image_vertically(image):
    for cropped_image in crop_image_horizontally(vertically_cropped_image):
      cropped_images.append(cropped_image)
  framed_images = [frame_image_in_noise(cropped_image) for cropped_image in cropped_images]
  return ProcessedImage(image_counter, tr_data=[convert_image_to_bytes(framed_image) for framed_image in framed_images])

def process_image(unprocessed_image: UnprocessedImage) -> ProcessedImage:
  image = convert_bytes_to_image(unprocessed_image.data)
  # Every te_image_to_tr_images-th image (counter 0, N, 2N, ...) is held out for testing.
  is_tr = unprocessed_image.counter % MODEL_INPUT["te_image_to_tr_images"] != 0
  if FLOW == "classic":
    return process_image_classic(image, unprocessed_image.counter, is_tr)
  if FLOW == "noise_framed":
    # Training images are tiled and framed in noise; test images follow the classic path.
    if is_tr:
      return process_image_noise_framed(image, unprocessed_image.counter)
    return process_image_classic(image, unprocessed_image.counter, is_tr)
  raise ValueError(f"Unknown FLOW: {FLOW}")

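def preprocessing_task(output_queue: multiprocessing.Queue, batch_of_images: "list | None" = None) -> None:
  # Runs in a child process: puts one ProcessedImage per successfully processed image,
  # then a single None sentinel so the main loop knows this task has finished.
  if output_queue is None:
    return
  try:
    for unprocessed_image in batch_of_images or []:
      processed_image = process_image(unprocessed_image)
      if processed_image is not None:
        output_queue.put(processed_image)
  finally:
    # Send the sentinel even if processing raises; otherwise the main loop would wait forever.
    output_queue.put(None)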
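
The noise framing can also be expressed directly on NumPy arrays, which makes the centering arithmetic and the clipping step easy to check in isolation. This is a sketch under assumptions: `frame_in_noise` is a hypothetical helper that takes an `(H, W, C)` `uint8` array and, unlike `frame_image_in_noise`, draws from an explicit `np.random.Generator` so results are reproducible.

```python
import numpy as np

def frame_in_noise(tile: np.ndarray, out_h: int, out_w: int,
                   mean: float, std: float, rng: np.random.Generator) -> np.ndarray:
  """Center `tile` (H x W x C, uint8) on an out_h x out_w canvas of Gaussian noise."""
  h, w, c = tile.shape
  if h > out_h or w > out_w:
    raise ValueError("tile is larger than the output canvas")
  noise = rng.normal(loc=mean, scale=std, size=(out_h, out_w, c))
  # Clip before casting so values outside [0, 255] saturate instead of being mangled.
  canvas = np.clip(noise, 0, 255).astype(np.uint8)
  # Same centering offsets as frame_image_in_noise: (canvas - tile) // 2 on each axis.
  top = (out_h - h) // 2
  left = (out_w - w) // 2
  canvas[top:top + h, left:left + w] = tile
  return canvas
```

For a 2 x 4 tile on a 6 x 8 canvas, the offsets are `top = 2` and `left = 2`, so the tile occupies rows 2 to 3 and columns 2 to 5.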

[Back to Table of Contents](#table-of-contents)

## Main Execution Path
Here we download batches of images, run the preprocessing tasks in parallel, and upload their results.

In [None]:
while True:
  queue = multiprocessing.Queue()
  processed_batch = []
  # Download up to tasks_number batches in the main process; tasks only receive image bytes.
  images_batches = get_batches_of_images(client, MACHINE["tasks_number"], MACHINE["batch_size"])
  if len(images_batches) == 0:
    print("No more images to preprocess")
    break
  processes = [multiprocessing.Process(target=preprocessing_task, args=(queue, batch)) for batch in images_batches]
  for process in processes:
    process.start()
  # Each task ends with a None sentinel; block on the queue until every task has sent one.
  processes_outputs_left = len(processes)
  while processes_outputs_left > 0:
    process_output = queue.get()
    if process_output is None:
      processes_outputs_left -= 1
    elif isinstance(process_output, ProcessedImage):
      processed_batch.append(process_output)
    else:
      raise RuntimeError("Unexpected type for output of independent process: %s" % type(process_output))
  # The queue is fully drained, so the children can exit; join() waits for them.
  for process in processes:
    process.join()
  if not persist_batch_of_images(client, processed_batch):
    print("No more processed images to persist")
    break

Unpreprocessed image #100 was read.
Image 101 was processed. Time elapsed: -3.0 min
Image 205 was processed. Time elapsed: -4.0 min
Image 317 was processed. Time elapsed: -4.0 min
Image 409 was processed. Time elapsed: -4.0 min
Image 502 was processed. Time elapsed: -5.0 min
Image 606 was processed. Time elapsed: -5.0 min
Image 706 was processed. Time elapsed: -6.0 min
Image 818 was processed. Time elapsed: -6.0 min
Image 903 was processed. Time elapsed: -6.0 min
Image 1008 was processed. Time elapsed: -7.0 min
Image 1110 was processed. Time elapsed: -7.0 min
Image 1202 was processed. Time elapsed: -8.0 min
Image 1308 was processed. Time elapsed: -8.0 min
Image 1407 was processed. Time elapsed: -8.0 min
Image 1517 was processed. Time elapsed: -9.0 min
Image 1606 was processed. Time elapsed: -9.0 min
Image 1711 was processed. Time elapsed: -10.0 min
Image 1811 was processed. Time elapsed: -10.0 min
Unpreprocessed image #200 was read.
Image 1908 was processed. Time elapsed: -12.0 min

[Back to Table of Contents](#table-of-contents)