In [1]:
%load_ext autoreload
%autoreload 2
%load_ext nb_black
%load_ext dotenv
%dotenv
from IPython.core.interactiveshell import InteractiveShell
from IPython.display import Image, display

InteractiveShell.ast_node_interactivity = "all"

<IPython.core.display.Javascript object>

In [2]:
import tempfile
import shutil
import glob
import os

# import cv2
import av
import PIL

from tqdm.notebook import tqdm

import numpy as np
from matplotlib import pyplot as plt

# import plotnine as p9
import pandas as pd

import libem

%matplotlib inline


def show_im(p):
    """Render the image file at path ``p`` inline in the notebook output."""
    picture = Image(filename=p)
    display(picture)

<IPython.core.display.Javascript object>

In [3]:
from segment_anything import sam_model_registry, SamAutomaticMaskGenerator, SamPredictor

# Segment Anything (SAM) setup: load the checkpoint, move the model to the
# target device and wrap it in an automatic mask generator.
# NOTE(review): the checkpoint path is an absolute path on one machine and the
# device is fixed to "cuda"; both must be adjusted to run elsewhere.
sam_checkpoint = "/home/mike/Downloads/sam_vit_h_4b8939.pth"
model_type = "vit_h"

device = "cuda"

# Look up the model constructor by type and load the checkpoint weights.
sam = sam_model_registry[model_type](checkpoint=sam_checkpoint)
# Assign to `_` so the notebook does not echo the return value.
_ = sam.to(device=device)


# Mask generator used by `second_round`; every tuning knob below is currently
# commented out, so the generator runs with the library defaults.
mask_generator = SamAutomaticMaskGenerator(
    model=sam,
    # points_per_side=128,
    # points_per_batch=32,
    # pred_iou_thresh=0.86,
    # stability_score_thresh=0.92,
    # crop_n_layers=1,
    # crop_n_points_downscale_factor=2,
    # min_mask_region_area=20,  # Requires open-cv to run post-processing
)


<IPython.core.display.Javascript object>

In [4]:
# Per-landmark settings consumed by `second_round`.
#   sel        -- 4-tuple passed to `libem.ssocr_subimage` as the selection.
#                 NOTE(review): presumably (left, top, right, bottom) offsets in
#                 multiples of the label box width/height -- confirm against libem.
#   ssocr_conf -- (params, commands) pair; it is unpacked with `*` into
#                 `libem.run_ssocr` after the image argument.
config = {
    "SPEED": dict(
        sel=(-0.3, 1.2, 1.2, 3.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 30, "min-char-dims": "5x50"},
            [],
        ),
    ),
    "WATTS": dict(
        sel=(-0.3, 1.2, 0.8, 3.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 30, "min-char-dims": "5x50"},
            [],
        ),
    ),
    "CADENCE": dict(
        sel=(0, 1.2, 0.20, 3.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 30, "min-char-dims": "5x50"},
            [],
        ),
    ),
    # DISTANCE is the only entry that uses a different min-char-dims ("1x50").
    "DISTANCE": dict(
        sel=(0, 1.2, 0.7, 3.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 30, "min-char-dims": "1x50"},
            [],
        ),
    ),
    "TIME": dict(
        sel=(-1.2, 1.2, 1.45, 3.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 30, "min-char-dims": "5x50"},
            [],
        ),
    ),
    "CALORIES": dict(
        sel=(0, 1.2, 0.2, 3.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 30, "min-char-dims": "5x50"},
            [],
        ),
    ),
}


def initial_ocr_and_crop(img):
    """Crop a frame to the display read-out, normalise it and OCR the result.

    The crop rectangle, target width and rotation angle come from
    ``libem.crop_to_numbers`` applied to ``libem.locate_display``. The
    rectangle is widened by 10% of its width on each side and extended
    downwards by 50% of its height before cropping.

    Args:
        img: PIL image of one full video frame.

    Returns:
        ``(ocr_df, rotated_img)``: the OCR results as a DataFrame (built by
        ``libem.paddle_results_to_df``) and the cropped, resized and rotated
        PIL image the OCR was run on.
    """
    display_df = libem.locate_display(np.asarray(img))
    rec = libem.crop_to_numbers(display_df)

    left, right = rec["left"], rec["right"]
    top, bottom = rec["top"], rec["bottom"]

    # Padding is derived from the un-padded rectangle.
    pad_x = int(round(abs(left - right) * 0.1))
    pad_bottom = int(round(abs(top - bottom) * 0.5))
    left -= pad_x
    right += pad_x
    bottom += pad_bottom

    cropped = img.crop((left, top, right, bottom))

    # Resize to the requested width while keeping the aspect ratio.
    target_width = rec["resize_width"]
    target_height = int(cropped.height * (target_width / cropped.width))
    resized = cropped.resize((target_width, target_height), PIL.Image.LANCZOS)

    rotated_img = resized.rotate(rec["angle"], PIL.Image.BICUBIC)

    reader = libem.get_ocr()
    raw_result = reader.ocr(np.asarray(rotated_img), cls=True)
    ocr_df = libem.paddle_results_to_df(raw_result[0])

    return ocr_df, rotated_img


def show_anns_df(anns):
    """Merge all segmentation masks in ``anns`` into one inverted binary image.

    Args:
        anns: DataFrame with an ``area`` column and a ``segmentation`` column
            holding boolean mask arrays.

    Returns:
        A PIL image that is False wherever any mask is set and True elsewhere,
        or ``None`` when ``anns`` has no rows.
    """
    if len(anns) == 0:
        return None

    ordered = anns.sort_values("area", ascending=False)

    # Start from an all-False canvas shaped like the largest mask.
    template = ordered["segmentation"].iloc[0]
    union = np.zeros_like(template).astype(np.bool_)
    for mask in ordered["segmentation"]:
        union |= mask

    return PIL.Image.fromarray(~union)


def get_text(ppocr_result):
    """Return the text of the first detection in a PaddleOCR result.

    Args:
        ppocr_result: value returned by ``ocr.ocr(...)``: a list with one entry
            per image, each entry being a list of ``[bbox, (text, score)]``.

    Returns:
        The recognised text of the first detection of the first image, or
        ``None`` when nothing was detected.
    """
    try:
        return ppocr_result[0][0][-1][0]
    except (IndexError, TypeError):
        # IndexError: empty result list. TypeError: PaddleOCR reports "no text
        # found" as ``[None]``, so indexing into ``None`` must also map to None.
        return None


def second_round(ocr_df, img):
    """Read the number under every configured landmark with four OCR variants.

    For each landmark in ``config`` the label's bounding box is looked up in
    ``ocr_df`` and the sub-image next to it is extracted with
    ``libem.ssocr_subimage``. That sub-image is read with ssocr (``ss1``) and
    PaddleOCR (``pp1``). It is then segmented with SAM, the filtered masks are
    merged into a binary image, and that image is read again with ssocr
    (``ss2``) and PaddleOCR (``pp2``).

    Args:
        ocr_df: DataFrame with ``inference_text`` and ``bbox`` columns.
        img: PIL image the bounding boxes in ``ocr_df`` refer to.

    Returns:
        DataFrame with one row per landmark and the columns ``landmark``,
        ``pp1``, ``ss1``, ``pp2``, ``ss2``. Readings that could not be
        produced are left as empty strings.
    """
    ocr = libem.get_ocr()
    records = []

    for landmark, cfg in config.items():
        # Start from an "all blank" record and fill in what succeeds, so a
        # failure at any stage still yields exactly one row per landmark.
        record = dict(landmark=landmark, pp1="", ss1="", pp2="", ss2="")
        records.append(record)

        bbox = ocr_df.query("inference_text == @landmark").bbox.squeeze()

        try:
            im, ssocr1 = libem.ssocr_subimage(
                img, bbox, cfg["sel"], cfg["ssocr_conf"]
            )
        except Exception:
            # Best effort: a landmark that is missing or cannot be cropped
            # leaves a blank record. ``Exception`` (not a bare except) keeps
            # Ctrl-C / kernel interrupts working during the long run.
            continue

        record["ss1"] = ssocr1
        record["pp1"] = get_text(ocr.ocr(np.asarray(im), cls=True))

        masks_df = pd.DataFrame(mask_generator.generate(np.asarray(im)))
        if masks_df.empty:
            # No masks at all: there is no "bbox" column to expand.
            continue
        masks_df[["left", "top", "width", "height"]] = masks_df["bbox"].apply(
            pd.Series
        )
        masks_df = masks_df.query("area.between(100, 2000)").query("width < 100")
        if masks_df.empty:
            # show_anns_df returns None for an empty frame; skip the SAM pass.
            continue

        im_sam = show_anns_df(masks_df).convert("RGB")

        record["ss2"] = libem.run_ssocr(np.asarray(im_sam), *cfg["ssocr_conf"])
        record["pp2"] = get_text(ocr.ocr(np.asarray(im_sam), cls=True))

    return pd.DataFrame(records)

<IPython.core.display.Javascript object>

In [None]:
# Decode the video frame by frame, OCR each frame and write the results to
# CSV in batches of 100 frames.
ctr = av.open("/bucket/exercise-machina/IMG_1392.MOV")

frames = []

# lo, hi = 2500, 3500
lo, hi = 0, 12224
# Create a tqdm object
progress_bar = tqdm(total=(hi - lo), desc="Processing frames")

last_ix = None  # index of the most recent frame appended to `frames`

try:
    for ix, frame in enumerate(ctr.decode(video=0)):
        if ix < lo:
            continue
        if ix >= hi:
            break

        image = frame.to_image()

        ocr_df, sub_image = initial_ocr_and_crop(image)
        ocr2_df = second_round(ocr_df, sub_image).assign(frame=ix)

        _ = progress_bar.update(1)  # Update the progress bar by one iteration
        frames.append(ocr2_df)
        last_ix = ix

        if len(frames) == 100:
            # Keep `frames` a list at all times; only the concatenated copy
            # is written out.
            pd.concat(frames).to_csv(f"frames.{ix:06d}.csv", index=False)
            print(f"Wrote output after index: [{ix}]")
            frames = []
