In [1]:
import io, os, ssl, ast, json, http, socket, queue, struct, sqlite3, threading, logging, datetime 
import flask, pynmea2, flask_basicauth, flask_cors
#from IPython.display import clear_output

# Host name and TCP ports of the relay server (one port per link).
server_ip = 'yzlab3.chem.nyu.edu'
img_port, cmd_port, flask_port, sensor_port = 54321, 54322, 54323, 54324

# Static assets used by the web front end.
with open('src/map.html', 'r') as fr:
    mapPage = fr.read()
with open('src/rainbow.jpg', 'rb') as fr:
    rainbow = fr.read()

# HTTP basic-auth credentials: first line is the user name, second line the password.
with open('secret/basicAuth.conf', 'r') as fr:
    basicAuth = fr.read()
_auth_lines = basicAuth.split('\n')
basic_auth_username = _auth_lines[0]
basic_auth_password = _auth_lines[1]

# Let's Encrypt pair: secret of my apache and flask site.
_home_dir = os.environ["HOME"]
server_cert_letsencrypt = f'{_home_dir}/openssl/fullchain.pem'
server_key_letsencrypt  = f'{_home_dir}/openssl/privkey.pem'

# Self-signed material for the rover/driver links.
server_cert_self_signed  = 'secret/server.crt'    # shared with both rover and driver for identity authentication of server
server_key_self_signed   = 'secret/server.key'
client_certs_self_signed = 'secret/clients.crt'   # for identity authentication of rover and driver

# Server-side TLS context that also demands a client certificate
# signed by one of the certificates in client_certs_self_signed.
ssl_context_self_signed = ssl.create_default_context(
    ssl.Purpose.CLIENT_AUTH,
    cafile=client_certs_self_signed,
)
ssl_context_self_signed.load_cert_chain(
    certfile=server_cert_self_signed,
    keyfile=server_key_self_signed,
)
ssl_context_self_signed.verify_mode = ssl.CERT_REQUIRED

## Sensor Link

#### Backend Database

