-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add utils package with modules providing utility functions.
- Loading branch information
Showing
5 changed files
with
327 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
"""Package to provide utility functions for the insect-detect repository. | ||
Source: https://github.com/maxsitt/insect-detect | ||
License: GNU GPLv3 (https://choosealicense.com/licenses/gpl-3.0/) | ||
Author: Maximilian Sittinger (https://github.com/maxsitt) | ||
Docs: https://maxsitt.github.io/insect-detect-docs/ | ||
Modules: | ||
general: Utility functions for bounding box adjustment and data storage. | ||
log: Utility functions to print information or save to log file. | ||
oak_cam: Utility functions to control the OAK camera. | ||
save_data: Utility functions to save images and metadata. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
"""Utility functions for bounding box adjustment and data storage. | ||
Source: https://github.com/maxsitt/insect-detect | ||
License: GNU GPLv3 (https://choosealicense.com/licenses/gpl-3.0/) | ||
Author: Maximilian Sittinger (https://github.com/maxsitt) | ||
Docs: https://maxsitt.github.io/insect-detect-docs/ | ||
Functions: | ||
frame_norm(): Convert relative bounding box coordinates (0-1) to pixel coordinates. | ||
make_bbox_square(): Adjust bounding box dimensions to make it square. | ||
zip_data(): Store data in an uncompressed .zip file for each day and delete original directory. | ||
frame_norm() is based on open source scripts available at https://github.com/luxonis | ||
""" | ||
|
||
import shutil | ||
from zipfile import ZipFile | ||
|
||
import numpy as np | ||
|
||
|
||
def frame_norm(frame, bbox): | ||
"""Convert relative bounding box coordinates (0-1) to pixel coordinates.""" | ||
norm_vals = np.full(len(bbox), frame.shape[0]) | ||
norm_vals[::2] = frame.shape[1] | ||
|
||
return (np.clip(np.array(bbox), 0, 1) * norm_vals).astype(int) | ||
|
||
|
||
def make_bbox_square(frame, bbox): | ||
"""Adjust bounding box dimensions to make it square. | ||
Increase bounding box size on both sides of the minimum dimension, | ||
or only on one side if bbox is localized at the frame margin. | ||
""" | ||
bbox_width = bbox[2] - bbox[0] | ||
bbox_height = bbox[3] - bbox[1] | ||
bbox_diff = abs(bbox_width - bbox_height) // 2 | ||
|
||
if bbox_width < bbox_height: | ||
if bbox[0] - bbox_diff < 0: | ||
bbox[0] = 0 | ||
bbox[2] = bbox[2] + bbox_diff * 2 - bbox[0] | ||
elif bbox[2] + bbox_diff > frame.shape[1]: | ||
bbox[0] = bbox[0] - bbox_diff * 2 + frame.shape[1] - bbox[2] | ||
bbox[2] = frame.shape[1] | ||
else: | ||
bbox[0] = bbox[0] - bbox_diff | ||
bbox[2] = bbox[2] + bbox_diff | ||
else: | ||
if bbox[1] - bbox_diff < 0: | ||
bbox[1] = 0 | ||
bbox[3] = bbox[3] + bbox_diff * 2 - bbox[1] | ||
elif bbox[3] + bbox_diff > frame.shape[0]: | ||
bbox[1] = bbox[1] - bbox_diff * 2 + frame.shape[0] - bbox[3] | ||
bbox[3] = frame.shape[0] | ||
else: | ||
bbox[1] = bbox[1] - bbox_diff | ||
bbox[3] = bbox[3] + bbox_diff | ||
|
||
return bbox | ||
|
||
|
||
def zip_data(save_path): | ||
"""Store data in an uncompressed .zip file for each day and delete original directory.""" | ||
with ZipFile(f"{save_path.parent}.zip", "a") as zip_file: | ||
for file in save_path.rglob("*"): | ||
zip_file.write(file, file.relative_to(save_path.parent)) | ||
|
||
shutil.rmtree(save_path.parent, ignore_errors=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
"""Utility functions to print information or save to log file. | ||
Source: https://github.com/maxsitt/insect-detect | ||
License: GNU GPLv3 (https://choosealicense.com/licenses/gpl-3.0/) | ||
Author: Maximilian Sittinger (https://github.com/maxsitt) | ||
Docs: https://maxsitt.github.io/insect-detect-docs/ | ||
Functions: | ||
print_logs(): Print Raspberry Pi information. | ||
save_logs(): Write information to .csv file during recording. | ||
record_log(): Write information to .csv file at the end of the recording interval. | ||
""" | ||
|
||
import csv | ||
import logging | ||
from datetime import datetime | ||
|
||
import pandas as pd | ||
import psutil | ||
from gpiozero import CPUTemperature | ||
|
||
|
||
def print_logs(): | ||
"""Print Raspberry Pi information.""" | ||
logging.basicConfig(level=logging.INFO, format="%(message)s") | ||
logging.info("\nAvailable RPi memory: %s MB", round(psutil.virtual_memory().available / 1048576)) | ||
logging.info("RPi CPU utilization: %s %%", round(psutil.cpu_percent(interval=None))) | ||
logging.info("RPi CPU temperature: %s °C\n", round(CPUTemperature().temperature)) | ||
|
||
|
||
def save_logs(device, rec_id, rec_start, save_path, | ||
pijuice=None, chargelevel=None): | ||
"""Write information to .csv file during recording. | ||
Write recording ID, current time, RPi CPU + OAK chip temperature | ||
and RPi available memory (MB) + CPU utilization (%) to .csv. | ||
If pijuice is provided, also write PiJuice battery info + temp to .csv. | ||
""" | ||
try: | ||
temp_oak = round(device.getChipTemperature().average) | ||
except RuntimeError: | ||
temp_oak = "NA" | ||
|
||
try: | ||
logs = { | ||
"rec_ID": rec_id, | ||
"timestamp": datetime.now().isoformat(), | ||
"temp_pi": round(CPUTemperature().temperature), | ||
"temp_oak": temp_oak, | ||
"pi_mem_available": round(psutil.virtual_memory().available / 1048576), | ||
"pi_cpu_used": psutil.cpu_percent(interval=None) | ||
} | ||
if pijuice and chargelevel: | ||
logs.update({ | ||
"power_input": pijuice.status.GetStatus().get("data", {}).get("powerInput", "NA"), | ||
"charge_status": pijuice.status.GetStatus().get("data", {}).get("battery", "NA"), | ||
"charge_level": chargelevel, | ||
"voltage_batt_mV": pijuice.status.GetBatteryVoltage().get("data", "NA"), | ||
"temp_batt": pijuice.status.GetBatteryTemperature().get("data", "NA") | ||
}) | ||
except IndexError: | ||
logs = {} | ||
|
||
with open(save_path.parent / f"{rec_start.date()}_info_log.csv", "a", encoding="utf-8") as log_file: | ||
log_writer = csv.DictWriter(log_file, fieldnames=logs.keys()) | ||
if log_file.tell() == 0: | ||
log_writer.writeheader() | ||
log_writer.writerow(logs) | ||
|
||
|
||
def record_log(rec_id, rec_start, rec_start_format, rec_end, save_path, | ||
chargelevel_start=None, chargelevel=None): | ||
"""Write information to .csv file at the end of the recording interval. | ||
Write recording ID, recording start and end time, recording duration (min), | ||
number of cropped detections, number of unique tracking IDs and | ||
available disk space (GB) to .csv. | ||
If chargelevel_start and chargelevel are provided, also write both to .csv. | ||
""" | ||
try: | ||
df_meta = pd.read_csv(save_path / f"{rec_start_format}_metadata.csv", encoding="utf-8") | ||
unique_ids = df_meta["track_ID"].nunique() | ||
except (pd.errors.EmptyDataError, FileNotFoundError): | ||
unique_ids = 0 | ||
|
||
logs_rec = { | ||
"rec_ID": rec_id, | ||
"rec_start": rec_start.isoformat(), | ||
"rec_end": rec_end.isoformat(), | ||
"rec_time_min": round((rec_end - rec_start).total_seconds() / 60, 2), | ||
"num_crops": len(list((save_path / "crop").glob("**/*.jpg"))), | ||
"num_IDs": unique_ids, | ||
"disk_free_gb": round(psutil.disk_usage("/").free / 1073741824, 1) | ||
} | ||
if chargelevel_start and chargelevel: | ||
logs_rec.update({ | ||
"chargelevel_start": chargelevel_start, | ||
"chargelevel_end": chargelevel | ||
}) | ||
|
||
with open(save_path.parents[1] / "record_log.csv", "a", encoding="utf-8") as log_rec_file: | ||
log_rec_writer = csv.DictWriter(log_rec_file, fieldnames=logs_rec.keys()) | ||
if log_rec_file.tell() == 0: | ||
log_rec_writer.writeheader() | ||
log_rec_writer.writerow(logs_rec) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
"""Utility functions to control the OAK camera. | ||
Source: https://github.com/maxsitt/insect-detect | ||
License: GNU GPLv3 (https://choosealicense.com/licenses/gpl-3.0/) | ||
Author: Maximilian Sittinger (https://github.com/maxsitt) | ||
Docs: https://maxsitt.github.io/insect-detect-docs/ | ||
Functions: | ||
set_focus_range(): Convert closest cm values to lens position values and set auto focus range. | ||
bbox_set_exposure_region(): Use bounding box coordinates to set auto exposure region. | ||
partly based on open source scripts available at https://github.com/luxonis | ||
""" | ||
|
||
import depthai as dai | ||
|
||
|
||
def set_focus_range(cm_min, cm_max): | ||
"""Convert closest cm values to lens position values and set auto focus range.""" | ||
cm_lenspos = { | ||
6: 250, | ||
8: 220, | ||
10: 190, | ||
12: 170, | ||
14: 160, | ||
16: 150, | ||
20: 140, | ||
25: 135, | ||
30: 130, | ||
40: 125, | ||
60: 120 | ||
} | ||
|
||
closest_cm_min = min(cm_lenspos.keys(), key=lambda k: abs(k - cm_min)) | ||
closest_cm_max = min(cm_lenspos.keys(), key=lambda k: abs(k - cm_max)) | ||
lenspos_min = cm_lenspos[closest_cm_max] | ||
lenspos_max = cm_lenspos[closest_cm_min] | ||
|
||
af_ctrl = dai.CameraControl().setAutoFocusLensRange(lenspos_min, lenspos_max) | ||
|
||
return af_ctrl | ||
|
||
|
||
def bbox_set_exposure_region(bbox, sensor_res): | ||
"""Use bounding box coordinates to set auto exposure region.""" | ||
xmin_roi = max(0.001, bbox[0]) | ||
ymin_roi = max(0.001, bbox[1]) | ||
xmax_roi = min(0.999, bbox[2]) | ||
ymax_roi = min(0.999, bbox[3]) | ||
|
||
roi_x = int(xmin_roi * sensor_res[0]) | ||
roi_y = int(ymin_roi * sensor_res[1]) | ||
roi_width = int((xmax_roi - xmin_roi) * sensor_res[0]) | ||
roi_height = int((ymax_roi - ymin_roi) * sensor_res[1]) | ||
|
||
ae_ctrl = dai.CameraControl().setAutoExposureRegion(roi_x, roi_y, roi_width, roi_height) | ||
|
||
return ae_ctrl |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
"""Utility functions to save images and metadata. | ||
Source: https://github.com/maxsitt/insect-detect | ||
License: GNU GPLv3 (https://choosealicense.com/licenses/gpl-3.0/) | ||
Author: Maximilian Sittinger (https://github.com/maxsitt) | ||
Docs: https://maxsitt.github.io/insect-detect-docs/ | ||
Functions: | ||
save_crop_metadata(): Save cropped detection to .jpg and corresponding metadata to .csv. | ||
save_raw_frame(): Save full frame to .jpg. | ||
save_overlay_frame(): Save full frame with overlays to .jpg. | ||
partly based on open source scripts available at https://github.com/luxonis | ||
""" | ||
|
||
import csv | ||
from datetime import datetime | ||
|
||
import cv2 | ||
|
||
from utils.general import make_bbox_square | ||
|
||
|
||
def save_crop_metadata(frame, bbox, rec_id, label, det_conf, track_id, bbox_orig, | ||
rec_start_format, save_path, crop="square"): | ||
"""Save cropped detection to .jpg and corresponding metadata to .csv.""" | ||
timestamp = datetime.now() | ||
if crop == "square": | ||
bbox = make_bbox_square(frame, bbox.copy()) | ||
bbox_crop = frame[bbox[1]:bbox[3], bbox[0]:bbox[2]] | ||
timestamp_crop = timestamp.strftime("%Y-%m-%d_%H-%M-%S-%f") | ||
path_crop = f"{save_path}/crop/{label}/{timestamp_crop}_ID{track_id}_crop.jpg" | ||
cv2.imwrite(path_crop, bbox_crop) | ||
|
||
metadata = { | ||
"rec_ID": rec_id, | ||
"timestamp": timestamp.isoformat(), | ||
"label": label, | ||
"confidence": det_conf, | ||
"track_ID": track_id, | ||
"x_min": round(bbox_orig[0], 4), | ||
"y_min": round(bbox_orig[1], 4), | ||
"x_max": round(bbox_orig[2], 4), | ||
"y_max": round(bbox_orig[3], 4), | ||
"file_path": path_crop | ||
} | ||
|
||
with open(save_path / f"{rec_start_format}_metadata.csv", "a", encoding="utf-8") as metadata_file: | ||
metadata_writer = csv.DictWriter(metadata_file, fieldnames=metadata.keys()) | ||
if metadata_file.tell() == 0: | ||
metadata_writer.writeheader() | ||
metadata_writer.writerow(metadata) | ||
|
||
|
||
def save_raw_frame(frame, tracklet, tracks, save_path): | ||
"""Save full frame to .jpg.""" | ||
if tracklet == tracks[0]: | ||
timestamp_raw = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f") | ||
path_raw = f"{save_path}/raw/{timestamp_raw}_raw.jpg" | ||
cv2.imwrite(path_raw, frame) | ||
|
||
|
||
def save_overlay_frame(frame, bbox, label, det_conf, track_id, | ||
tracklet, tracks, save_path, res_4k=False): | ||
"""Save full frame with overlays to .jpg.""" | ||
text_pos = (48, 98, 164) if res_4k else (28, 55, 92) | ||
font_size = (1.7, 1.6, 2) if res_4k else (0.9, 0.8, 1.1) | ||
thickness = 3 if res_4k else 2 | ||
|
||
cv2.putText(frame, label, (bbox[0], bbox[3] + text_pos[0]), | ||
cv2.FONT_HERSHEY_SIMPLEX, font_size[0], (255, 255, 255), thickness) | ||
cv2.putText(frame, f"{det_conf}", (bbox[0], bbox[3] + text_pos[1]), | ||
cv2.FONT_HERSHEY_SIMPLEX, font_size[1], (255, 255, 255), thickness) | ||
cv2.putText(frame, f"ID:{track_id}", (bbox[0], bbox[3] + text_pos[2]), | ||
cv2.FONT_HERSHEY_SIMPLEX, font_size[2], (255, 255, 255), thickness) | ||
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 0, 255), thickness) | ||
|
||
if tracklet == tracks[-1]: | ||
timestamp_overlay = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f") | ||
path_overlay = f"{save_path}/overlay/{timestamp_overlay}_overlay.jpg" | ||
cv2.imwrite(path_overlay, frame) |