finally:
    # Persist the trailing partial batch (also when the loop dies part-way),
    # otherwise up to 99 processed frames are silently lost. `frames` is left
    # untouched so later cells can still inspect the tail.
    if frames:
        pd.concat(frames).to_csv(f"frames.{last_ix:06d}.csv", index=False)
        print(f"Wrote output after index: [{last_ix}]")

    progress_bar.close()  # Close the progress bar at the end of the loop
    ctr.close()  # Release the video container

Processing frames:   0%|          | 0/12224 [00:00<?, ?it/s]

Wrote output after index: [99]
Wrote output after index: [199]
Wrote output after index: [299]
Wrote output after index: [399]
Wrote output after index: [499]
Wrote output after index: [599]




Wrote output after index: [699]
Wrote output after index: [799]




Wrote output after index: [899]
Wrote output after index: [999]
Wrote output after index: [1099]
Wrote output after index: [1199]
Wrote output after index: [1299]
Wrote output after index: [1399]
Wrote output after index: [1499]
Wrote output after index: [1599]
Wrote output after index: [1699]
Wrote output after index: [1799]
Wrote output after index: [1899]
Wrote output after index: [1999]
Wrote output after index: [2099]
Wrote output after index: [2199]
Wrote output after index: [2299]
Wrote output after index: [2399]
Wrote output after index: [2499]




Wrote output after index: [2599]
Wrote output after index: [2699]
Wrote output after index: [2799]
Wrote output after index: [2899]
Wrote output after index: [2999]
Wrote output after index: [3099]
Wrote output after index: [3199]
Wrote output after index: [3299]
Wrote output after index: [3399]
Wrote output after index: [3499]
Wrote output after index: [3599]
Wrote output after index: [3699]
Wrote output after index: [3799]
Wrote output after index: [3899]
Wrote output after index: [3999]
Wrote output after index: [4099]
Wrote output after index: [4199]
Wrote output after index: [4299]
Wrote output after index: [4399]
Wrote output after index: [4499]
Wrote output after index: [4599]
Wrote output after index: [4699]
Wrote output after index: [4799]


In [None]:
ocr_df, sub_image = initial_ocr_and_crop(image)
ocr_df
sub_image
ocr2_df = second_round(ocr_df, sub_image).assign(frame=ix)

In [None]:
ocr2_df

In [None]:
frames_df = pd.concat(frames)

In [None]:
frames_df.query("landmark == 'SPEED'")

In [None]:
fdf = libem.locate_display(frame.to_ndarray())
rec = libem.crop_to_numbers(fdf)


crop_left, crop_right, crop_top, crop_bottom = (
    rec["left"],
    rec["right"],
    rec["top"],
    rec["bottom"],
)
crop_width = abs(crop_left - crop_right)
crop_height = abs(crop_top - crop_bottom)
crop_left -= int(round(crop_width * 0.1))
crop_right += int(round(crop_width * 0.1))
crop_bottom += int(round(crop_height * 0.5))
cropped_img = img.crop((crop_left, crop_top, crop_right, crop_bottom))
resize_height = int(cropped_img.height * (rec["resize_width"] / cropped_img.width))
resized_img = cropped_img.resize(
    (rec["resize_width"], resize_height), PIL.Image.LANCZOS
)
rotated_img = resized_img.rotate(rec["angle"], PIL.Image.BICUBIC)
rotated_img


# matches = libem.locate_numbers(
#     phase1_results_df.query("frame == @base").assign(
#         bbox=lambda f: f.bbox.apply(libem.make_polygons_2d)
#     )
# )

ocr = libem.get_ocr()
ocr_df = libem.paddle_results_to_df(ocr.ocr(np.asarray(rotated_img), cls=True)[0])

# sam_masks = mask_generator.generate(np.asarray(rotated_img))
# sam_df = pd.DataFrame(sam_masks)

ocr_df
# sam_df

In [None]:
def show_anns_df(anns, cmask=[1, 1, 1], alpha=0.8):
    """Merge all segmentation masks in ``anns`` into one inverted binary image.

    Args:
        anns: DataFrame with an ``area`` column and a ``segmentation`` column
            holding boolean mask arrays.
        cmask: unused; kept so existing calls that pass a colour still work.
        alpha: unused; kept for the same reason.

    Returns:
        A PIL image that is False wherever any mask is set and True elsewhere,
        or ``None`` when ``anns`` has no rows.
    """
    if len(anns) == 0:
        return None

    ordered = anns.sort_values("area", ascending=False)

    # Start from an all-False canvas shaped like the largest mask.
    template = ordered["segmentation"].iloc[0]
    union = np.zeros_like(template).astype(np.bool_)
    for mask in ordered["segmentation"]:
        union |= mask

    return PIL.Image.fromarray(~union)


show_anns_df(_df)

In [None]:
sbb = ocr_df.query("inference_text == 'CALORIES'").bbox.squeeze()


def convert_bbox_to_xywh(bbox):
    """Convert a polygon bounding box to axis-aligned ``[x, y, w, h]``.

    Args:
        bbox: sequence of points, e.g. ``[[x1, y1], [x2, y2], [x3, y3], [x4, y4]]``.

    Returns:
        ``[x, y, w, h]`` where ``(x, y)`` is the smallest x and y over all
        points and ``w`` / ``h`` are the extents along each axis.
    """
    left = min(point[0] for point in bbox)
    right = max(point[0] for point in bbox)
    top = min(point[1] for point in bbox)
    bottom = max(point[1] for point in bbox)
    return [left, top, right - left, bottom - top]


sxy = convert_bbox_to_xywh(sbb)


def p_overlap(bbox, ref_bbox=sxy):
    """Return the fraction of ``bbox`` that lies inside ``ref_bbox``.

    Args:
        bbox: box as ``(x, y, w, h)``.
        ref_bbox: reference box as ``(x, y, w, h)``; defaults to the value of
            ``sxy`` at the time this function is defined.

    Returns:
        Intersection area divided by the area of ``bbox`` (0.0 when the boxes
        do not overlap or when ``bbox`` has no area).
    """
    # Unpack the bounding box coordinates
    x1, y1, w1, h1 = bbox
    x2, y2, w2, h2 = ref_bbox

    # Area of bbox; a degenerate box (zero width or height) cannot overlap
    # anything and would otherwise cause a division by zero below.
    bbox_area = w1 * h1
    if bbox_area <= 0:
        return 0.0

    # Calculate the (x, y)-coordinates of the intersection rectangle
    x_left = max(x1, x2)
    y_top = max(y1, y2)
    x_right = min(x1 + w1, x2 + w2)
    y_bottom = min(y1 + h1, y2 + h2)

    if x_right < x_left or y_bottom < y_top:
        return 0.0  # No overlap

    # Compute the area of intersection rectangle
    intersection_area = (x_right - x_left) * (y_bottom - y_top)

    # Proportion of bbox covered by the intersection
    return intersection_area / float(bbox_area)


# _df = sam_df.assign(po=lambda f: f.bbox.apply(p_overlap)).sort_values(
#     "po", ascending=False
# )
# _df[["left", "top", "width", "heigth"]] = _df["bbox"].apply(pd.Series)


# _mdf = _df.query("po > 0.25").query("area < 1500")

# _ddf = (
#     _df.query("top > 350").query("left > 750").query("width < 100")
# )  # .query("area < 3000")

# plt.figure(figsize=(20, 20))
# plt.imshow(np.asarray(rotated_img))
# show_anns_df(_mdf)
# show_anns_df(_ddf, [1, 1, 0])
# plt.axis("off")
# plt.show()

In [None]:
# Masks in the lower-right region of the image. The original trailing
# `.query()` had no expression and raised a TypeError.
_df.query("top > 350").query("left > 750")

In [None]:
all_frames = glob.glob(f"{wd}/*.png")
phase1_results_df = libem.ocr_frames(all_frames)

In [None]:
ocr_df
matches = libem.locate_numbers(
    ocr_df.assign(bbox=lambda f: f.bbox.apply(libem.make_polygons_2d))
)
matches

for landmark in config:
    landmark

    cfg = config[landmark]

    # im = PIL.Image.open(base)
    # base = os.path.basename(frame)
    bbox = matches.query("landmark == @landmark").landmark_bbox.squeeze()
    # bbox

    im, out = libem.ssocr_subimage(rotated_img, bbox, cfg["sel"], cfg["ssocr_conf"])

    im
    ocr.ocr(np.asarray(im), cls=True)[-1]

    _df = pd.DataFrame(mask_generator.generate(np.asarray(im)))
    _df[["left", "top", "width", "height"]] = _df["bbox"].apply(pd.Series)
    # _df = _df.assign(max_asp=lambda f: max(f.width/f.height, f.height/f.width))
    _df = _df.query("area.between(100, 2000)").query("width < 100")
    #     .loc[
    #         lambda f: (f.width.between(5, 15) & f.height.between(45, 55))
    #         | (f.height.between(5, 15) & f.width.between(45, 55))
    #     ]
    _df
    im_sam = show_anns_df(_df, [0, 0, 0]).convert("RGB")
    im_sam

    np.asarray(im_sam).shape

    ocr.ocr(np.asarray(im_sam), cls=True)
    libem.run_ssocr(np.asarray(im_sam), *cfg["ssocr_conf"])
# im.save(f"test-{landmark}.png")
# out
# 1 / 0

In [None]:
pics = []
for frame in phase1_results_df.frame.unique()[:10]:
    _df = phase1_results_df.query("frame == @frame")
    _df
    rec = libem.crop_to_numbers(_df)
    img = PIL.Image.open(frame)

    crop_left, crop_right, crop_top, crop_bottom = (
        rec["left"],
        rec["right"],
        rec["top"],
        rec["bottom"],
    )
    crop_width = abs(crop_left - crop_right)
    crop_height = abs(crop_top - crop_bottom)
    crop_right += int(round(crop_width * 0.1))
    crop_bottom += int(round(crop_height * 0.5))
    cropped_img = img.crop((crop_left, crop_top, crop_right, crop_bottom))
    resize_height = int(cropped_img.height * (rec["resize_width"] / cropped_img.width))
    resized_img = cropped_img.resize(
        (rec["resize_width"], resize_height), PIL.Image.LANCZOS
    )
    rotated_img = resized_img.rotate(rec["angle"], PIL.Image.BICUBIC)
    rotated_img

    matches = libem.locate_numbers(
        phase1_results_df.query("frame == @base").assign(
            bbox=lambda f: f.bbox.apply(libem.make_polygons_2d)
        )
    )

    for landmark in libem.config:
        landmark

        cfg = config[landmark]

        im = PIL.Image.open(base)
        # base = os.path.basename(frame)
        bbox = matches.query("landmark == @landmark").landmark_bbox.squeeze()

        im, out = libem.ssocr_subimage(im, bbox, cfg["sel"], cfg["ssocr_conf"])
        ocr.ocr(np.asarray(im), cls=True)[-1]

        im
        #im.save(f"test-{landmark}.png")
        out
        pics.append(np.asarray(im))