In [3]:
# Request/response queues shared between the database worker and its clients.
q_in, q_out = queue.Queue(), queue.Queue()
def dbd(): # avoid sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.
    """Database worker: owns the SQLite connection and serves requests read from ``q_in``.

    Every item taken from ``q_in`` is a tuple whose first element is a command:

    * ``('INSERT', packet)`` -- parse the raw sensor packet and store it; no reply.
    * ``('HS',)``            -- reply on ``q_out`` with the (lon, lat, timestamp) row
                                having the smallest timestamp ("home spot"), or None.
    * ``('CS',)``            -- same, for the largest timestamp ("current spot").
    * ``('Tr', since)``      -- reply with the list of (lon, lat, timestamp) rows whose
                                timestamp is >= ``float(since)`` ("trail").

    ``q_in.task_done()`` is called after every item, so producers may use
    ``q_in.join()`` to wait until the worker is idle. The function never returns.
    """
    def processor(s):
        """Convert one raw sensor packet (bytes) into a row for the ``tracking`` table.

        The packet is decoded and split on CRLF into exactly four parts: a GPRMC
        sentence, a GPGGA sentence, a comma-separated BNO055 record (7 fields, the
        first one ignored) and a trailing remainder that is discarded.

        Returns a 14-tuple matching the table columns, or None when
        ``GPRMC.status != 'A'`` or ``GPGGA.gps_qual != 2``.
        Raises on packets that do not have this layout.
        """
        GPRMC, GPGGA, BNO055, _ = s.decode().split('\r\n')
        GPRMC, GPGGA = pynmea2.parse(GPRMC), pynmea2.parse(GPGGA)
        if GPRMC.status == 'A' and GPGGA.gps_qual == 2:  # GPRMC and GPGGA are ready
            lat, lon = GPRMC.lat, GPRMC.lon
            # NMEA encodes ddmm.mmmm / dddmm.mmmm: split degrees from minutes.
            lat, lon = float(lat[:2]) + float(lat[2:])/60, float(lon[:3]) + float(lon[3:])/60
            if GPRMC.lat_dir == 'S': lat *= -1
            if GPRMC.lon_dir == 'W': lon *= -1
            alt  = GPGGA.altitude
            speed = GPRMC.spd_over_grnd
            course = GPRMC.true_course
            timestamp = datetime.datetime.combine(GPRMC.datestamp, GPRMC.timestamp).timestamp()
            sats = GPGGA.num_sats # number of satellites in view
            accu = GPGGA.horizontal_dil # relative accuracy of horizontal position
            _, temp, x, y, z, v1, v2 = BNO055.split(',')
            return lat, lon, alt, speed, course, timestamp, sats, accu, temp, x, y, z, v1, v2
        else: return None

    # In-memory database: it lives only as long as this connection.
    # Swap in sqlite3.connect("GPS.db") to keep the track on disk.
    db = sqlite3.connect(":memory:")
    cur = db.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS tracking (lat, lon, alt, speed, course, timestamp, sats, accu, temp, x, y, z, v1, v2)")
    while True:
        item = q_in.get()
        if item[0] == 'INSERT':
            # The packet comes straight off the network. A parse failure or a
            # "no fix yet" result must not kill this thread: if it died before
            # task_done(), every producer blocked in q_in.join() would hang forever.
            try:
                row = processor(item[1])
            except Exception as exc:
                print(f'[Database] Dropping malformed sensor packet: {exc!r}')
                row = None
            if row is not None:
                cur.execute("""
                    INSERT INTO tracking (lat, lon, alt, speed, course, timestamp, sats, accu, temp, x, y, z, v1, v2)
                    VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", row)
        elif item[0] == 'HS': # Home Spot
            cur.execute("SELECT lon, lat, timestamp FROM tracking WHERE timestamp=(SELECT MIN(timestamp) FROM tracking)")
            q_out.put(cur.fetchone())
        elif item[0] == 'CS': # Current Spot
            cur.execute("SELECT lon, lat, timestamp FROM tracking WHERE timestamp=(SELECT MAX(timestamp) FROM tracking)")
            q_out.put(cur.fetchone())
        elif item[0] == 'Tr': # Trail
            cur.execute("SELECT lon, lat, timestamp FROM tracking WHERE timestamp>=?", (float(item[1]),))
            q_out.put(cur.fetchall())
        if item[0]!='INSERT':
            print(item)
        q_in.task_done()

# Run the database worker on its own thread; the SQLite connection is created and used only there.
dbd_worker = threading.Thread(target=dbd)
dbd_worker.start()

In [4]:
def sensord():
    """Accept the rover's sensor link and forward every packet to the database worker.

    Listens on ``(server_ip, sensor_port)``, wraps each accepted connection with
    ``ssl_context_self_signed`` and reads length-prefixed packets: a 2-byte
    little-endian unsigned length (``'<H'``) followed by that many payload bytes.
    Each payload is queued as ``('INSERT', payload)`` on ``q_in``.

    One connection is served at a time; when it ends (peer closed, timeout,
    reset or other socket error) the TLS socket is closed and the loop goes
    back to ``accept``. The function never returns.
    """
    header_len = struct.calcsize('<H')
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((server_ip, sensor_port))
    s.listen(0)
    while True:
        print(f"[Sensor link - Rover] Waiting for incoming connection")
        c, a = s.accept()
        print(f"[Sensor link - Rover] Connection established")
        try:
            tls = ssl_context_self_signed.wrap_socket(c, server_side=True)
        except OSError as exc:  # ssl.SSLError is an OSError: bad/missing client certificate, etc.
            print(f'[Sensor link - Rover] TLS handshake failed: {exc}')
            c.close()
            continue
        # Keep the socket itself: the buffered reader returned by makefile()
        # has no shutdown() and closing it alone does not release the socket.
        reader = tls.makefile('rb')
        try:
            while True:
                header = reader.read(header_len)
                if len(header) < header_len:  # short read == peer closed the link
                    print('[Sensor link - Rover] Connection closed by peer')
                    break
                data_len, = struct.unpack('<H', header)
                data = reader.read(data_len)
                if len(data) < data_len:      # link dropped in the middle of a packet
                    print('[Sensor link - Rover] Connection closed by peer')
                    break
                q_in.join()                   # wait until the database worker is idle
                q_in.put(('INSERT', data))
        except socket.timeout:
            print('[Sensor link - Rover] timeout')
        except ConnectionResetError:
            print('[Sensor link - Rover] ConnectionResetError')
        except OSError as exc:
            print(f'[Sensor link - Rover] Connection error: {exc}')
        finally:
            reader.close()
            tls.close()
# Serve the rover's sensor link in the background.
sensord_worker = threading.Thread(target=sensord)
sensord_worker.start()

[Sensor link - Rover] Waiting for incoming connection
[Sensor link - Rover] Connection established


## Control Link

In [None]:
def getCommonName(ss):
    """Return the commonName from the subject of the peer certificate of ``ss``.

    ``ss`` must provide ``getpeercert()`` returning a dict with a ``'subject'``
    entry (a sequence of RDNs, each a sequence of (name, value) pairs). Only the
    first pair of each RDN is inspected. Returns None when no commonName is found.
    """
    subject = ss.getpeercert()['subject']
    return next((rdn[0][1] for rdn in subject if rdn[0][0] == 'commonName'), None)

class XboxController():
    """Stateful translator from raw controller events to rover command values.

    ``processor(code, value)`` receives one event and returns ``(code, value)``
    with ``value`` rewritten for the codes that carry an analog reading. The
    instance remembers the gimbal position, the gimbal sensitivity and the
    sign currently applied to each track.
    """
    def __init__(self):
        self.left_track     = -1 # sign applied to the left trigger; +1 while code 15 is pressed
        self.right_track    = -1 # sign applied to the right trigger; +1 while code 14 is pressed
        self.gimbal_sense   = 100   # gimbal step for a full-scale stick reading; one of 10/100/1000
        self.rot_max        = 2500  # upper clamp of the rotation value
        self.rot_min        = 600   # lower clamp of the rotation value
        self.ele_max        = 2200  # upper clamp of the elevation value
        self.ele_min        = 1500  # lower clamp of the elevation value
        self.rot_front      = 2200  # initial rotation
        self.ele_horiz      = 1850  # initial elevation
        self.rot            = self.rot_front
        self.ele            = self.ele_horiz
    def processor(self, code, value):
        """Apply one controller event and return the ``(code, value)`` to forward.

        * code 2 / 3 -- stick X / Y: move rotation / elevation by
          ``gimbal_sense * value / 32768``, clamp to the configured range and
          return the new position as ``value``.
        * code 4 / 5 -- left / right trigger: readings of 20 or less become 0,
          otherwise ``value / 255`` multiplied by the current track sign.
        * code 14 / 15 -- button over right / left trigger: truthy selects sign
          +1, falsy -1; ``value`` is returned unchanged.
        * code 8 -- gimbal sensitivity: step through [10, 100, 1000] by ``value``;
          ``value`` is returned unchanged.
        * any other code is returned untouched.
        """
        if   code == 2:  # X
            self.rot += self.gimbal_sense * value / 32768
            if   self.rot>self.rot_max: self.rot = self.rot_max
            elif self.rot<self.rot_min: self.rot = self.rot_min
            value = self.rot
        elif code == 3:  # Y
            self.ele += self.gimbal_sense * value / 32768
            if   self.ele>self.ele_max: self.ele = self.ele_max
            elif self.ele<self.ele_min: self.ele = self.ele_min
            value = self.ele
        elif code == 4:  # left trigger
            if value > 20: value = self.left_track * value / 255
            else         : value = 0
        elif code == 5:  # right trigger
            if value > 20: value = self.right_track * value / 255
            else         : value = 0
        elif code == 14: # button over right trigger
            if value : self.right_track =  1
            else     : self.right_track = -1
        elif code == 15: # button over left trigger
            if value : self.left_track =  1
            else     : self.left_track = -1
        elif code == 8:  # gimbal sensitivity
            enum = [10, 100, 1000]
            idx  = enum.index(self.gimbal_sense)
            idx += value
            # Indices in [-len, len) keep their list meaning (negative ones wrap
            # from the end). Anything outside that range falls back to the first
            # entry; previously a large negative step raised IndexError.
            if -len(enum) <= idx < len(enum):
                self.gimbal_sense = enum[idx]
            else:
                self.gimbal_sense = enum[0]
        return code, value

# Shared controller state used by cmd_recvd to translate driver events.
xbox = XboxController()
# Packed commands travelling from the driver link (cmd_recvd) to the rover link (cmd_sendd).
# Both functions read this module-level name, which the notebook otherwise never defines.
cmd_queue = queue.Queue()

def cmd_recvd(ss):
    """Receive controller events from the driver and queue them for the rover.

    Each event is one ``'<hi'`` record (little-endian int16 code + int32 value).
    It is passed through ``xbox.processor`` and the result is re-packed as
    ``'<hf'`` and put on ``cmd_queue``. Returns when the peer closes the
    connection, the socket times out or the connection is reset; the socket
    is closed in every case.
    """
    # Read exactly as many bytes as the unpack below needs: recv() may return
    # fewer bytes than asked for, and an empty result means the peer has closed.
    packet_len = struct.calcsize('<hi')
    try:
        while True:
            data = b''
            while len(data) < packet_len:
                chunk = ss.recv(packet_len - len(data))
                if not chunk:
                    break
                data += chunk
            if len(data) < packet_len:
                print('[Control link - Driver] Connection closed by peer')
                break
            code, value = struct.unpack('<hi', data)
            code, value = xbox.processor(code, value)
            cmd_queue.put(struct.pack('<hf', code, value))
    except socket.timeout:
        print('[Control link - Driver] Timeout')
    except ConnectionResetError:
        print('[Control link - Driver] ConnectionResetError')
    finally:
        try:
            ss.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # the connection is already gone; nothing left to shut down
        ss.close()
        
def cmd_sendd(ss):
    """Forward queued commands from ``cmd_queue`` to the rover over ``ss``.

    Blocks on ``cmd_queue.get()`` and writes every packet in full. Returns when
    the socket times out, the pipe breaks or the connection is reset; the
    socket is closed in every case. The packet being sent when the error occurs
    is not re-queued.
    """
    try:
        while True:
            data = cmd_queue.get()
            # sendall() keeps writing until the whole packet is out;
            # send() may stop after only part of it.
            ss.sendall(data) # Send code and value to the rover
    except socket.timeout:
        print('[Control link - Rover] Timeout')
    except BrokenPipeError:
        print('[Control link - Rover] BrokenPipeError')
    except ConnectionResetError:
        print('[Control link - Rover] ConnectionResetError')
    finally:
        try:
            ss.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # the connection is already gone; nothing left to shut down
        ss.close()

def control_link_daemon():
    """Accept control-link connections and hand each one to the matching worker thread.

    Listens on ``(server_ip, cmd_port)``. Every accepted connection is wrapped
    with ``ssl_context_self_signed`` and identified by the commonName of its
    client certificate:

    * ``'driver'`` -> ``cmd_recvd`` (reads controller events)
    * ``'rover'``  -> ``cmd_sendd`` (writes commands)

    Connections with any other name are closed. The function never returns.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((server_ip, cmd_port))
    s.listen(0)
    while True:
        print(f"[Control link] Waiting for incoming connection")
        c, a = s.accept()
        # Apply the timeout before the handshake so a silent client cannot
        # block the accept loop indefinitely.
        c.settimeout(30)
        try:
            ss = ssl_context_self_signed.wrap_socket(c, server_side=True)
        except OSError as exc:  # ssl.SSLError and socket.timeout are both OSError
            print(f"[Control link] TLS handshake failed: {exc}")
            c.close()
            continue
        ss.settimeout(30)
        name = getCommonName(ss)
        print(f"[Control link] Connection with {name} established ~")
        if name == 'driver':
            threading.Thread(target=cmd_recvd, args=(ss,)).start()
        elif name == 'rover':
            threading.Thread(target=cmd_sendd, args=(ss,)).start()
        else:
            # No worker takes ownership of this socket, so release it here.
            print(f"[Control link] Unexpected peer {name!r}; closing connection")
            ss.close()
        
# Accept driver/rover control connections in the background.
control_link_worker = threading.Thread(target=control_link_daemon)
control_link_worker.start()

## Image Link

In [None]:
import time
class MJPEG():# image cache
    """Cache of the most recent camera frame, fed by a background receiver thread.

    ``frame`` holds the latest JPEG bytes (initially the ``rainbow`` placeholder).
    ``condition`` is notified every time a new frame is stored, so consumers can
    ``wait()`` on it and then read ``frame``.
    """
    def __init__(self):
        self.frame = rainbow # default image
        self.condition = threading.Condition()
        # Creating the cache immediately starts the receiver thread.
        threading.Thread(target=self.update).start()

    def update(self):
        """Accept the camera link and publish every received frame.

        Listens on ``(server_ip, img_port)``, wraps each connection with
        ``ssl_context_self_signed`` and reads length-prefixed frames: a 4-byte
        little-endian unsigned length (``'<L'``) followed by the image bytes.
        One connection is served at a time; when it ends the TLS socket is
        closed and the loop returns to ``accept``. Never returns.
        """
        header_len = struct.calcsize('<L')
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((server_ip, img_port))
        s.listen(0)
        while True:
            print(f"[Image link] Waiting for incoming connection")
            c, a = s.accept()
            try:
                tls = ssl_context_self_signed.wrap_socket(c, server_side=True)
            except OSError as exc:  # ssl.SSLError is an OSError
                print(f'[Image link] TLS handshake failed: {exc}')
                c.close()
                continue
            # Keep the socket itself: the buffered reader returned by makefile()
            # has no shutdown() and closing it alone does not release the socket.
            reader = tls.makefile('rb')
            try:
                while True:
                    header = reader.read(header_len)
                    if len(header) < header_len:  # short read == camera closed the link
                        print('[Image link] Camera closed the connection')
                        break
                    img_len, = struct.unpack('<L', header)
                    # Do the blocking network read before taking the lock so
                    # consumers are never stalled behind a slow transfer.
                    frame = reader.read(img_len)
                    if len(frame) < img_len:      # truncated frame: do not publish it
                        print('[Image link] Camera closed the connection')
                        break
                    with self.condition:
                        self.frame = frame
                        self.condition.notify_all()
            except socket.timeout:
                print('[Image link] Camera timeout')
            except ConnectionResetError:
                print('[Image link] Camera ConnectionResetError')
            except OSError as exc:
                print(f'[Image link] Camera connection error: {exc}')
            finally:
                reader.close()
                tls.close()
#mjpeg = MJPEG()

## Web Front

In [None]:
# Flask application with flask_cors defaults and HTTP basic auth.
app = flask.Flask(__name__)
flask_cors.CORS(app)
app.config.update(
    BASIC_AUTH_USERNAME=basic_auth_username,
    BASIC_AUTH_PASSWORD=basic_auth_password,
)
# Routes opt in to authentication with @basic_auth.required.
basic_auth = flask_basicauth.BasicAuth(app)

#log = logging.getLogger('werkzeug')
#log.setLevel(logging.ERROR) 

@app.route("/")
@basic_auth.required
def home():
    """Render the map page template with the server address and the basic-auth credentials."""
    # NOTE(review): the credentials are handed to the template as-is; confirm
    # that map.html really needs them on the client side.
    template_values = {
        'ip_port':  f'{server_ip}:{flask_port}',
        'username': basic_auth_username,
        'password': basic_auth_password,
    }
    return flask.render_template_string(mapPage, **template_values)

@app.route("/stream.mjpg")
@basic_auth.required
def mjpg():
    """Stream camera frames as ``multipart/x-mixed-replace`` (MJPEG).

    Each part is emitted when the module-level ``mjpeg`` cache signals a new
    frame through its condition. ``mjpeg`` must have been created (an instance
    of ``MJPEG``) before this route is requested.
    """
    def generator():
        count = 0
        while True:
            with mjpeg.condition:
                mjpeg.condition.wait()   # sleep until the receiver stores a new frame
                frame = mjpeg.frame
                count += 1
                if count % 25 == 1: print(datetime.datetime.now())
            yield f'''--FRAME\r\nContent-Type: image/jpeg\r\nContent-Length: {len(frame)}\r\n\r\n'''.encode()
            yield frame
            # A multipart boundary delimiter must be preceded by CRLF, so
            # terminate the body part before the next "--FRAME" is written.
            yield b'\r\n'
    r = flask.Response(response=generator(), status=200)
    r.headers.extend({'Age':0, 'Content-Type':'multipart/x-mixed-replace; boundary=FRAME',
                      'Pragma':'no-cache', 'Cache-Control':'no-cache, private',})
    return r

# GeoJSON payload returned by /sensor. Kept at module level so that entries
# written by an earlier request (e.g. the last trail) remain in later replies.
RESPONSE = {}
@app.route("/sensor", methods=['GET'])
@basic_auth.required
def sensor():
    """Return the rover position and the trail recorded since the client's newest timestamp.

    Query parameter ``timestamp`` is either a number (newest timestamp the
    client already has) or the literal ``'newcomer'`` for a first request. A
    missing or non-numeric value requests the whole trail.

    The database worker is asked for ``('Tr', since)`` and answers with a list
    of (lon, lat, timestamp) rows. When rows come back, ``RESPONSE['rover']``
    is refreshed from the last row; with more than one row,
    ``RESPONSE['trail']`` is refreshed as a LineString. The accumulated
    ``RESPONSE`` is returned as JSON.
    """
    newest_timestamp_client_side = flask.request.args.get('timestamp')
    try:
        since = float(newest_timestamp_client_side)
    except (TypeError, ValueError):
        since = 0.0   # 'newcomer' or missing parameter: send everything
    q_in.join()
    # The worker only understands command tuples; a bare string would never be answered.
    q_in.put(('Tr', since))
    llts = q_out.get() # rows of (long, lat, timestamp)
    if llts:
        RESPONSE.update({
            "rover": {
                "type": "Feature",
                "geometry": {
                    "type": "Point",
                    "coordinates": llts[-1][:2] # long lat in GeoJson
                },
                "properties": {
                    "newestViewCenter": llts[-1][1::-1], # lat long in Google and Openstreetmap
                    "newestTimestampServerSide": llts[-1][2],
                    "homePoint": "false"
                },
            }
        })
        if newest_timestamp_client_side == 'newcomer':
            # NOTE(review): this is the newest row, as before; the worker's 'HS'
            # command returns the earliest row if that is what "home" should mean.
            RESPONSE['rover']['properties']['homePoint'] = llts[-1][1::-1]
    if llts and len(llts) > 1:
        RESPONSE.update({
            "trail": {
                "type": "Feature",
                "geometry": {
                    "type": "LineString",
                    "coordinates": [i[:2] for i in llts]
                }
            }
        })
    return flask.jsonify(RESPONSE)

@app.route('/axios.min.js', methods=['GET'])
def axios_js():
    """Serve the bundled axios script."""
    script_path = 'src/axios.min.js'
    return flask.send_file(script_path, mimetype='application/javascript')
    
@app.route('/leaflet.rotatedMarker.js', methods=['GET'])
def rotatedMarker_js():
    """Serve the bundled Leaflet rotated-marker script."""
    script_path = 'src/leaflet.rotatedMarker.js'
    return flask.send_file(script_path, mimetype='application/javascript')
    
@app.route('/home.png', methods=['GET'])
def home_svg():
    """Serve the home-marker icon (a PNG, despite the function name)."""
    icon_path = 'src/home.png'
    return flask.send_file(icon_path, mimetype='image/png')
    
@app.route('/arrow.png', methods=['GET'])
def arrow_svg():
    """Serve the arrow-marker icon (a PNG, despite the function name)."""
    icon_path = 'src/arrow.png'
    return flask.send_file(icon_path, mimetype='image/png')

def flask_in_bg():
    """Run the Flask app (blocking call) over HTTPS with the Let's Encrypt certificate and key."""
    tls_files = (server_cert_letsencrypt, server_key_letsencrypt)
    app.run(host=server_ip, port=flask_port, ssl_context=tls_files)
#threading.Thread(target=flask_in_bg).start() 
#flask_in_bg()