In [1]:
import time
from IPython.display import Audio
import serial
import pipeline.config as conf
import os
from IPython import display as dis
import threading as thr
import re
import numpy as np
import matplotlib.pyplot as plt


param_dict = conf.open_params()
test_dir = "Datasets/ESC50_augmented_validate/"

In [None]:
files_per_class = 40
c = thr.Condition()
cur_class = ''
running = True

class thread_sound_player(thr.Thread):
    def __init__(self, name, src_dir):
        thr.Thread.__init__(self)
        self.name = name
        self.src_dir = src_dir

    def run(self):
        global cur_class
        global running

        for class_ in os.listdir(self.src_dir):
            for filename in os.listdir(f'{self.src_dir}{class_}')[0:files_per_class]:
                dis.display(Audio(self.src_dir + class_ + "/" + filename, autoplay=True))
                
                c.acquire()
                cur_class = class_
                c.notify_all()
                c.wait()
                c.release()
                

        c.acquire()
        running = False
        c.notify_all()
        c.release()

class thread_receiver(thr.Thread):
    def __init__(self, name, port, bdr):
        thr.Thread.__init__(self)
        self.name = name
        self.ser = serial.Serial(port, bdr, timeout=2)
        self.ser.flushInput()
        self.ser.flushOutput()
        

    def run(self):
        global cur_class
        global running
        sec = 0
        file_cnt = 0
        scores = []
        c_matrix = []
        class_prob = []
        
        while True:
            c.acquire()
            
            if not running:
                #finish process
                
                fig, ax = plt.subplots(figsize=(10, 8))
                print(np.asarray(c_matrix).shape)
                plt.pcolormesh(c_matrix, vmin=0, vmax=1, cmap = 'plasma')
                plt.title(f"Confusion matrix of {param_dict['CNN_name']}-instance on embedded system")
                plt.xlabel("Predicted class")
                plt.ylabel("True class")
                plt.colorbar(label='mean accuracy')
                ax.set_xticks([0.5, 1.5, 2.5, 3.5, 4.5])
                ax.set_yticks([0.5, 1.5, 2.5, 3.5, 4.5])
                ax.set_xticklabels(param_dict["class_map"])
                ax.set_yticklabels(param_dict["class_map"])
                
                return None

            if self.ser.inWaiting() != 0:
                vals = re.sub("[b'\n\r]", '', str(self.ser.readline()))
                chunks = vals[:-4].split(' ')
                scores.append([float(numeric_string) for numeric_string in chunks])
                sec += 1
                self.ser.flushInput()
                
                if(sec == 5):
                    sec = 0
                    file_cnt += 1
                    avg_scores_per_file = np.mean(scores, axis=0)
                    print(avg_scores_per_file)
                    print(param_dict["class_map"][np.argmax(avg_scores_per_file)])
                    class_prob.append(avg_scores_per_file)
                    scores = []
                    c.notify_all()
                  
                if file_cnt == files_per_class:
                    file_cnt = 0
                    c_matrix.append(np.mean(class_prob, axis=0))
                    class_prob = []
                   
            c.release()


player = thread_sound_player("sound player thread", test_dir)
receiver = thread_receiver("uart reception thread", "COM23", 115200)


player.start()
receiver.start()

player.join()
receiver.join()