In [None]:
np.histogram(pics[0][:, :, 0].flatten())

In [None]:
import numpy as np


def rescale_intensity(img, percentile_low=1, percentile_high=99):
    """Stretch intensities so the given percentiles map to 0 and 1.

    Percentiles are computed over the first two axes, i.e. separately for
    every channel of an ``(H, W, C)`` image.

    Args:
        img: image array.
        percentile_low: percentile mapped to 0.
        percentile_high: percentile mapped to 1.

    Returns:
        Float array with the shape of ``img`` and values clipped to ``[0, 1]``.
        A channel whose two percentiles coincide (a flat channel) maps to 0
        instead of producing NaN.
    """
    # Compute percentiles for each channel
    perc_low = np.percentile(img, percentile_low, axis=(0, 1))
    perc_high = np.percentile(img, percentile_high, axis=(0, 1))

    # Guard against 0/0 on flat channels: with a span of 1 the numerator
    # (img - perc_low) is still 0 there, so those channels become all zeros.
    span = perc_high - perc_low
    span = np.where(span == 0, 1, span)

    # Rescale intensities for each channel
    return np.clip((img - perc_low) / span, 0, 1)


def weighted_grayscale_conversion(img, weights=(1, 1, 1)):
    """Binarise an image from a weighted average of its rescaled channels.

    Args:
        img: image array whose last axis is the channel axis.
        weights: one weight per channel.

    Returns:
        ``uint8`` array that is 255 where the weighted average of the
        rescaled channels exceeds 0.45 and 0 elsewhere.
    """
    normalized = rescale_intensity(img)

    # Weighted mean over the channel axis.
    blended = np.dot(normalized, weights) / sum(weights)

    # Threshold, then express the boolean mask as 0 / 255.
    foreground = blended > 0.45
    return np.where(foreground, 255, 0).astype(np.uint8)


# plt.imshow(pics[0])

th = weighted_grayscale_conversion(pics[3], (1, 1, 0))
# th = weighted_grayscale_conversion(pics[3], [0.2126, 0.7152, 0.0722])

# ocr.ocr(th)
# libem.run_ssocr(th, {"number-digits": -1}, [])

th = libem.tidy_crop_frame(th, [50, 0, th.shape[0], th.shape[1]])

th
# plt.imshow(th)
# + pics[0][:, :, 0])

In [None]:
from skimage.filters import try_all_threshold


fig, ax = try_all_threshold(th)



In [None]:
# Convert the image to grayscale
gray_img = cv2.cvtColor(pics[0], cv2.COLOR_BGR2GRAY)

