In [3]:
import csv
import os
import math
from networktables import NetworkTables
import logging
import cv2
import matplotlib.pyplot as plt
import numpy as np
import random

logger = logging.getLogger('nt')
logger.setLevel(logging.DEBUG)
logging.basicConfig()

In [None]:
NetworkTables.initialize(server="localhost")

In [None]:
class DataCollector:
    def __init__(self, table, folder='.', csv_filename='data.csv', target_radius = 0.150, plate_thickness=0.01,
                 start_pan_angle=0.0, end_pan_angle=355.0, step_pan_angle=5.0, 
                 camera_h_fov=69, camera_h_res=2592, camera_v_res=1944, camera_id=0):
        self.table = table
        self.folder = folder
        self.csv_path = os.path.join(folder, csv_filename)
        self.target_radius = target_radius
        self.plate_thickness = plate_thickness
        self.start_pan_angle = start_pan_angle
        self.end_pan_angle = end_pan_angle
        self.step_pan_angle = step_pan_angle
        self.camera_h_fov = camera_h_fov
        self.camera_h_res = camera_h_res
        self.camera_v_res = camera_v_res
        # assume square pixels
        self.res_per_deg = self.camera_h_res / self.camera_h_fov
        # assume center of camera image until adjusted
        self.center_x = int(self.camera_h_res / 2)
        self.center_y = int(self.camera_v_res / 2)
        self.camera_id = camera_id
        
    def startRun(self, base_file_name):
        self.base_file_name = base_file_name
        # get current state, distance, and angles
        self.state = self.table.getString("State", None)
        self.tilt_angle = self.table.getNumber("TiltAngle", None)
        self.pan_angle = self.table.getNumber("PanAngle", None)
        self.ground_distance = self.table.getNumber("GroundDistance", None)
        self.slope_distance = self.table.getNumber("SlopeDistance", None)
        # we haven't started a run yet
        self.run_started = False
        self.run_done = False
        self.pan_started = False
        self.pan_done = False
        self.next_pan_angle = None
        self.sequence = 1
        self.data = []
        self._openCamera()
        self.table.addEntryListener(self._changeHandler)

    def _openCamera(self):
        # open the camera
        self.cap = cv2.VideoCapture(self.camera_id)
        # disables auto exposure
        self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_h_res)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_v_res)
        
    def _updateValue(self, key, value):
        if key == 'State':
            self.state = value
        elif key == 'TiltAngle':
            self.tilt_angle = value
        elif key == 'PanAngle':
            self.pan_angle = value
        elif key == 'GroundDistance':
            self.ground_distance = value
        elif key == 'SlopeDistance':
            self.slope_distance = value
            
    def _changeHandler(self, table, key, value, isNew):
        self._updateValue(key, value)
        # finite state machine
        if self.state == 'STANDBY':
            if self.run_started:
                # run is complete
                self.run_done = True
                self.cap.release()
                self.table.removeEntryListener(self._changeHandler)
                self._writeData()
                print('Run Complete')
        elif self.state == 'PREPARE':
            self.pan_started = False
            self.pan_done = False
        elif self.state == 'ROTATE_COLLECT':
            if not self.run_started:
                self.run_started = True
            if not self.pan_started:
                self.next_pan_angle = self.start_pan_angle
                self.pan_started = True
            if not self.pan_done and self.pan_angle >= self.next_pan_angle:
                filename = self.base_file_name + "%04d" % self.sequence + '.jpg'
                row = self._saveImage(filename)
                self.data.append(row)
                self.sequence += 1
                if self.pan_angle >= self.end_pan_angle:
                    self.pan_done = True
                else:
                    self.next_pan_angle += self.step_pan_angle

    # returns data row with filename, pan_angle, tilt_angle, ground_distance, and slope_distance
    def _saveImage(self, filename):
        # capture current values
        pan_angle = self.pan_angle
        tilt_angle = self.tilt_angle
        ground_distance = self.ground_distance
        slope_distance = self.slope_distance

        ret, image = self.cap.read()
        if not ret:
            raise Exception('Could not read image from camera')
        
        center, radii = self._calculateEllipse(tilt_angle, slope_distance)
        
        # Create a mask (e.g., a black circle on a white background)
        mask = np.zeros(image.shape[:2], dtype=np.uint8)
        cv2.ellipse(mask, center, radii, 0, 0, 360, 255, -1)
        # Apply the mask to the image
        masked_image = cv2.bitwise_and(image, image, mask=mask)
        image_crop = masked_image[int(center[1] - radii[1]):int(center[1] + radii[1]),
                                  int(center[0] - radii[0]):int(center[0] + radii[0])]
        cv2.imwrite(os.path.join(self.folder, filename), image_crop)
        return [filename, pan_angle, tilt_angle, ground_distance, slope_distance]
                    
    def _writeData(self):
        is_new = not os.path.exists(self.csv_path)
        print(f'Updating file {self.csv_path}') 
        with open(self.csv_path, 'a', newline='') as csvfile:
            writer = csv.writer(csvfile)
            if is_new:
                headers = ['FILE_NAME', 'PAN_ANGLE', 'TILT_ANGLE', 'GROUND_DISTANCE', 'SLOPE_DISTANCE']
                writer.writerow(headers)
            writer.writerows(self.data)

    # should be done at tilt of 90 degrees
    # robot should be in MANUAL_DRIVE state
    def adjustImageCenter(self, slope_distance, target_radius=0.045, view_size=640):
        crop_size = 2 * self.res_per_deg * math.atan2(target_radius, slope_distance) * 180 / math.pi
        self._openCamera()
        # initial step size of 16 pixels
        step = 16
        while True:
            # read the next frame
            ret, image = self.cap.read()
            if not ret:
                raise Exception('Could not read image from camera')
        
            # crop the center
            image_crop = image[int(self.center_y - crop_size/2):int(self.center_y + crop_size/2),
                               int(self.center_x - crop_size/2):int(self.center_x + crop_size/2)]
    
            # scale up to make adjustment easier
            image_resize = cv2.resize(image_crop, (view_size, view_size))
            
            # display the resulting image
            cv2.imshow('frame', image_resize)
            key = cv2.waitKey(20) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('a'):
                self.center_x -= step
            elif key == ord('s'):
                self.center_x += step
            elif key == ord('w'):
                self.center_y -= step
            elif key == ord('z'):
                self.center_y += step
            elif key == ord('e'):
                step *= 2
            elif key == ord('d'):
                if step <= 2:
                    step = 1
                else:
                    step /= 2;
        
        self.cap.release()
        cv2.destroyAllWindows()
        print(self.center_x, self.center_y)

    def _calculateEllipse(self, tilt_angle, slope_distance):
        # in perspective, a tilt will produce an ellipse, but the center of 
        # the ellipse will not vertically coincide with the center of the plate
        tilt_radians = tilt_angle * math.pi / 180
        # law of cosines
        x1 = math.sqrt(slope_distance**2 + self.target_radius**2 - slope_distance * self.target_radius * math.cos(math.pi - tilt_radians))
        x2 = math.sqrt(slope_distance**2 + self.target_radius**2 - slope_distance * self.target_radius * math.cos(tilt_radians))
        angle1 = math.asin(self.target_radius * math.sin(math.pi - tilt_radians) / x1) * 180 / math.pi
        angle2 = math.asin(self.target_radius * math.sin(tilt_radians) / x2) * 180 / math.pi
        top_offset = -angle1 * self.res_per_deg
        bot_offset = angle2 * self.res_per_deg
        center_offset = (top_offset + bot_offset) / 2
        v_radius = (bot_offset - top_offset)/2
        # adjust vertical for plate offset
        plate_offset_m = self.plate_thickness * math.cos(tilt_radians)
        plate_offset_a = math.atan2(plate_offset_m, slope_distance) * 180 / math.pi
        plate_offset = -plate_offset_a * self.res_per_deg
        # horizontal radius
        h_radius = self.res_per_deg * math.atan2(self.target_radius, slope_distance) * 180 / math.pi
        return (self.center_x, int(self.center_y + center_offset + plate_offset)), (int(h_radius), int(v_radius))

In [None]:
tipali = NetworkTables.getTable("TiPaLi")

In [None]:
dc = DataCollector(tipali, folder='data_collection', step_pan_angle=30, 
                   camera_h_fov=69, camera_h_res=640, camera_v_res=480, camera_id=0)

In [None]:
dc.adjustImageCenter(1)

In [None]:
dc.startRun('test_001_')