This notebook does mostly the same as `07-pipeline`, but uses some tricks to speed things up, such as multiprocessing, and processes the data in chunks so that memory does not overflow.

In [1]:
import pandas as pd
import numpy as np
import cv2 as cv
import sqlalchemy
from sqlalchemy import create_engine, text
import ipyparallel as ipp

import os
from enum import Enum
from glob import glob
import logging
from typing import List
from itertools import combinations, chain, compress
import time

In [2]:
# Log INFO and above to a file; each record is prefixed with a timestamp and a padded level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    filename="08-pipeline-multiprocessing.log",
)

In [3]:
# SQLAlchemy URL of the local MySQL `bukan` database; utf8mb4 is full 4-byte UTF-8 (the data contains Japanese text).
engine_string = (
    "mysql://bukanuser@localhost/bukan"
    "?charset=utf8mb4"
)

In [4]:
def run_sql_query(query: str, params: dict = None):
    """Execute *query* on a fresh engine and return all result rows.

    A new engine is created for every call and is always disposed afterwards,
    even if the query raises.

    Args:
        query: SQL text; may contain ``:name`` placeholders.
        params: optional mapping of values for those placeholders. Prefer this
            over formatting values into *query* yourself.

    Returns:
        The rows returned by ``fetchall()``.
    """
    # `convert_unicode` is deprecated since SQLAlchemy 1.3 and removed in later
    # releases; the other create_engine calls in this notebook do not pass it either.
    engine = create_engine(engine_string)
    try:
        with engine.connect() as conn:
            statement = text(query)
            if params is None:
                result = conn.execute(statement)
            else:
                result = conn.execute(statement, params)
            return result.fetchall()
    finally:
        engine.dispose()

In [5]:
def log_progress(sequence, every=None, size=None, name='Items'):
    """Yield the items of *sequence* while showing an ipywidgets progress bar.

    From <https://github.com/kuk/log-progress>

    Args:
        sequence: iterable whose items are passed through unchanged.
        every: refresh the label every *every* items. If omitted and the length
            is known, it is derived from the size (about every 0.5%). It must be
            given when the length is unknown.
        size: total number of items; taken from ``len(sequence)`` when omitted.
        name: text shown in front of the counter.

    Yields:
        The items of *sequence*, in order.
    """
    from ipywidgets import IntProgress, HTML, VBox
    from IPython.display import display

    is_iterator = False
    if size is None:
        try:
            size = len(sequence)
        except TypeError:
            # No len(): treat the input as an iterator of unknown length.
            is_iterator = True
    if size is not None:
        if every is None:
            if size <= 200:
                every = 1
            else:
                every = int(size / 200)     # every 0.5%
    else:
        assert every is not None, 'sequence is iterator, set every'

    if is_iterator:
        # Unknown length: show a full "info" bar and only update the label text.
        progress = IntProgress(min=0, max=1, value=1)
        progress.bar_style = 'info'
    else:
        progress = IntProgress(min=0, max=size, value=0)
    label = HTML()
    box = VBox(children=[label, progress])
    display(box)

    index = 0
    try:
        for index, record in enumerate(sequence, 1):
            # Refresh on the first item and then every `every` items.
            if index == 1 or index % every == 0:
                if is_iterator:
                    label.value = '{name}: {index} / ?'.format(
                        name=name,
                        index=index
                    )
                else:
                    progress.value = index
                    label.value = u'{name}: {index} / {size}'.format(
                        name=name,
                        index=index,
                        size=size
                    )
            yield record
    except:
        # Bare except: this also catches GeneratorExit / KeyboardInterrupt
        # (e.g. the consumer stops iterating early) and marks the bar red;
        # the exception is always re-raised.
        progress.bar_style = 'danger'
        raise
    else:
        # Sequence exhausted normally: mark the bar green and show the final count.
        progress.bar_style = 'success'
        progress.value = index
        label.value = "{name}: {index}".format(
            name=name,
            index=str(index or '?')
        )

In [6]:
# Per-book metadata; the first CSV column (the book id) becomes the index.
overview = pd.read_csv(filepath_or_buffer="bukan-overview-final.csv", index_col=0)

## Processing Images

What I need to do now per image is:

1. Read, greyscale and crop all images
2. Split right/left page if necessary
3. Extract features

In [7]:
class Page(Enum):
    """Which part of a scanned image a stored page image is.

    Japanese reading order is from right to left, hence ``right`` (1) is
    numbered before ``left`` (2).
    """
    whole = 0  # the image holds a single page
    right = 1  # right half of a two-page spread
    left = 2  # left half of a two-page spread

In [8]:
def crop_image(img, target_height=660, target_width=990):
    """Return the centred ``target_height`` x ``target_width`` region of *img*.

    Args:
        img: 2-D (greyscale) image array.
        target_height: crop height in pixels (default 660).
        target_width: crop width in pixels (default 990).

    Returns:
        A view of *img* with shape ``(target_height, target_width)``.

    Raises:
        ValueError: if *img* is smaller than the requested crop in either
            dimension. Without this check the negative offsets would wrap
            around in the slice and silently return a wrongly-sized image.
    """
    height, width = img.shape
    if height < target_height or width < target_width:
        raise ValueError(
            f"Image of size {height}x{width} is smaller than the crop "
            f"{target_height}x{target_width}"
        )
    x1 = (width - target_width) // 2
    y1 = (height - target_height) // 2
    return img[y1:y1 + target_height, x1:x1 + target_width]

In [9]:
def read_image(path):
    """Read *path* as a greyscale image reduced 4x and centre-crop it.

    Raises:
        OSError: if OpenCV cannot read the file. ``cv.imread`` returns
            ``None`` instead of raising, which would otherwise surface later
            as an unrelated AttributeError in ``crop_image``.
    """
    img = cv.imread(path, flags=cv.IMREAD_REDUCED_GRAYSCALE_4)
    if img is None:
        raise OSError(f"Could not read image: {path}")
    return crop_image(img)

In [10]:
def split_image(img):
    """Split a cropped two-page spread down the middle column.

    Returns:
        ``(left_half, right_half)`` as views of *img*.
    """
    _, width = img.shape  # unpacking also requires a 2-D image
    assert width == 990
    middle = width // 2
    left_half = img[:, :middle]
    right_half = img[:, middle:]
    return left_half, right_half

In [11]:
def extract_page_nr_from_path(path):
    """Parse the page number from the 5 characters before a 4-character extension.

    e.g. ``".../100249476_00012.jpg"`` -> ``12``.
    """
    without_extension = path[:-4]
    return int(without_extension[-5:])

In [12]:
def write_image(image: np.ndarray, book_id: int, page_nr: int, page_enum: Page):
    """Save *image* as an optimized JPEG (quality 80) under ``output/images/<book_id>/``.

    The file name is ``<book_id>_<page_nr zero-padded to 5>_<page_enum.value>.jpg``.

    Raises:
        OSError: if ``cv.imwrite`` reports failure. The write is deliberately
            not wrapped in an ``assert``, because assertions (and thus the
            write itself) are stripped when Python runs with ``-O``.
    """
    path = f"output/images/{book_id}/{book_id}_{page_nr:0>5}_{page_enum.value}.jpg"
    written = cv.imwrite(path, image, [cv.IMWRITE_JPEG_QUALITY, 80, cv.IMWRITE_JPEG_OPTIMIZE, True])
    if not written:
        raise OSError(f"Could not write image: {path}")
    logging.info(f"Image written: {path}")

In [13]:
def descriptors_to_dataframe(descriptors: np.ndarray, book_id: int, page_nr: int, page_enum: Page):
    """Tabulate a descriptor matrix, one row per feature.

    Rows get a ``(book, page, lr, feature)`` MultiIndex. The first three
    levels are constant for the page (``lr`` is ``page_enum.value``), and
    ``feature`` is the row position in *descriptors*.
    """
    frame = pd.DataFrame(descriptors)
    level_values = [[book_id], [page_nr], [page_enum.value], frame.index]
    level_names = ["book", "page", "lr", "feature"]
    frame.index = pd.MultiIndex.from_product(level_values, names=level_names)
    return frame

In [14]:
def keypoints_to_dataframe(keypoints: List[cv.KeyPoint], book_id: int, page_nr: int, page_enum: Page):
    """Tabulate OpenCV keypoints, one row per keypoint.

    Columns hold the keypoint attributes. The index is a
    ``(book, page, lr, feature)`` MultiIndex, where ``feature`` is the
    keypoint's position in *keypoints*.
    """
    column_names = ["x", "y", "size", "angle", "response", "octave", "class_id"]
    rows = []
    for kp in keypoints:
        x, y = kp.pt
        rows.append((x, y, kp.size, kp.angle, kp.response, kp.octave, kp.class_id))
    frame = pd.DataFrame(rows, columns=column_names)
    frame.index = pd.MultiIndex.from_product(
        [[book_id], [page_nr], [page_enum.value], frame.index],
        names=["book", "page", "lr", "feature"],
    )
    return frame

In [15]:
def detect_features(image: np.ndarray, book_id: int, page_nr: int, page_enum: Page,
                    engine: sqlalchemy.engine.Engine, detector: cv.Feature2D):
    """Detect features in *image* and append them to the database.

    Descriptors go to the ``descriptor`` table and keypoints to the
    ``keypoint`` table. If the detector finds nothing, only a warning is
    logged and nothing is written.
    """
    keypoints, descriptors = detector.detectAndCompute(image, None)
    label = f"{book_id}/{page_nr}/{page_enum.name}"
    if descriptors is None:
        logging.warning(f"No features detected for: {label}")
        return
    descriptor_df = descriptors_to_dataframe(descriptors, book_id, page_nr, page_enum)
    descriptor_df.to_sql("descriptor", engine, if_exists="append")
    logging.info(f"Descriptors written to database for: {label}")
    keypoint_df = keypoints_to_dataframe(keypoints, book_id, page_nr, page_enum)
    keypoint_df.to_sql("keypoint", engine, if_exists="append")
    logging.info(f"Keypoints written to database for: {label}")

In [16]:
def process_path(path, book_id: int, nr_pages_per_image: int, engine: sqlalchemy.engine.Engine,
                 detector: cv.Feature2D):
    """Preprocess one scanned image: save its page image(s) and store their features.

    Single-page images are kept whole. Two-page spreads are split and handled
    right page first (Japanese reading order). Any other page count is logged
    and skipped.
    """
    page_nr = extract_page_nr_from_path(path)
    image = read_image(path)
    if nr_pages_per_image == 1:
        parts = [(image, Page.whole)]
    elif nr_pages_per_image == 2:
        left_half, right_half = split_image(image)
        parts = [(right_half, Page.right), (left_half, Page.left)]
    else:
        logging.warning(f"Strange number of pages per image for {path}: {nr_pages_per_image} (Skipping)")
        return
    for part_image, part in parts:
        write_image(part_image, book_id, page_nr, part)
        detect_features(part_image, book_id, page_nr, part, engine, detector)

In [17]:
def save_preprocessed_images_and_features(overview_df: pd.DataFrame, engine: sqlalchemy.engine.Engine,
                                          detector: cv.Feature2D):
    """Preprocess every image of every book listed in *overview_df*.

    Each row of *overview_df* is one book, indexed by book id. Its images
    ``data/<book_id>/image/*.jpg`` are handled in sorted order by
    ``process_path``. Page images are written below
    ``output/images/<book_id>/`` and features go to the database.

    Raises:
        ValueError: if the number of image files differs from ``NrImages``.
        Any exception is logged at CRITICAL level with its traceback and
        then re-raised.
    """
    try:
        for book_id, book_metadata in log_progress(overview_df.iterrows(), every=1, size=len(overview_df), name="Rows"):
            os.makedirs(f"output/images/{book_id}", exist_ok=True)
            nr_images = book_metadata["NrImages"]
            nr_pages_per_image = book_metadata["NrPages"]
            image_paths = sorted(glob(f"data/{book_id}/image/*.jpg"))
            # Explicit check with a message: a bare `assert` is skipped under
            # `python -O` and would log an empty critical message below.
            if len(image_paths) != nr_images:
                raise ValueError(
                    f"Book {book_id}: found {len(image_paths)} images, expected {nr_images}")
            for path in image_paths:
                process_path(path, book_id, nr_pages_per_image, engine, detector)
    except Exception as e:
        logging.critical(str(e), exc_info=True)
        raise

In [18]:
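# NOTE: kept commented out so that re-running the notebook does not redo the preprocessing
# (to_sql uses if_exists="append", so a rerun would append duplicate rows). `remaining` is not
# defined in this notebook; presumably it was a subset of `overview` still to be processed.
#engine = create_engine(engine_string)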
#akaze = cv.AKAZE_create(cv.AKAZE_DESCRIPTOR_MLDB_UPRIGHT, descriptor_size=0, threshold=0.005)
#start_time = time.monotonic()
#save_preprocessed_images_and_features(remaining, engine, akaze)
#stop_time = time.monotonic()
#engine.dispose()
#print("All of this took:", stop_time - start_time, "seconds.")

## Processing Feature Pairs

First, I need to get all book combinations, together with the page pairs that lie within a fixed page offset of each other. For each combination I need to run the full pipeline:

1. Find matching features
2. Filter features by their position
3. Compute the homography
4. Select features using the homography mask
5. **Don't threshold the features**
6. Save them (to the `fmatch` database table)

In [19]:
# Load the whole `page` table, indexed by page id. The engine is disposed even if the read fails.
engine = create_engine(engine_string)
try:
    page_sql = pd.read_sql("page", engine, index_col="id")
finally:
    engine.dispose()

In [20]:
def get_matched_page_ids(page_df, overview, radius=8):
    """Pair up pages of same-title books that lie within *radius* pages of each other.

    For every title in ``overview["書名（統一書名）"]``, all pairs of its books
    are formed, with the book having more ``NrImages`` first. Each page of the
    first book is paired with every page of the second book that is on the same
    side (``lr``) and whose page number differs by at most *radius*.

    Args:
        page_df: page table indexed by page id, with a ``book`` column and
            exactly two further columns: page number and side, in that order.
        overview: book metadata indexed by book id, with the columns
            ``書名（統一書名）`` and ``NrImages``.
        radius: maximum page-number distance between paired pages.

    Returns:
        DataFrame with columns ``page1`` and ``page2`` (page ids) and a
        1-based RangeIndex.
    """
    # (book, page id) MultiIndex, so .loc[book_id] selects one book's pages.
    page_title = page_df.set_index("book", append=True).swaplevel()
    page_id_tuples_complete = []
    # sort=False keeps titles in order of first appearance (as Series.unique
    # does), so the output row order is stable. NaN titles form no group; they
    # could never be selected with == anyway.
    for _, subset in overview.groupby("書名（統一書名）", sort=False):
        for book1_id, book2_id in combinations(subset.sort_values("NrImages", ascending=False).index, 2):
            book1_pages = page_title.loc[book1_id]
            book2_pages = page_title.loc[book2_id]
            # itertuples is far faster than iterrows and does not upcast row dtypes.
            book2_pages_invdict = {(page, lr): page_id
                                   for page_id, page, lr in book2_pages.itertuples(name=None)}
            for page_id, page, lr in book1_pages.itertuples(name=None):
                page_id_tuples_complete.extend(
                    (page_id, book2_pages_invdict[(page2, lr)])
                    for page2 in range(page - radius, page + radius + 1)
                    if (page2, lr) in book2_pages_invdict
                )
    return pd.DataFrame(page_id_tuples_complete,
                        index=pd.RangeIndex(1, len(page_id_tuples_complete) + 1),
                        columns=["page1", "page2"])

In [21]:
# Candidate page pairs are cached in page_id.csv: loaded if present, otherwise computed and saved.
if os.path.exists("page_id.csv"):
    page_id_combinations_df = pd.read_csv("page_id.csv", index_col=0)
else:
    page_id_combinations_df = get_matched_page_ids(page_sql, overview)
    # Persist the result so the next run loads it instead of recomputing;
    # the index is written as the first column, matching index_col=0 above.
    page_id_combinations_df.to_csv("page_id.csv")

  mask |= (ar1 == a)


In [22]:
def process_match_id_batch(page_id_df, descriptors: dict, keypoints: dict):
    """Match features between page pairs and keep the homography-consistent matches.

    For every row ``(page1, page2)`` of *page_id_df*:

    1. match the binary descriptors of both pages (Hamming distance,
       all matches within ``max_match_distance``);
    2. keep matches whose keypoints lie within ``wradius`` / ``hradius``
       pixels of each other;
    3. fit a homography with RANSAC;
    4. drop the pair if the homography contains NaN or a perspective term
       larger than ``max_perspective``;
    5. keep only the RANSAC inliers.

    Args:
        page_id_df: DataFrame with exactly two columns (source page id,
            destination page id).
        descriptors: page id -> descriptor array. Row ``i`` is used as
            feature ``i``, so the caller must order rows by feature id.
        keypoints: page id -> DataFrame indexed by feature id, with ``x`` and
            ``y`` as its only columns.

    Returns:
        DataFrame with uint32 columns ``src_page_id``, ``src_feature``,
        ``dst_page_id`` and ``dst_feature``. It is empty (rather than raising)
        when no pair yields matches.
    """
    matcher = cv.BFMatcher_create(normType=cv.NORM_HAMMING)
    max_match_distance = 100
    wradius = 100.
    hradius = 100.
    max_perspective = 0.001
    # cv.findHomography raises an error for fewer than 4 correspondences.
    min_homography_points = 4
    result_columns = ["src_page_id", "src_feature", "dst_page_id", "dst_feature"]

    def process_match_ids(left, right):
        """Return ``(query_idx, train_idx)`` arrays of the inlier matches, or None."""
        left_desc = descriptors.get(left)
        right_desc = descriptors.get(right)
        left_kps = keypoints.get(left)
        right_kps = keypoints.get(right)
        # Pages without stored features (e.g. none were detected) cannot match.
        if left_desc is None or right_desc is None or left_kps is None or right_kps is None:
            return None
        # 1. find matches
        matches = list(chain.from_iterable(
            matcher.radiusMatch(left_desc, right_desc, max_match_distance)))
        if not matches:
            return None
        query_idx = np.array([m.queryIdx for m in matches], dtype=np.intp)
        train_idx = np.array([m.trainIdx for m in matches], dtype=np.intp)
        # 2. select keypoints that lie close to each other on both pages
        left_points = np.asarray(left_kps.loc[query_idx].values, dtype=np.float32)
        right_points = np.asarray(right_kps.loc[train_idx].values, dtype=np.float32)
        offsets = np.abs(right_points - left_points)
        is_near = (offsets[:, 0] <= wradius) & (offsets[:, 1] <= hradius)
        near_positions = np.flatnonzero(is_near)
        if near_positions.size < min_homography_points:
            return None
        # 3. compute homography (H is None when RANSAC finds no model)
        h, h_mask = cv.findHomography(left_points[near_positions],
                                      right_points[near_positions], cv.RANSAC)
        if h is None or h_mask is None or not h_mask.any():
            return None
        # 4. filter bad homographies
        if np.any(np.isnan(h)) or np.any(np.absolute(h[2, :2]) > max_perspective):
            return None
        # 5. keep only the RANSAC inliers (mapped back to positions in `matches`)
        inlier_positions = near_positions[h_mask.ravel().astype(bool)]
        return query_idx[inlier_positions], train_idx[inlier_positions]

    parts = {name: [] for name in result_columns}
    for src_page_id, dst_page_id in page_id_df.itertuples(index=False, name=None):
        matched = process_match_ids(src_page_id, dst_page_id)
        if matched is None:
            continue
        src_features, dst_features = matched
        parts["src_page_id"].append(np.full(len(src_features), src_page_id, dtype=np.uint32))
        parts["src_feature"].append(src_features.astype(np.uint32))
        parts["dst_page_id"].append(np.full(len(dst_features), dst_page_id, dtype=np.uint32))
        parts["dst_feature"].append(dst_features.astype(np.uint32))
    # An empty batch yields an empty frame instead of pd.concat([]) raising.
    return pd.DataFrame({
        name: np.concatenate(arrays) if arrays else np.empty(0, dtype=np.uint32)
        for name, arrays in parts.items()
    })

In [23]:
def process_everything(chunksize, skip_multiplyer=0):
    """Match features for all page pairs in ``page_id_combinations_df``, chunk by chunk.

    Only the descriptors and keypoints of the pages in the current chunk are
    loaded, which keeps memory bounded. Each chunk's matches are appended to
    the ``fmatch`` table.

    Args:
        chunksize: number of page pairs per chunk.
        skip_multiplyer: number of leading chunks to skip, e.g. to resume an
            interrupted run.

    Raises:
        Any exception, re-raised after logging the failing chunk at CRITICAL level.
    """
    engine = create_engine(engine_string)
    total_size = len(page_id_combinations_df)
    chunk_starts = range(skip_multiplyer * chunksize, total_size, chunksize)
    try:
        # len(range) also counts a trailing partial chunk (total_size // chunksize does not).
        for chunk_i in log_progress(chunk_starts, every=1, size=len(chunk_starts)):
            page_chunk = page_id_combinations_df.iloc[chunk_i:chunk_i + chunksize]
            # Cast to int so that only integer literals are interpolated into the SQL below.
            page_id_list = ",".join(str(int(page_id)) for page_id in set(page_chunk.unstack()))
            try:
                # sort_index: SQL gives no row-order guarantee without ORDER BY,
                # but process_match_id_batch treats row i of each page's array as
                # feature i (assumes feature ids are 0..n-1 per page).
                descriptors = pd.DataFrame(
                    run_sql_query(f"SELECT * FROM descriptor WHERE page_id IN ({page_id_list})"),
                    columns=["page_id", "feature", *range(61)]
                ).set_index(["page_id", "feature"]).sort_index().astype(np.uint8)
                descriptors = {page_id: descriptors.loc[page_id].values
                               for page_id in descriptors.index.get_level_values("page_id").unique()}
                keypoints = pd.DataFrame(
                    run_sql_query(f"SELECT page_id, feature, x, y FROM keypoint WHERE page_id IN ({page_id_list})"),
                    columns=["page_id", "feature", "x", "y"]
                ).set_index(["page_id", "feature"]).sort_index().astype(np.float32)
                keypoints = {page_id: keypoints.loc[page_id]
                             for page_id in keypoints.index.get_level_values("page_id").unique()}
                chunk_results = process_match_id_batch(page_chunk, descriptors, keypoints)
                if len(chunk_results) > 0:
                    chunk_results.to_sql("fmatch", engine, if_exists="append", index=False)
            except Exception as e:
                logging.critical(f"Failure processing: {page_chunk.index}")
                logging.critical(str(e), exc_info=True)
                raise
            logging.info(f"Successfully processed: {page_chunk.index}")
    finally:
        engine.dispose()