# Apply Otsu's thresholding
thim, thresh = cv2.threshold(gray_img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

plt.imshow(th)

thim


In [None]:
plt.imshow(pics[0][:, :, 2])

In [None]:
pixels = np.concatenate([e[:, :, 0].reshape(-1, 1) for e in pics])

import sklearn.cluster

print("Building k-means model")
# Train K-means model
kmeans = sklearn.cluster.KMeans(n_clusters=2)
kmeans.fit(pixels)

In [None]:
# Segment the first channel of each crop with the fitted 2-cluster k-means
# model and show the result as a black/white image.
for p in pics[:12]:
    labels = kmeans.predict(p[:, :, 0].reshape(-1, 1))
    # Normalise polarity so the majority cluster is always rendered as 255.
    # (The original assigned to a misspelled `label`, so the flip never happened.)
    if (labels == 0).mean() > 0.5:
        labels = 1 - labels
    p_seg = (labels.reshape(p.shape[:2]) * 255).astype(np.uint8)
    PIL.Image.fromarray(p_seg)



In [None]:
libem.crop_to_numbers(_df)


In [None]:
ocr = libem.get_ocr()
ocr.ocr(np.asarray(im), cls=True)

In [None]:
base = "/tmp/tmph7n2du9x/output_040.png"


matches = libem.locate_numbers(
    phase1_results_df.query("frame == @base").assign(
        bbox=lambda f: f.bbox.apply(libem.make_polygons_2d)
    )
)
matches
for landmark in libem.config:
    landmark

    cfg = libem.config[landmark]

    im = PIL.Image.open(base)
    # base = os.path.basename(frame)
    bbox = matches.query("landmark == @landmark").landmark_bbox.squeeze()

    im, out = libem.ssocr_subimage(im, bbox, cfg["sel"], cfg["ssocr_conf"])
    ocr.ocr(np.asarray(im), cls=True)[-1]

    im
    im.save(f"test-{landmark}.png")
    out

In [None]:
shutil.rmtree(wd)

In [None]:
def process_distance(series):
    """Parse OCR'd distance strings of the form ``d.dd`` into floats.

    Args:
        series: string Series of raw OCR readings.

    Returns:
        Float Series; entries that do not look like ``<digit>.<two digits>``
        (including missing values) become NaN.
    """
    # Raw string: "\d" in a normal string literal is an invalid escape sequence.
    is_distance = series.str.match(r"\d\.\d{2}$", na=False)
    return (
        series
        # Replace non-matching strings with ''
        .where(is_distance, "")
        # Convert the strings to float, invalid parsing will be set as NaN
        .pipe(pd.to_numeric, errors="coerce")
        # Set type
        .astype(float)
    )


def process_calories(series):
    """Parse OCR'd calorie strings into floats, ignoring non-digit characters.

    Args:
        series: string Series of raw OCR readings.

    Returns:
        Float Series; entries with no digits at all become NaN.
    """
    return (
        series
        # Strip everything that is not a digit (raw string: "\d" in a normal
        # string literal is an invalid escape sequence).
        .str.replace(r"[^\d]", "", regex=True)
        # Convert the strings to numbers, invalid parsing will be set as NaN
        .pipe(pd.to_numeric, errors="coerce", downcast="integer")
        .astype(float)
    )


# plt.plot(pp.DISTANCE.pipe(process_distance), ss.DISTANCE.pipe(process_distance))
plt.plot(pp.DISTANCE.pipe(process_distance))
plt.plot(ss.DISTANCE.pipe(process_distance))
# plt.plot(pp, ss)

In [None]:

plt.plot(ss.CALORIES.pipe(process_calories))
plt.plot(pp.CALORIES.pipe(process_calories))



In [None]:
Flow("BuildMonitorDatasetFlow").latest_successful_run.data.ocr_df.query(
    "marker == 'TIME'"
)

In [None]:
df = Flow(
    "BuildMonitorDatasetFlow"
).latest_successful_run.data.phase_1_detection_results

In [None]:
f45 = df.query("frame.str.contains('045')")
f45.head()

In [None]:
wd = tempfile.mkdtemp()
args = [
            '/bucket/exercise-machina/IMG_1392.MOV',
            wd,
            "3:30",
"10","output_%03d.png"

]


libem.unpack_video_to_frames(args)


In [None]:
# Imports must precede the first use of PaddleOCR: on a fresh kernel the
# original order raised NameError.
# NOTE(review): `from PIL import Image` rebinds the name `Image` imported from
# IPython.display in the first cell, which `show_im` relies on.
from fuzzywuzzy import fuzz, process

from PIL import Image
from paddleocr import PaddleOCR

ocr = PaddleOCR(
    use_angle_cls=True, lang="en"
)  # need to run only once to download and load model into memory

im = Image.open("/bucket/exercise-machina/tmpqdhq4d5e/frames/output_1404.png")
im

In [None]:
def paddle_results_to_df(result):
    """Turn one image's PaddleOCR detections into a DataFrame.

    Args:
        result: list of ``[bbox, (text, score)]`` detections for one image, or
            ``None`` / an empty list when nothing was detected.

    Returns:
        DataFrame with the columns ``bbox``, ``inference_text`` and
        ``inference_score`` (no rows when there are no detections).
    """
    detections = result if result is not None else []
    df = pd.DataFrame(detections, columns=["bbox", "inference"])

    # Split the (text, score) pairs. Passing explicit column names keeps this
    # working for an empty frame.
    parsed = pd.DataFrame(
        df["inference"].tolist(),
        index=df.index,
        columns=["inference_text", "inference_score"],
    )
    df["inference_text"] = parsed["inference_text"]
    df["inference_score"] = parsed["inference_score"]

    # `drop` returns a new frame; the original discarded it, so the raw
    # "inference" column was never actually removed.
    return df.drop(columns="inference")


def crop_image(image, bbox):
    """
    Cut the region described by a bounding box out of a numpy image.

    Args:
    - image (numpy.ndarray): The input image in numpy format (height, width, channels)
    - bbox (list or tuple): The bounding box as (x_min, y_min, x_max, y_max)

    Returns:
    - cropped_image (numpy.ndarray): The cropped image
    """
    x_min, y_min, x_max, y_max = bbox
    # Rows are indexed by y, columns by x.
    rows = slice(y_min, y_max)
    cols = slice(x_min, x_max)
    return image[rows, cols]


def make_polygons_2d(polygons):
    """Flatten one or more polygons into a single ``(N, 2)`` float32 point array."""
    flattened = np.concatenate(polygons)
    points = flattened.reshape(-1, 2)
    return points.astype(np.float32)


def super_bounding_box(xy):
    """Return ``[x_min, x_max, y_min, y_max]`` for an ``(N, 2+)`` point array."""
    # Only the first two columns (x, y) take part in the bounds.
    coords = xy[:, :2]
    x_min, y_min = coords.min(axis=0)
    x_max, y_max = coords.max(axis=0)
    return [x_min, x_max, y_min, y_max]


def best_match_2(text, markers):
    """Return the marker that best fuzzy-matches ``text``, or ``""`` if none does.

    Matching uses ``process.extractOne`` with a score cutoff of 90.
    """
    match = process.extractOne(text, markers, score_cutoff=90)
    # extractOne yields None when no candidate reaches the cutoff.
    return "" if match is None else match[0]


def compute_distance(bbox1, bbox2):
    """Euclidean distance between the centres of two point arrays.

    Args:
        bbox1: ``(N, 2)`` array of polygon points.
        bbox2: ``(M, 2)`` array of polygon points.

    Returns:
        Distance between the mean points, or the sentinel ``1000`` when either
        argument is not a usable array (e.g. ``None``).
    """
    try:
        # Compute centers of mass
        center1 = bbox1.mean(axis=0)
        center2 = bbox2.mean(axis=0)

        # Compute Euclidean distance between centers
        distance = np.linalg.norm(center1 - center2)
    except (AttributeError, TypeError, ValueError):
        # The original also computed center1 *outside* the try, so a bad
        # bbox1 escaped the sentinel path. Both boxes are now guarded alike.
        return 1000

    return distance


def compute_angle(bbox1, bbox2):
    """Angle in degrees of the vector from the centre of ``bbox1`` to ``bbox2``.

    Args:
        bbox1: ``(N, 2)`` array of polygon points (the reference).
        bbox2: ``(M, 2)`` array of polygon points.

    Returns:
        ``atan2(dy, dx)`` in degrees, or the sentinel ``180`` when either
        argument is not a usable array (e.g. ``None``).
    """
    # Local import: `math` is not imported by the notebook's import cell. With
    # the original bare `except`, the resulting NameError was swallowed and
    # every call returned 180.
    import math

    try:
        # Compute centers of mass
        center1 = bbox1.mean(axis=0)
        center2 = bbox2.mean(axis=0)

        # Compute angle relative to bbox1
        diff = center2 - center1
        angle = math.atan2(diff[1], diff[0]) * 180 / math.pi
    except (AttributeError, TypeError, ValueError, IndexError):
        return 180

    return angle


def process_group_2(group):
    """Compute a crop rectangle around the landmark labels of one frame.

    Args:
        group: DataFrame with ``inference_text`` and ``bbox`` columns for one
            frame.

    Returns:
        ``(left, top, right, bottom)``: the box enclosing the SPEED, WATTS,
        CADENCE and CALORIES labels, widened by 2% / 7.5% of its width on the
        left / right and extended downwards by 42% of its height.
    """
    uniques = ["SPEED", "WATTS", "CADENCE", "CALORIES"]

    # best_match_2 needs the candidate markers; the original call omitted them
    # and raised TypeError.
    # NOTE(review): `uniques` is assumed to be the intended candidate list,
    # since it is the only set the result is filtered on below -- confirm.
    landmarks = group.assign(
        inference_clean=group["inference_text"].apply(best_match_2, markers=uniques)
    )

    surround_box = super_bounding_box(
        make_polygons_2d(
            landmarks.query("inference_clean.isin(@uniques)").bbox.to_numpy()
        )
    )
    xl, xr, yt, yb = surround_box
    w = abs(xl - xr)
    h = abs(yt - yb)
    print(w, h)  # debug output retained from the original
    # surround_box = [xl, yt, round(xr + 1.05 * w), round(yb - h)]
    return (round(xl - 0.02 * w), yt, round(xr + 0.075 * w), round(yb + 0.42 * h))

In [None]:
def plausible_number(w, thresh=0.5):
    """Tell whether a string looks like a numeric read-out.

    Args:
        w: text to test.
        thresh: minimum fraction (exclusive) of characters that must be digits,
            ``:`` or ``.``.

    Returns:
        True when more than ``thresh`` of the characters are numeric-looking;
        False otherwise, including for an empty string.
    """
    if not w:
        # Avoid dividing by len("") == 0.
        return False
    numeric_chars = sum(1 for c in w if c.isdigit() or c in ":.")
    return (numeric_chars / len(w)) > thresh


def locate_numbers(
    df,
    landmarks=(
        "SPEED",
        "WATTS",
        "CADENCE",
        "CALORIES",
        "DISTANCE",
        "TIME",
    ),
    min_score=0.9,
):
    """Pair each landmark label with the numeric OCR box that best sits below it.

    Args:
        df: OCR results with ``inference_text``, ``inference_score`` and
            ``bbox`` columns. ``bbox`` entries are indexed as 2-D arrays
            (``e[:, 0]``), so they must already be point arrays.
        landmarks: label texts to look up; each must match ``inference_text``
            exactly.
        min_score: minimum ``inference_score`` for a box to be a candidate.

    Returns:
        DataFrame with one row per landmark, sorted by landmark, with the
        columns ``landmark``, ``inference_text``, ``inference_score``,
        ``landmark_bbox`` and ``match_bbox``. When several landmarks pick the
        same candidate box, only the best-scoring one keeps it; the others get
        ``None`` for text, score and match box.

    Raises:
        AssertionError: if a landmark is not found exactly once (for "TIME",
            not found at all).
    """
    # Candidate number boxes: confident detections whose text is mostly
    # digits / ':' / '.'. box_id identifies a candidate across landmarks.
    candidates_df = (
        df.query("inference_score >= @min_score")
        .loc[lambda f: f.inference_text.apply(plausible_number)]
        .reset_index(drop=True)
        .assign(box_id=lambda f: range(len(f)))
    )

    out = []
    for landmark in landmarks:
        rec = df.query("inference_text == @landmark")
        if landmark == "TIME":
            # TIME matches two places typically, we want the one more to the left.
            rec = (
                rec.assign(
                    bbox_left=lambda f: [e[:, 0].min() for e in f.bbox]
                ).sort_values("bbox_left")
            ).head(1)
        assert len(rec) == 1
        rec = rec.squeeze()

        # Score every candidate relative to this landmark: distance between
        # centres plus the deviation of the direction from 90 degrees.
        # Lower is better.
        _df = (
            candidates_df.assign(
                distance_from_ref=lambda f: [
                    compute_distance(rec.bbox, e) for e in f.bbox
                ],
                angle_from_ref=lambda f: [compute_angle(rec.bbox, e) for e in f.bbox],
            )
            .assign(
                belowness_score=lambda f: f.distance_from_ref
                + abs(f.angle_from_ref - 90)
            )
            .sort_values("belowness_score")
        )

        # Keep only the best candidate for this landmark.
        out.append(_df.head(1).assign(landmark=landmark, landmark_bbox=[rec.bbox]))

    return (
        pd.concat(out)
        # Within each box_id the best (lowest) score comes first, so
        # `duplicated` flags the weaker claims on the same box.
        .sort_values(["box_id", "belowness_score"])
        .assign(
            inference_text=lambda f: f.inference_text.where(
                ~f.duplicated(subset="box_id"), None
            ),
            # These two lambdas see the inference_text assigned just above,
            # so score and bbox are blanked for the same rows.
            inference_score=lambda f: f.inference_score.where(
                f.inference_text.notnull(), None
            ),
            bbox=lambda f: f.bbox.where(f.inference_text.notnull(), None),
        )
        .loc[
            :,
            [
                "landmark",
                "inference_text",
                "inference_score",
                # "distance_from_ref",
                # "angle_from_ref",
                # "belowness_score",
                # "box_id",
                "landmark_bbox",
                "bbox",
            ],
        ]
        .rename(columns={"bbox": "match_bbox"})
        .sort_values("landmark")
        .reset_index(drop=True)
    )


# ff = paddle_results_to_df(ocr.ocr(np.asarray(im), cls=True)[0])
# ff


bb = locate_numbers(ff.assign(bbox=lambda f: f.bbox.apply(make_polygons_2d)))
bb


# imc = im.crop(bb)

# resize_width = 1000
# resize_height = int(
#     imc.height * (resize_width / imc.width)
# )
# imd = imc.resize(
#     (resize_width, resize_height), Image.LANCZOS
# )
# imd

# paddle_results_to_df(ocr.ocr(np.asarray(imd), cls=True)[0])

In [None]:
config = {
    "SPEED": dict(
        sel=(-0.2, 1.25, 1.03, 2.7),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "WATTS": dict(
        sel=(-0.2, 1.1, 0.6, 2.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "CADENCE": dict(
        sel=(0, 1.25, 0.20, 2.9),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "DISTANCE": dict(
        sel=(0, 1.3, 0.75, 3.25),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "TIME": dict(
        sel=(-1.2, 1.2, 1.45, 2.75),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "CALORIES": dict(
        sel=(0, 1.2, 0.2, 2.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
}


sample = (
    pd.Series(glob.glob("/bucket/exercise-machina/tmpqdhq4d5e/frames/*.png"))
    .sample(5, random_state=2009, replace=False)
    .sort_values()
    .tolist()
)
sample = sorted(glob.glob("/bucket/exercise-machina/tmpqdhq4d5e/frames/output_1*.png"))

match_frames = []
for e in sample:
    im = Image.open(e)
    ocr_df = paddle_results_to_df(ocr.ocr(np.asarray(im), cls=True)[0]).assign(
        bbox=lambda f: f.bbox.apply(make_polygons_2d)
    )
    order = int("".join([c for c in os.path.basename(e) if c.isdigit()]))
    match_frame = locate_numbers(ocr_df).assign(frame=e, frame_num=order)

    ssocr_inf = []
    for rec in match_frame.itertuples():
        cfg = config[rec.landmark]
        imc, out = ssocr_subimage(im.convert("L"), rec.landmark_bbox, cfg["sel"], cfg["ssocr_conf"])
        ssocr_inf.append(out)
    match_frame = match_frame.assign(ssocr=ssocr_inf)

    match_frames.append(match_frame)

matches = pd.concat(match_frames).sort_values(["frame_num", "landmark"])
matches.to_csv("ocr-output.csv")

In [None]:
matches.assign(assigned=lambda f: f.inference_text.notnull()).groupby(
    "landmark"
).assigned.mean().sort_values().plot()

In [None]:
matches.head()

(matches.inference_text == matches.ssocr).mean()

In [None]:
def score_crop(a, cf):
    """Score a crop rectangle by the pixel values along its border.

    Args:
        a: image array indexed as ``a[row, col]``.
        cf: crop frame ``(left, top, right, bottom)`` in pixel indices.

    Returns:
        ``mean + std`` of the pixels on the four edges of the rectangle.
    """
    l, t, r, b = cf
    # The original sampled the top row twice and never the bottom row. Clamp
    # the bottom index so a frame whose bottom equals the image height (which
    # the slices above tolerate) does not raise.
    bottom_row = min(b, a.shape[0] - 1)
    perimeter = np.concatenate(
        [
            a[t:b, l].flatten(),  # left edge
            a[t:b, r].flatten(),  # right edge
            a[t, l:r].flatten(),  # top edge
            a[bottom_row, l:r].flatten(),  # bottom edge
        ]
    )
    score = perimeter.mean() + perimeter.std()

    return score


def tidy_crop_frame(a, cf, w=15):
    """Nudge one edge of a crop frame to minimise its border score.

    Every edge of ``cf`` is moved on its own by each offset in ``[-w, w)``.
    The single-edge variation with the lowest ``score_crop`` value is
    returned; ``cf`` itself is returned when no variation beats it.

    Args:
        a: image array indexed as ``a[row, col]``.
        cf: initial crop frame ``[left, top, right, bottom]``.
        w: maximum number of pixels an edge may move.

    Returns:
        The best crop frame found (a new list, or ``cf`` unchanged).
    """
    height, width = a.shape[:2]
    best_score = score_crop(a, cf)
    best_crop = cf

    for ix in range(len(cf)):
        # The original used range(-w * dr, w * dr) with dr = -1 for the left
        # and top edges, which is an empty range, so those edges were never
        # searched. The offsets are symmetric, so no direction is needed.
        for d in range(-w, w):
            cfp = list(cf)
            cfp[ix] += d
            l, t, r, b = cfp

            # Skip candidates that would wrap around (negative index), index
            # past the image, or describe an empty / inverted rectangle.
            if cfp[ix] < 0 or not (l < r < width) or not (t < b) or t >= height:
                continue

            score = score_crop(a, cfp)
            if score < best_score:
                best_score = score
                best_crop = cfp

    return best_crop


def ssocr_subimage(im, bbox, selection, ssocr_config):
    """Crop the read-out next to a label box and run ssocr on it.

    Args:
        im: PIL image containing the label.
        bbox: polygon of the label's bounding box.
        selection: four offsets ``(left, top, right, bottom)``; the horizontal
            ones are multiples of the label box width, the vertical ones of
            its height, and each is added to the matching edge of the box.
        ssocr_config: ``(params, commands)`` pair unpacked into ``run_ssocr``.

    Returns:
        ``(cropped_image, ssocr_output)``.
    """
    corners = make_polygons_2d(bbox)
    xs, ys = corners[:, 0], corners[:, 1]

    left, top, right, bottom = xs.min(), ys.min(), xs.max(), ys.max()
    box_w = abs(left - right)
    box_h = abs(top - bottom)

    # Shift every edge of the label box by its configured offset.
    shifted_edges = (
        left + selection[0] * box_w,
        top + selection[1] * box_h,
        right + selection[2] * box_w,
        bottom + selection[3] * box_h,
    )
    init_crop_frame = [int(round(edge)) for edge in shifted_edges]

    # Refine the frame against the image content before cropping.
    crop_frame = tidy_crop_frame(np.asarray(im), init_crop_frame)
    imc = im.crop(crop_frame)

    reading = run_ssocr(np.asarray(imc), *ssocr_config)
    return imc, reading


config = {
    "SPEED": dict(
        sel=(-0.2, 1.25, 1.03, 2.7),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "WATTS": dict(
        sel=(-0.2, 1.1, 0.6, 2.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "CADENCE": dict(
        sel=(0, 1.25, 0.20, 2.9),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "DISTANCE": dict(
        sel=(0, 1.3, 0.75, 3.25),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "TIME": dict(
        sel=(-1.2, 1.2, 1.45, 2.75),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
    "CALORIES": dict(
        sel=(0, 1.2, 0.2, 2.5),
        ssocr_conf=(
            {"number-digits": -1, "threshold": 40},
            [],
        ),
    ),
}


landmark = "CALORIES"
for frame in sample:
    cfg = config[landmark]

    im = images[sample.index(frame)]
    base = os.path.basename(frame)
    bbox = (
        matches.query("frame == @base")
        .query("landmark == @landmark")
        .landmark_bbox.squeeze()
    )

    im, out = ssocr_subimage(im, bbox, cfg["sel"], cfg["ssocr_conf"])

    im
    out

In [None]:
bbox.squeeze()

In [None]:
import tempfile
import cv2
import subprocess
import os


def run_ssocr(image, params, commands):
    """Run the ``ssocr`` command-line tool on an image and return its output.

    Args:
        image: path to an image file (``str``), or an image array that is
            written to a temporary PNG with ``cv2.imwrite``.
        params: mapping of long-option names to values; a value of ``None``
            produces a bare ``--name`` flag, anything else ``--name=value``.
        commands: iterable of command strings; each is split on whitespace and
            appended to the command line.

    Returns:
        The tool's standard output, decoded and stripped. Anything the tool
        writes to standard error is printed.
    """
    # Define the command and parameters
    cmd = ["./ssocr-2.22.2/ssocr"]

    # Add the parameters to the command
    for key, value in params.items():
        if value is None:
            cmd.append(f"--{key}")
        else:
            cmd.append(f"--{key}={value}")

    for _cmd in commands:
        cmd.extend(_cmd.split())

    temp_file_path = None
    try:
        if isinstance(image, str):
            cmd.append(image)
        else:
            # Reserve a temp file name and close the handle straight away:
            # the original never closed it, leaking one descriptor per call.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
                temp_file_path = temp_file.name
            # Write the image to the temporary file
            cv2.imwrite(temp_file_path, image)
            cmd.append(temp_file_path)

        # Run the command and get the output
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    finally:
        # Remove the temporary file even when writing or running fails.
        if temp_file_path is not None:
            os.remove(temp_file_path)

    # Check for errors
    if result.stderr:
        print(f"Error: {result.stderr.decode()}")

    # Return the output
    return result.stdout.decode().strip()

In [None]:
import scipy.signal as ss

i = 1
a.std(i).shape
pd.Series(-a.std(i)).plot()
# plt.imshow(ima[:, :, 1])
# plt.imshow(ima[:, :, 2])

ss.find_peaks(-a.std(i), prominence=10)

In [None]:
print("Building up some pixels for clustering")

import sklearn.cluster

# Build k-means model to threshold images.
pixels = np.asarray(imc)
pixels.shape
_pixels = pixels.reshape(-1, 3)

print("Building k-means model")
# Train K-means model
kmeans = sklearn.cluster.KMeans(n_clusters=4)
kmeans.fit(_pixels)



In [None]:
pixels

In [None]:
# Binary mask of the pixels assigned to cluster 3, shaped like the source
# image. The shape is taken from `pixels` instead of the hard-coded
# (422, 843), so the cell works for any crop size.
imb = kmeans.predict(_pixels).reshape(pixels.shape[:2]) == 3
imb.shape
#plt.imshow(imb)

# Expand the mask to a 3-channel black/white image.
imbb = np.zeros_like(pixels)
# imbb
imbb[:, :, 0] = imb * 255
imbb[:, :, 1] = imb * 255
imbb[:, :, 2] = imb * 255
imbb = Image.fromarray(imbb)

imbb.height, imbb.width

# Resize to a fixed width, keeping the aspect ratio.
resize_width = 1000
resize_height = int(
    imbb.height * (resize_width / imbb.width)
)
resized_img = imbb.resize(
    (resize_width, resize_height), Image.LANCZOS
)

resized_img


paddle_results_to_df(ocr.ocr(np.asarray(resized_img), cls=True)[0])


In [None]:
# list(Run("BuildMonitorDatasetFlow/24").steps())

# Show the `phase_2_detection_results` attribute of the `text_detect_phase2` step of run 24.
# NOTE(review): `Step` / `Run` are not imported in the visible part of the notebook — confirm where they come from.
Step("BuildMonitorDatasetFlow/24/text_detect_phase2").phase_2_detection_results

In [None]:
# Peek at the speed / watts / cadence OCR columns, then list the distinct speed readings.
ocr_df.loc[:, ["speed", "watts", "cadence"]]

ocr_df.speed.unique()

In [None]:
# Load the reference table of top-row states and plot speed and watts against cadence.
# NOTE(review): `import plotnine as p9` is commented out in the notebook's import cell — confirm `p9` is imported before this cell runs.
trdf = pd.read_csv("./top-row-states.csv")

p9.ggplot(trdf, p9.aes("cadence", "speed")) + p9.geom_line() + p9.geom_point()

p9.ggplot(trdf, p9.aes("cadence", "watts")) + p9.geom_line() + p9.geom_point()

In [None]:
# Dump the time / distance / calories OCR columns to a scratch CSV in the working directory.
ocr_df.loc[:, ["time", "distance", "calories"]].to_csv("moo.csv")

In [None]:
# For each term, score every OCR string against every reference string after
# stripping both down to their digits; summing the logs across the three terms
# gives one combined score per (OCR row, reference row) pair. The 1e-3 offset
# keeps the log finite when a score is zero.
lnl = sum(
    np.log(
        1e-3
        + rapidfuzz.process.cdist(
            ocr_df[term],
            trdf[term].astype("str"),
            processor=lambda w: "".join(e for e in w if e.isdigit()),
        )
    )
    for term in ("speed", "watts", "cadence")
)
n = 11  # NOTE(review): not used in this cell — confirm whether another cell reads it.
# Pair each OCR row with its best-scoring reference row and export side by side.
# `argmax` yields row positions, so select with `.iloc` rather than label-based `.loc`.
# NOTE(review): `ignore_index=True` with axis="columns" replaces the column
# names with 0..k-1 in the CSV header — confirm that is intended.
pd.concat(
    [
        ocr_df.reset_index(drop=True),
        trdf.iloc[lnl.argmax(axis=1)].reset_index(drop=True),
    ],
    axis="columns",
    ignore_index=True,
).to_csv("ocr.csv", index=False)

In [None]:
# Flatten the phase-1 predictions of the latest successful flow run into one row per
# (recognised text, polygon) pair and keep only four landmark words.
# NOTE(review): `Flow` is not imported in the visible part of the notebook — confirm where it comes from.
pd.json_normalize(
    Flow("BuildMonitorDatasetFlow").latest_successful_run.data.phase1_results[
        "predictions"
    ]
).loc[:, ["rec_texts", "det_polygons"]].explode(["rec_texts", "det_polygons"]).query(
    "rec_texts in ('speed', 'cadence', 'distance', 'calories')"
)

In [None]:
from mmocr.apis import MMOCRInferencer

# Build an MMOCR inferencer (DBNetpp detector + ABINet recogniser).
# NOTE(review): this rebinds `ocr`, which an earlier cell calls as `ocr.ocr(...)` — confirm the two uses do not collide.
ocr = MMOCRInferencer(det="DBNetpp", rec="ABINet")

# Run over the frames directory in batches of 8, saving predictions under the out_dir.
ocr("/bucket/exercise-machina/frames",
    batch_size=8,
    out_dir="/bucket/exercise-machina/frames-ocr",
   save_pred=True
   )

# Single-image variant; note that later cells read a variable named `x`.
#x = ocr("output_001.png", show=False, print_result=False, return_vis=False)


In [None]:
import cv2
import numpy as np
from fuzzywuzzy import fuzz


def display_image(image, width_inches=12.5):
    """Show an OpenCV image with matplotlib at a fixed figure width.

    Args:
        image: Image array. Three-or-more-channel input is converted from BGR
            to RGB before display; 2-D or single-channel input is shown as
            grayscale.
        width_inches: Figure width in inches; the height follows from the
            image's aspect ratio.
    """
    # shape[:2] works for both (H, W) and (H, W, C) arrays.
    height, width = image.shape[:2]
    fig_height = width_inches * (height / width)

    plt.figure(figsize=(width_inches, fig_height))
    if image.ndim == 2 or image.shape[2] == 1:
        # BGR->RGB conversion is not applicable to single-channel input.
        gray = image if image.ndim == 2 else image[:, :, 0]
        plt.imshow(gray, cmap="gray")
    else:
        plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    # plt.imshow(image)
    plt.axis("off")
    plt.show()


def draw_ocr_results(image, ocr_data, extra_polygons=()):
    """Draw OCR polygons and their recognised text onto an image.

    Args:
        image: Image array to annotate. Colour input is drawn on directly
            (the caller's array is modified); 2-D or single-channel input is
            first converted to BGR, which produces a new array.
        ocr_data: DataFrame with a ``rec_texts`` column (label) and a
            ``det_polygons`` column (flat or nested x/y coordinates).
        extra_polygons: Additional polygons drawn with a thicker outline in a
            second colour, without text.

    Returns:
        The annotated image array.
    """
    # Convert the image to BGR format for drawing with OpenCV
    if len(image.shape) == 2 or image.shape[2] == 1:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

    for _, row in ocr_data.iterrows():
        points = np.array(row["det_polygons"]).reshape(-1, 2).astype(np.int32)

        # Draw the bounding box
        cv2.polylines(image, [points], isClosed=True, color=(0, 255, 255), thickness=2)

        # Centre of the polygon as plain Python ints, so the text origin does
        # not depend on how the OpenCV build parses numpy scalar types.
        center = tuple(int(v) for v in points.mean(axis=0))

        # Put the text annotation in the center of the bounding box
        cv2.putText(
            image,
            str(row["rec_texts"]),
            center,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 255, 255),
            2,
        )

    for pg in extra_polygons:
        points = np.array(pg).reshape(-1, 2).astype(np.int32)

        # Draw the bounding box
        cv2.polylines(image, [points], isClosed=True, color=(255, 0, 0), thickness=3)

    return image


def find_most_similar_text(df, query, similarity_threshold=90):
    """Return the polygon of the OCR row whose text best matches ``query``.

    Rows are scored with ``fuzz.ratio``. A row only becomes the winner when
    its score reaches ``similarity_threshold`` and is strictly higher than the
    best score so far, so the earliest row wins ties.

    Returns:
        The winning row's ``det_polygons`` as an ``(N, 2)`` int32 array, or an
        empty list when no row qualifies.
    """
    winner = None
    winning_score = 0

    for _, candidate in df.iterrows():
        score = fuzz.ratio(query, candidate["rec_texts"])
        # Skip anything below the threshold or not better than the leader.
        if score < similarity_threshold or score <= winning_score:
            continue
        winner, winning_score = candidate, score

    if winner is None:
        return []

    return np.asarray(winner["det_polygons"]).reshape(-1, 2).astype(np.int32)


def super_bounding_box(landmarks, keys):
    """Axis-aligned rectangle enclosing the polygons of all ``keys``.

    Args:
        landmarks: Mapping of landmark name to an ``(N, 2)`` array of x/y points.
        keys: Landmark names to enclose.

    Returns:
        A ``(4, 2)`` array ordered top-left, top-right, bottom-right,
        bottom-left (with y growing downwards).

    Raises:
        ValueError: If a key is missing from ``landmarks``.
    """
    # Seed each extent list with the identity value for its reduction.
    lefts, tops = [float("inf")], [float("inf")]
    rights, bottoms = [float("-inf")], [float("-inf")]

    for name in keys:
        if name not in landmarks:
            raise ValueError(f"Key '{name}' not found in landmarks.")

        polygon = landmarks[name]
        xs, ys = polygon[:, 0], polygon[:, 1]
        lefts.append(xs.min())
        tops.append(ys.min())
        rights.append(xs.max())
        bottoms.append(ys.max())

    left, top = min(lefts), min(tops)
    right, bottom = max(rights), max(bottoms)

    return np.array([[left, top], [right, top], [right, bottom], [left, bottom]])


def infill_bounding_box(landmarks, key1, key2):
    """Rectangle filling the horizontal gap between two landmark boxes.

    The x-range runs from the right edge of the left-hand box to the left edge
    of the right-hand box, whichever order the keys are given in. The y-range
    is the overlap of the two boxes' vertical extents.

    The previous implementation took ``max`` of a right edge and a left edge
    for ``min_x`` (and ``min`` of a left edge and a right edge for ``max_x``),
    which for side-by-side boxes produced ``min_x > max_x`` and a span covering
    one of the boxes rather than the gap.

    Args:
        landmarks: Mapping of landmark name to an ``(N, 2)`` array of x/y points.
        key1: Name of the first landmark.
        key2: Name of the second landmark.

    Returns:
        A ``(4, 2)`` array ordered (min_x, min_y), (max_x, min_y),
        (max_x, max_y), (min_x, max_y). If the boxes overlap horizontally
        there is no gap and ``min_x`` exceeds ``max_x``; if they share no
        vertical extent ``min_y`` exceeds ``max_y``.

    Raises:
        ValueError: If either key is missing from ``landmarks``.
    """
    for key in (key1, key2):
        if key not in landmarks:
            raise ValueError(f"Key '{key}' not found in landmarks.")

    bbox1 = landmarks[key1]
    bbox2 = landmarks[key2]

    # Horizontal gap: smaller of the two right edges .. larger of the two left edges.
    min_x = min(bbox1[:, 0].max(), bbox2[:, 0].max())
    max_x = max(bbox1[:, 0].min(), bbox2[:, 0].min())

    # Vertical overlap of the two boxes.
    min_y = max(bbox1[:, 1].min(), bbox2[:, 1].min())
    max_y = min(bbox1[:, 1].max(), bbox2[:, 1].max())

    return np.array([[min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y]])


def infill_top_bottom_bbox(landmarks, top_key, bottom_key):
    """Quadrilateral spanning the vertical gap between two landmark boxes.

    The upper edge lies along the largest y of the top box and uses the top
    box's x extent; the lower edge lies along the smallest y of the bottom box
    and uses the bottom box's x extent.

    Returns:
        A ``(4, 2)`` array ordered upper-left, upper-right, lower-right,
        lower-left.

    Raises:
        ValueError: If either key is missing from ``landmarks``.
    """
    for name in (top_key, bottom_key):
        if name not in landmarks:
            raise ValueError(f"Key '{name}' not found in landmarks.")

    upper, lower = landmarks[top_key], landmarks[bottom_key]
    upper_xs, lower_xs = upper[:, 0], lower[:, 0]

    seam_top = upper[:, 1].max()
    seam_bottom = lower[:, 1].min()

    return np.array(
        [
            [upper_xs.min(), seam_top],
            [upper_xs.max(), seam_top],
            [lower_xs.max(), seam_bottom],
            [lower_xs.min(), seam_bottom],
        ]
    )


# Example usage:
# landmarks = {
#     'top_key': np.array([[10, 20], [30, 20], [30, 40], [10, 40]]),
#     'bottom_key': np.array([[15, 50], [25, 50], [25, 60], [15, 60]]),
# }
# infill_top_bottom = infill_top_bottom_bbox(landmarks, 'top_key', 'bottom_key')
# print(infill_top_bottom)


def build_ss_polygons(df):
    """Locate the landmark words in an OCR table and derive composite boxes.

    Each landmark word is looked up with ``find_most_similar_text``. Three
    composite rectangles are then added with ``super_bounding_box``:
    ``"smwc"`` (speed, mph, watts, cadence), ``"tdtc"`` (targets, distance,
    time, calories) and ``"hr"`` (heart, rate).

    Args:
        df: OCR table accepted by ``find_most_similar_text``.

    Returns:
        Dict mapping each landmark / composite name to its polygon.
        ``find_most_similar_text`` returns an empty list for a word it cannot
        match; the composite boxes index their members as arrays, so a missing
        member makes the corresponding ``super_bounding_box`` call fail.
    """
    # Each word is listed once: the map is keyed by word, so a repeated entry
    # would only trigger a second identical scan of the table.
    landmarks = [
        "rogue",
        "intervals",
        "speed",
        "mph",
        "watts",
        "cadence",
        "targets",
        "distance",
        "time",
        "calories",
        "heart",
        "rate",
    ]
    landmark_map = {word: find_most_similar_text(df, word) for word in landmarks}

    landmark_map["smwc"] = super_bounding_box(
        landmark_map, ["speed", "mph", "watts", "cadence"]
    )
    landmark_map["tdtc"] = super_bounding_box(
        landmark_map, ["targets", "distance", "time", "calories"]
    )
    landmark_map["hr"] = super_bounding_box(landmark_map, ["heart", "rate"])

    #     landmark_map["upper_row"] = infill_top_bottom_bbox(
    #         landmark_map, "speed+mph", "tdtc"
    #     )
    #     landmark_map["lower_row"] = infill_top_bottom_bbox(landmark_map, "tdtc", "hr")

    return landmark_map


#     return [
#         landmark_map["upper_row"],
#         landmark_map["lower_row"],
#     ]


# Build the OCR table from the first prediction in `x`.
# NOTE(review): the only visible assignment of `x` is a commented-out `#x = ocr(...)` line in an earlier cell — confirm it is defined on a fresh kernel.
ocr_df = pd.DataFrame(x["predictions"][0])

image = cv2.imread("output_001.png")


# Landmark polygons plus the composite boxes derived from them.
digit_bboxes = build_ss_polygons(ocr_df)

# draw_ocr_results draws on the array it receives without copying it, so for a
# colour `image` the annotations land on `image` itself.
image_p = draw_ocr_results(image, ocr_df, [])

display_image(image_p)

In [None]:
# NOTE(review): `image` was handed to draw_ocr_results in the previous cell, which draws in place —
# this presumably shows the annotated frame rather than the untouched one; confirm that is intended.
display_image(image)

In [None]:
# digit_bboxes

from scipy.stats import linregress

keys = (
    ["rogue", "intervals"]
    + ["speed", "mph", "watts", "cadence"]
    + ["targets", "distance", "time", "calories"]
    + ["heart", "rate"]
)

# Fit a straight line through the corner points of each landmark polygon and
# convert its slope into an angle in degrees.
landmark_angles = []
for key in keys:
    slope, *rest = linregress(digit_bboxes[key][:, 0], digit_bboxes[key][:, 1])
    angle_rad = np.arctan(slope)
    angle_deg = np.degrees(angle_rad)

    landmark_angles.append({"key": key, "slope": slope, "angle_deg": angle_deg})

# A bare expression inside a `for` body is not a top-level node, so IPython
# never echoes it; collect the rows and display them once as a table instead.
pd.DataFrame(landmark_angles)

In [None]:
import numpy as np
from sklearn.linear_model import LinearRegression

def estimate_angle_from_horizontal(bbox):
    """Angle, in degrees, of the least-squares line through the given points.

    Fits ``y`` on ``x`` with ``LinearRegression`` and converts the slope with
    ``arctan``. Angles above 45 degrees are shifted down by 90 and angles
    below -45 degrees are shifted up by 90, so the result always lies in
    [-45, 45].

    Args:
        bbox: ``(N, 2)`` array of x/y points.
    """
    model = LinearRegression().fit(bbox[:, 0].reshape(-1, 1), bbox[:, 1])
    tilt_deg = np.degrees(np.arctan(model.coef_[0]))

    # Fold steep lines back towards the nearest axis.
    if tilt_deg > 45:
        return tilt_deg - 90
    if tilt_deg < -45:
        return tilt_deg + 90
    return tilt_deg

# Example usage:
# bbox = np.array([[10, 20], [30, 20], [30, 40], [10, 40]])
# angle = estimate_angle_from_horizontal(bbox)
# print(angle)


import cv2
import numpy as np

def optimal_rectangular_transform(bboxes):
    """Estimate a pair of rotation matrices for each four-corner box.

    For every box the edges between corners (0, 1) and (2, 3) are averaged
    into one angle and the edges between corners (0, 3) and (1, 2) into
    another, each measured with ``estimate_angle_from_horizontal``. The first
    matrix rotates by the first angle about the box centre; the second rotates
    by the second angle about the centre of the box after the first rotation.

    Args:
        bboxes: Iterable of ``(4, 2)`` corner arrays.

    Returns:
        List of ``(horizontal_rotation_matrix, vertical_rotation_matrix)``
        tuples, one per input box, as produced by ``cv2.getRotationMatrix2D``.
    """
    transforms = []

    for bbox in bboxes:
        # Estimate angles for each pair of opposite sides
        angle1 = estimate_angle_from_horizontal(bbox[[0, 1], :])
        angle2 = estimate_angle_from_horizontal(bbox[[2, 3], :])
        angle3 = estimate_angle_from_horizontal(bbox[[0, 3], :])
        angle4 = estimate_angle_from_horizontal(bbox[[1, 2], :])

        # Average the angles
        avg_horizontal_angle = (angle1 + angle2) / 2
        avg_vertical_angle = (angle3 + angle4) / 2

        # Centre of the box as plain Python floats, so the argument does not
        # depend on how the OpenCV build parses numpy scalar types.
        center = tuple(float(c) for c in np.mean(bbox, axis=0))

        horizontal_rotation_matrix = cv2.getRotationMatrix2D(
            center, avg_horizontal_angle, 1
        )

        # Apply the horizontal rotation
        rotated_bbox = cv2.transform(np.float32([bbox]), horizontal_rotation_matrix)[0]

        # The second rotation pivots on the centre of the already-rotated box.
        rotated_center = tuple(float(c) for c in np.mean(rotated_bbox, axis=0))
        vertical_rotation_matrix = cv2.getRotationMatrix2D(
            rotated_center, avg_vertical_angle, 1
        )

        # Store the transformation matrices
        transforms.append((horizontal_rotation_matrix, vertical_rotation_matrix))

    return transforms

# Example usage:
# bboxes = [np.array([[10, 20], [30, 20], [30, 40], [10, 40]])]
# transforms = optimal_rectangular_transform(bboxes)
# print(transforms)


# Example usage:
# Estimate the rotation-matrix pairs for the "rogue" and "cadence" landmark polygons and print them.
bboxes = [digit_bboxes["rogue"], digit_bboxes["cadence"]]
transforms = optimal_rectangular_transform(bboxes)
print(transforms)


In [None]:
import numpy as np
import cv2

def compute_perspective_transform(bbox, target_bbox):
    """Perspective matrix mapping the corners of ``bbox`` onto ``target_bbox``.

    Both corner sets are converted to float32 before being handed to
    ``cv2.getPerspectiveTransform``, whose result is returned unchanged.
    """
    return cv2.getPerspectiveTransform(np.float32(bbox), np.float32(target_bbox))

# NOTE(review): the three lists below are literal `[...]` placeholders (each holds a single
# Ellipsis object), so this cell is a template and cannot run until real values are filled in.
# It also rebinds `bboxes`, which an earlier cell assigns.
bboxes = [...]  # List of k bounding boxes
target_bboxes = [...]  # List of k corresponding target rectangular bounding boxes
weights = [...]  # List of k corresponding weights

# Calculate the perspective transformations for each bounding box
transformations = [compute_perspective_transform(bbox, target_bbox)
                   for bbox, target_bbox in zip(bboxes, target_bboxes)]

# Normalize the weights
normalized_weights = np.array(weights) / np.sum(weights)

# Compute the weighted average of the transformation matrices
# (an element-wise weighted sum of the 3x3 matrices)
weighted_avg_transform = np.zeros((3, 3), dtype=np.float32)
for transform, weight in zip(transformations, normalized_weights):
    weighted_avg_transform += transform * weight


In [None]:
import cv2
import numpy as np
from IPython.display import display, clear_output
import ipywidgets as widgets
from PIL import Image
from io import BytesIO

def display_image(img):
    """Render an OpenCV image inline by round-tripping it through PNG.

    The array is PNG-encoded with ``cv2.imencode``, reopened as a PIL image
    from the encoded bytes and passed to ``display``. The success flag
    returned by ``cv2.imencode`` is not checked.
    """
    _ok, png_buffer = cv2.imencode('.png', img)
    display(Image.open(BytesIO(png_buffer.tobytes())))

def warp_perspective(angle, scale, tx, ty):
    """Rotate, scale and translate the global ``img`` and display the result.

    Args:
        angle: Rotation angle passed to ``cv2.getRotationMatrix2D``.
        scale: Scale factor passed to ``cv2.getRotationMatrix2D``.
        tx: Offset added to the x translation of the affine matrix.
        ty: Offset added to the y translation of the affine matrix.
    """
    clear_output(wait=True)

    n_rows, n_cols, _ = img.shape
    # Rotate/scale about the integer image centre, then shift by (tx, ty).
    affine = cv2.getRotationMatrix2D((n_cols // 2, n_rows // 2), angle, scale)
    affine[:, 2] += [tx, ty]

    display_image(cv2.warpAffine(img, affine, (n_cols, n_rows)))

# Load your input image
#img = cv2.imread('path/to/your/image.jpg')
# `warp_perspective` reads this module-level `img`; here it is the annotated frame.
img = image_p

# Create widgets for the transformation parameters
angle_slider = widgets.FloatSlider(min=-5, max=5, step=0.1, value=0, description='Rotation Angle')
scale_slider = widgets.FloatSlider(min=0.1, max=3, step=0.1, value=1, description='Scale')
tx_slider = widgets.IntSlider(min=-100, max=100, step=1, value=0, description='Translation X')
ty_slider = widgets.IntSlider(min=-100, max=100, step=1, value=0, description='Translation Y')

# Re-run warp_perspective whenever a slider moves.
widgets.interact(warp_perspective, angle=angle_slider, scale=scale_slider, tx=tx_slider, ty=ty_slider)


In [None]:
import cv2
import numpy as np
from IPython.display import display, clear_output
import ipywidgets as widgets
from PIL import Image
from io import BytesIO

def display_image(img):
    """Render an OpenCV image inline by round-tripping it through PNG.

    The array is PNG-encoded with ``cv2.imencode``, reopened as a PIL image
    from the encoded bytes and passed to ``display``. The success flag
    returned by ``cv2.imencode`` is not checked.
    """
    _ok, png_buffer = cv2.imencode('.png', img)
    display(Image.open(BytesIO(png_buffer.tobytes())))


def warp_perspective(pitch, yaw, roll):
    """Warp the global ``img`` as if the camera were rotated, then display it.

    The three angles (degrees) are turned into rotations about the x, y and z
    axes and composed as ``Rz @ Ry @ Rx``. The image is warped with the
    homography ``K @ R @ inv(K)``, where ``K`` is a pinhole intrinsic matrix
    whose focal length corresponds to a 60 degree horizontal field of view and
    whose principal point is the image centre. With all angles at zero the
    homography is the identity.

    The previous implementation assembled a 3x4 matrix and passed it to
    ``cv2.warpPerspective``, which requires a 3x3 matrix.

    Args:
        pitch: Rotation about the x axis, in degrees.
        yaw: Rotation about the y axis, in degrees.
        roll: Rotation about the z axis, in degrees.
    """
    clear_output(wait=True)

    def rotation_matrix(axis, angle):
        # Rodrigues turns an axis-angle vector (axis scaled by radians) into a 3x3 matrix.
        return cv2.Rodrigues(np.radians(angle) * axis)[0]

    rows, cols = img.shape[:2]
    center = (cols // 2, rows // 2)
    focal_length = cols / (2 * np.tan(np.radians(60) / 2))

    # Build the rotation matrix
    r_x = rotation_matrix(np.array([1.0, 0.0, 0.0]), pitch)
    r_y = rotation_matrix(np.array([0.0, 1.0, 0.0]), yaw)
    r_z = rotation_matrix(np.array([0.0, 0.0, 1.0]), roll)
    r = r_z @ r_y @ r_x

    # Pinhole intrinsics: pixel coordinates <-> viewing rays.
    k = np.array(
        [
            [focal_length, 0.0, center[0]],
            [0.0, focal_length, center[1]],
            [0.0, 0.0, 1.0],
        ],
        dtype=np.float64,
    )

    # Homography induced by a pure camera rotation.
    m = k @ r @ np.linalg.inv(k)

    # Apply the perspective transformation
    img_warped = cv2.warpPerspective(img, m, (cols, rows))
    display_image(img_warped)
# Load your input image
#img = cv2.imread('path/to/your/image.jpg')
# `warp_perspective` reads this module-level `img`; here it is the annotated frame.
img = image_p

# Create widgets for the angles
pitch_slider = widgets.FloatSlider(min=-10, max=10, step=0.5, value=0, description='Pitch')
yaw_slider = widgets.FloatSlider(min=-10, max=10, step=0.5, value=0, description='Yaw')
roll_slider = widgets.FloatSlider(min=-10, max=10, step=0.5, value=0, description='Roll')

# Re-run warp_perspective whenever a slider moves.
widgets.interact(warp_perspective, pitch=pitch_slider, yaw=yaw_slider, roll=roll_slider)


In [None]:
def square_off(bbox, f=0.5):
    """Blend a four-corner box towards an axis-aligned rectangle.

    Each corner is snapped to a rectangle whose left/right edges are the mean
    x of the corners left/right of the box centre and whose top/bottom edges
    are the mean y of the corners above/below it. Corners keep their position
    in the array, so the result lines up corner-for-corner with the input
    whatever order the corners are listed in.

    The previous implementation paired corners (0, 1) and (2, 3) for x and
    (0, 2) and (1, 3) for y. In a polygon listed by walking its outline,
    corners 0 and 2 are diagonal opposites, so that pairing averaged opposite
    sides and collapsed the target towards the centre.

    Args:
        bbox: ``(4, 2)`` array of x/y corners.
        f: Blend factor; 0 returns the input corners, 1 the fully squared
            rectangle.

    Returns:
        ``(4, 2)`` array ``(1 - f) * bbox + f * rectangle``.
    """
    xs, ys = bbox[:, 0], bbox[:, 1]

    # Classify each corner relative to the centroid.
    is_left = xs < xs.mean()
    is_top = ys < ys.mean()

    out = np.column_stack(
        [
            np.where(is_left, xs[is_left].mean(), xs[~is_left].mean()),
            np.where(is_top, ys[is_top].mean(), ys[~is_top].mean()),
        ]
    )

    return (1 - f) * bbox + f * out


# Define a 2D numpy array bounding box
# (the "watts" landmark polygon, cast to float32 before it is handed to cv2.getPerspectiveTransform)
src_bbox = digit_bboxes["watts"].astype(np.float32)
src_bbox

# f=1 asks square_off for the fully squared target rather than a blend.
dst_bbox = square_off(src_bbox, 1)
# dst_bbox = src_bbox
dst_bbox

# Define the destination bounding box
# dst_bbox = np.array([[100, 100], [200, 100], [200, 200], [100, 200]], dtype=np.float32)

# Calculate the perspective transformation matrix
M = cv2.getPerspectiveTransform(src_bbox, dst_bbox)
M

# Now you can use M to transform the original image using cv2.warpPerspective
# Show the annotated frame before and after warping, at the same output size.
rows, cols, _ = image_p.shape
display_image(image_p)
display_image(cv2.warpPerspective(image_p, M, (cols, rows)))

In [None]:
# Stack the corner points of the selected landmark polygons into one x/y table.
df = pd.DataFrame(
    np.concatenate(
        [
            digit_bboxes[e]
            for e in [
                "speed",
                # "mph",
                # "watts",
                "cadence",
                "distance",
                # "time",
                "calories",
                # "targets",
                # "heart",
                # "rate",
            ]
        ]
    ),
    columns=["x", "y"],
)
# df
# ?linregress
# Slope of the least-squares line through all stacked corner points; a later part of this cell turns it into an angle.
slope, *rest = linregress(df.x.to_numpy(), df.y.to_numpy())

# Plot with y negated so the points appear the same way up as in the image.
# NOTE(review): `import plotnine as p9` is commented out in the notebook's import cell — confirm `p9` is available.
p9.ggplot(df, p9.aes("x", "-y")) + p9.geom_point() + p9.geom_smooth(method="lm")


def warp_perspective(img, angle=0, scale=1, tx=0, ty=0):
    """Return ``img`` rotated, scaled and translated at its original size.

    Args:
        img: Three-dimensional image array (rows, columns, channels).
        angle: Rotation angle passed to ``cv2.getRotationMatrix2D``.
        scale: Scale factor passed to ``cv2.getRotationMatrix2D``.
        tx: Offset added to the x translation of the affine matrix.
        ty: Offset added to the y translation of the affine matrix.
    """
    # clear_output(wait=True)

    n_rows, n_cols, _ = img.shape
    # Rotate/scale about the integer image centre, then shift by (tx, ty).
    affine = cv2.getRotationMatrix2D((n_cols // 2, n_rows // 2), angle, scale)
    affine[:, 2] += [tx, ty]

    return cv2.warpAffine(img, affine, (n_cols, n_rows))


def crop_image(image, bbox):
    """
    Crop an image (in numpy representation) to the given bounding box.

    Coordinates that fall before the first row/column are clamped to 0.
    Without the clamp a negative bound is interpreted by numpy as an offset
    from the end of the axis, which yields an empty or wrong crop. Bounds past
    the end of the image are already truncated by slicing.

    Args:
    - image (numpy.ndarray): The input image in numpy format (height, width, channels)
    - bbox (list or tuple): The bounding box as (x_min, y_min, x_max, y_max)

    Returns:
    - cropped_image (numpy.ndarray): The cropped image (a view of the input)
    """
    x_min, y_min, x_max, y_max = (max(v, 0) for v in bbox)
    cropped_image = image[y_min:y_max, x_min:x_max]

    return cropped_image


# Extent of the stacked landmark points.
width = df.x.max() - df.x.min()
height = df.y.max() - df.y.min()
(width, height)
# Crop box padded asymmetrically around the points: 5% on the left and top,
# 10% on the right and 45% at the bottom, rounded to whole pixels.
bbox = np.round(
    np.asarray(
        [
            df.x.min() - 0.05 * width,
            df.y.min() - 0.05 * height,
            df.x.max() + 0.10 * width,
            df.y.max() + 0.45 * height,
        ]
    )
).astype(int)

image_p_c = crop_image(image_p, bbox)


display_image(image_p_c)

# Convert the regression slope to degrees and rotate the crop by that angle.
angle_rad = np.arctan(slope)
angle_deg = np.degrees(angle_rad)
(slope, angle_rad, angle_deg)

display_image(warp_perspective(image_p_c, angle=angle_deg))

In [None]:
# Inspect the result dict `x`: its keys, the first prediction as a table, and the first visualisation image.
# NOTE(review): the only visible assignment of `x` is a commented-out line in an earlier cell — confirm it is defined.
x.keys()

pd.DataFrame(x["predictions"][0])

plt.imshow(x["visualization"][0])

In [None]:
# Raises ZeroDivisionError — presumably a deliberate tripwire so "Run All" stops before the cells below; confirm.
1 / 0

In [None]:
# Iterate through all the image files in the directory, in sorted order so
# the output is the same from run to run.
for fname in sorted(image_directory.glob("*")):
    # Compare the lower-cased suffix so files such as ".PNG" or ".JPG" are not skipped.
    if fname.suffix.lower() not in {'.png', '.jpg', '.jpeg'}:
        continue

    print(f"On image file [{fname}]")

    # Run EasyOCR on the image
    results = reader.readtext(str(fname))

    print(results)



In [None]:
from mmocr.apis import MMOCRInferencer
# Recognition-only inferencer (no detector argument is given) using the `svtr-small` model.
infer = MMOCRInferencer(rec='svtr-small')
# NOTE(review): `root` is not defined in the visible part of the notebook, and a path object rather than a string is passed here — confirm both.
result = infer(root / "frames/output_001.png", save_vis=True, return_vis=True)
print(result['predictions'])

In [None]:
# Show the first visualisation image returned by the inferencer in the previous cell.
plt.imshow(result['visualization'][0])
plt.show()