## 定點式

In [None]:
from queue import Queue

#from multiprocessing import Queue
import cv2
import time
import matplotlib.pyplot as plt
import traceback
import threading
import numpy as np
import IPython.display as display

import matplotlib
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from tkinter import *

import warnings
warnings.filterwarnings('ignore')

def hp(ts, lamb=10):
    def D_matrix(N):
        D = np.zeros((N-1,N))
        D[:,1:] = np.eye(N-1)
        D[:,:-1] -= np.eye(N-1)
        """D1
        [[-1.  1.  0. ...  0.  0.  0.]
         [ 0. -1.  1. ...  0.  0.  0.]
         [ 0.  0. -1. ...  0.  0.  0.]
         ...
         [ 0.  0.  0. ...  1.  0.  0.]
         [ 0.  0.  0. ... -1.  1.  0.]
         [ 0.  0.  0. ...  0. -1.  1.]]
        """
        return D
    N = len(ts)
    D1 = D_matrix(N)
    D2 = D_matrix(N-1)
    D = D2 @ D1
    g = np.linalg.inv((np.eye(N)+lamb*D.T@D))@ ts
    return g

class Producer(threading.Thread):
    def __init__(self,data_queue,*args,**kwargs):
        super(Producer, self).__init__(*args,**kwargs)
        self.data_queue = data_queue
 
    def run(self):
        capture = cv2.VideoCapture(0)  # 0是代表摄像头编号，只有一个的话默认为0
        capture.set(cv2.CAP_PROP_FPS, 10)
        try:
            t0 = time.time()
            while (True):
                ref, frame = capture.read()
                frame = frame[:,::-1,:].copy()
                H, W, _ = frame.shape
                w, h = 40, 40
                x, y = W//2 -w//2, H//4-h//2
                area = frame[y:y + h, x:x + w, :]
                cv2.rectangle(frame, (x,y), (x+w,y+h), (0,255,0), 2)
                frame[:h,:w] = area
                
                t = time.time()-t0
                cv2.putText(frame, 't={:.3f}'.format(t), (10, H-10), cv2.FONT_HERSHEY_PLAIN, 1.2, (255, 255, 255), 2)
                cv2.imshow("face", frame)

                B = np.average(area[:,:,0])
                G = np.average(area[:,:,1])
                R = np.average(area[:,:,2])
                if self.data_queue.full():
#                     print('Full')
                    self.data_queue.queue.popleft()
                self.data_queue.put((t,B,G,R))

                c = cv2.waitKey(1) & 0xff  # 等待10ms显示图像，若过程中按“Esc”退出
                if c == 27:
                    capture.release()
                    break
        except:
            traceback.print_exc()
        finally:
            capture.release()
            cv2.destroyAllWindows()
            if self.data_queue.full():
                self.data_queue.get()
            self.data_queue.put('Bye')
        print('Producer quit')
        
class Consumer(threading.Thread):
   
    def __init__(self,data_queue,*args,**kwargs):
        super(Consumer, self).__init__(*args,**kwargs)
        self.data_queue = data_queue
 
    def run(self):
        time.sleep(1)

        fig, axes = plt.subplots(3, 3,figsize=(8,5))
        axes[0, 0].set_title('Raw') # 原始信號
        axes[0, 1].set_title('HP') # 濾波殘差
        axes[0, 2].set_title('FFT') # 頻譜
        axes[0, 0].set_ylabel('Blue')
        axes[1, 0].set_ylabel('Green')
        axes[2, 0].set_ylabel('Red')
        axes[2, 0].set_xlabel('Time(s)')
        axes[2, 1].set_xlabel('Time(s)')
        axes[2, 2].set_xlabel('Frequency(Hz)')
        
        start = None
        lines = [None, None, None]
        glines = [None, None, None]
        rlines = [None, None, None]
        flines = [None, None, None]
        BGR = [None, None, None]
        g = [None, None, None]
        r = [None, None, None]
        f = [None, None, None]
        num_fft = 256

        while True:
            time.sleep(0.2)
            if self.data_queue.qsize() > 2:
                if self.data_queue.queue[-1] == 'Bye':
                    break
                ts, BGR[0], BGR[1], BGR[2] = zip(*self.data_queue.queue)
                t = ts[-1] if len(ts) > 0 else 0

                for i in range(3):
                    g[i] = hp(BGR[i], 1000)
                    r[i] = BGR[i] - g[i]

                # FFT
                for i in range(3):
                    rr = r[i][-num_fft:]
                    f[i] = np.fft.fft(rr, num_fft)
                    f[i] = np.abs(f[i])[:num_fft//2]
                fs =len(rr)/ (ts[-1] - ts[-len(rr)])


                if start is None:
#                     start = 1
                    lines[0] = axes[0,0].plot(ts, BGR[0], '-b')[0]
                    lines[1] = axes[1,0].plot(ts, BGR[1], '-g')[0]
                    lines[2] = axes[2,0].plot(ts, BGR[2], '-r')[0]
#                     print(BGR[0],BGR[1],BGR[2])
                    glines[0] = axes[0,0].plot(ts, g[0], '-k')[0]
                    glines[1] = axes[1,0].plot(ts, g[1], '-k')[0]
                    glines[2] = axes[2,0].plot(ts, g[2], '-k')[0]
#                     print(g[0],g[1],g[2])
                    rlines[0] = axes[0, 1].plot(ts, r[0], '-b')[0]
                    rlines[1] = axes[1, 1].plot(ts, r[1], '-g')[0]
                    rlines[2] = axes[2, 1].plot(ts, r[2], '-r')[0]
#                     print(r[0],r[1],r[2])
                    flines[0] = axes[0, 2].plot(np.arange(num_fft//2)*fs/num_fft, f[0], '-b', marker='*')[0]
                    flines[1] = axes[1, 2].plot(np.arange(num_fft//2)*fs/num_fft, f[1], '-g', marker='*')[0]
                    flines[2] = axes[2, 2].plot(np.arange(num_fft//2)*fs/num_fft, f[2], '-r', marker='*')[0]
                    
#                     print('here')
                for i in range(3):
                    lines[i].set_xdata(ts)
                    lines[i].set_ydata(BGR[i])
                    glines[i].set_xdata(ts)
                    glines[i].set_ydata(g[i])
                    rlines[i].set_xdata(ts)
                    rlines[i].set_ydata(r[i])
                    flines[i].set_xdata(np.arange(num_fft//2)*fs/num_fft)
                    flines[i].set_ydata(f[i])

                for i in range(3):
                    axes[i,0].set_xlim([t - 10, t + 1])
                    axes[i,0].set_ylim([np.min(BGR[i][-num_fft:]), np.max(BGR[i][-num_fft:])])
                    axes[i, 1].set_xlim([t - 10, t + 1])
                    axes[i, 1].set_ylim([np.min(r[i][-num_fft:]), np.max(r[i][-num_fft:])])
                    axes[i, 2].set_xlim([0, fs//2])
                    axes[i, 2].set_ylim([np.min(f[i]), np.max(f[i])])

                plt.pause(0.1)
        print('Consumer quit')
        

matplotlib.use('TkAgg')

# root = Tk()
# root.geometry('300x150')
# root.title("tkinter and matplotlib")

# 運算 & 製圖
N = 300
data_queue = Queue(N)
p = Producer(data_queue)
p.start()
c = Consumer(data_queue)
c.start()
p.join()
c.join()

# GUI
def draw_picture():
#     axes.clear()
    canvs.draw()
canvs = FigureCanvasTkAgg(fig, root)
canvs.get_tk_widget().pack(side=TOP, fill=BOTH, expand=1)
draw_picture()

print('EXIT')

# root.mainloop()

## 追蹤式

In [4]:
from queue import Queue

#from multiprocessing import Queue
import cv2
import time
import matplotlib.pyplot as plt
import traceback
import threading
import numpy as np
import IPython.display as display

import matplotlib
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from tkinter import *

import dlib
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("D:/Record/Computer_Science/shape_predictor_68_face_landmarks.dat")
        
import warnings
warnings.filterwarnings('ignore')

def hp(ts, lamb=10):
    def D_matrix(N):
        D = np.zeros((N-1,N))
        D[:,1:] = np.eye(N-1)
        D[:,:-1] -= np.eye(N-1)
        """D1
        [[-1.  1.  0. ...  0.  0.  0.]
         [ 0. -1.  1. ...  0.  0.  0.]
         [ 0.  0. -1. ...  0.  0.  0.]
         ...
         [ 0.  0.  0. ...  1.  0.  0.]
         [ 0.  0.  0. ... -1.  1.  0.]
         [ 0.  0.  0. ...  0. -1.  1.]]
        """
        return D
    N = len(ts)
    D1 = D_matrix(N)
    D2 = D_matrix(N-1)
    D = D2 @ D1
    g = np.linalg.inv((np.eye(N)+lamb*D.T@D))@ ts
    return g

class Producer(threading.Thread):
    def __init__(self,data_queue,*args,**kwargs):
        super(Producer, self).__init__(*args,**kwargs)
        self.data_queue = data_queue
 
    def run(self):
        capture = cv2.VideoCapture(0)  # 0是代表摄像头编号，只有一个的话默认为0
        capture.set(cv2.CAP_PROP_FPS, 10)
        try:
            t0 = time.time()
            while (True):
                ref, frame = capture.read()
                img_gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) #灰階 
                faces = detector(img_gray, 0)
                if(len(faces)==1):
                    for i in range(len(faces)):
                        for k, d in enumerate(faces):
                            cv2.rectangle(frame, (d.left(), d.top()), (d.right(), d.bottom()), (0, 0, 255))

                    frame = frame.copy()
                    H, W, _ = frame.shape
                    w_ = int( (int(d.right()) - int(d.left()))/2 )
                    w, h = int( int(d.right()/4) - int(d.left()/4) ), int( int(d.bottom()/4) - int(d.top()/4) ) #40, 40
                    x, y = W//2 -w//2, H//4-h//2
                    area = frame[ int(d.top())+h : int(d.top())+h*2 , int(d.left()+w_)-int(w/2) : int(d.left()+w_)+int(w/2), :] # 上下左右
                    cv2.rectangle(frame, ( int(d.left()+w_)-int(w/2) , int(d.top())+h ), ( int(d.left()+w_)+int(w/2) , int(d.top())+h*2 ), (0,255,0))
                    # 上 int(d.top()) + h
                    # 下 int(d.top()) + h*2
                    # 左 int(d.left()+w_) - int(w/2)
                    # 右 int(d.left()+w_) + int(w/2)
                    h2, w2, _ = area.shape
                    frame[:h2,:w2] = area
                else:
                    frame = frame.copy()
                    H, W, _ = frame.shape
                    w, h = 40, 40
                    x, y = W//2 -w//2, H//4-h//2
                    area = frame[y:y + h, x:x + w, :]
                    cv2.rectangle(frame, (x,y), (x+w,y+h), (0,255,0), 2)
                    frame[:h,:w] = area
                
                t = time.time()-t0
                cv2.putText(frame, 't={:.3f}'.format(t), (10, H-10), cv2.FONT_HERSHEY_PLAIN, 1.2, (255, 255, 255), 2)
                cv2.imshow("face", frame)

                B = np.average(area[:,:,0])
                G = np.average(area[:,:,1])
                R = np.average(area[:,:,2])
                if self.data_queue.full():
#                     print('Full')
                    self.data_queue.queue.popleft()
                self.data_queue.put((t,B,G,R))

                c = cv2.waitKey(1) & 0xff  # 等待10ms显示图像，若过程中按“Esc”退出
                if c == 27:
                    capture.release()
                    break
        except:
            traceback.print_exc()
        finally:
            capture.release()
            cv2.destroyAllWindows()
            if self.data_queue.full():
                self.data_queue.get()
            self.data_queue.put('Bye')
        print('Producer quit')
        
class Consumer(threading.Thread):
   
    def __init__(self,data_queue,*args,**kwargs):
        super(Consumer, self).__init__(*args,**kwargs)
        self.data_queue = data_queue
 
    def run(self):
        time.sleep(1)

        fig, axes = plt.subplots(3, 3,figsize=(8,5))
        axes[0, 0].set_title('Raw') # 原始信號
        axes[0, 1].set_title('HP') # 濾波殘差
        axes[0, 2].set_title('FFT') # 頻譜
        axes[0, 0].set_ylabel('Blue')
        axes[1, 0].set_ylabel('Green')
        axes[2, 0].set_ylabel('Red')
        axes[2, 0].set_xlabel('Time(s)')
        axes[2, 1].set_xlabel('Time(s)')
        axes[2, 2].set_xlabel('Frequency(Hz)')
        
        start = None
        lines = [None, None, None]
        glines = [None, None, None]
        rlines = [None, None, None]
        flines = [None, None, None]
        BGR = [None, None, None]
        g = [None, None, None]
        r = [None, None, None]
        f = [None, None, None]
        num_fft = 256

        while True:
            time.sleep(0.2)
            if self.data_queue.qsize() > 2:
                if self.data_queue.queue[-1] == 'Bye':
                    break
                ts, BGR[0], BGR[1], BGR[2] = zip(*self.data_queue.queue)
                t = ts[-1] if len(ts) > 0 else 0

                for i in range(3):
                    g[i] = hp(BGR[i], 1000)
                    r[i] = BGR[i] - g[i]

                # FFT
                for i in range(3):
                    rr = r[i][-num_fft:]
                    f[i] = np.fft.fft(rr, num_fft)
                    f[i] = np.abs(f[i])[:num_fft//2]
                fs =len(rr)/ (ts[-1] - ts[-len(rr)])


                if start is None:
#                     start = 1
                    lines[0] = axes[0,0].plot(ts, BGR[0], '-b')[0]
                    lines[1] = axes[1,0].plot(ts, BGR[1], '-g')[0]
                    lines[2] = axes[2,0].plot(ts, BGR[2], '-r')[0]
#                     print(BGR[0],BGR[1],BGR[2])
                    glines[0] = axes[0,0].plot(ts, g[0], '-k')[0]
                    glines[1] = axes[1,0].plot(ts, g[1], '-k')[0]
                    glines[2] = axes[2,0].plot(ts, g[2], '-k')[0]
#                     print(g[0],g[1],g[2])
                    rlines[0] = axes[0, 1].plot(ts, r[0], '-b')[0]
                    rlines[1] = axes[1, 1].plot(ts, r[1], '-g')[0]
                    rlines[2] = axes[2, 1].plot(ts, r[2], '-r')[0]
#                     print(r[0],r[1],r[2])
                    flines[0] = axes[0, 2].plot(np.arange(num_fft//2)*fs/num_fft, f[0], '-b', marker='*')[0]
                    flines[1] = axes[1, 2].plot(np.arange(num_fft//2)*fs/num_fft, f[1], '-g', marker='*')[0]
                    flines[2] = axes[2, 2].plot(np.arange(num_fft//2)*fs/num_fft, f[2], '-r', marker='*')[0]
                    
#                     print('here')
                for i in range(3):
                    lines[i].set_xdata(ts)
                    lines[i].set_ydata(BGR[i])
                    glines[i].set_xdata(ts)
                    glines[i].set_ydata(g[i])
                    rlines[i].set_xdata(ts)
                    rlines[i].set_ydata(r[i])
                    flines[i].set_xdata(np.arange(num_fft//2)*fs/num_fft)
                    flines[i].set_ydata(f[i])

                for i in range(3):
                    axes[i,0].set_xlim([t - 10, t + 1])
                    axes[i,0].set_ylim([np.min(BGR[i][-num_fft:]), np.max(BGR[i][-num_fft:])])
                    axes[i, 1].set_xlim([t - 10, t + 1])
                    axes[i, 1].set_ylim([np.min(r[i][-num_fft:]), np.max(r[i][-num_fft:])])
                    axes[i, 2].set_xlim([0, fs//2])
                    axes[i, 2].set_ylim([np.min(f[i]), np.max(f[i])])

                plt.pause(0.1)
        print('Consumer quit')
        

matplotlib.use('TkAgg')

root = Tk()
# root.geometry('300x150')
# root.title("tkinter and matplotlib")

# 運算 & 製圖
N = 300
data_queue = Queue(N)
p = Producer(data_queue)
p.start()
c = Consumer(data_queue)
c.start()
p.join()
c.join()

# GUI
def draw_picture():
#     axes.clear()
    canvs.draw()
fig = Figure(figsize=(5, 4), dpi=100)
canvs = FigureCanvasTkAgg(fig, root)
canvs.get_tk_widget().pack(side=TOP, fill=BOTH, expand=1)
draw_picture()

print('EXIT')

# root.mainloop()

Producer quit
Consumer quit
EXIT
