In [1]:
%pip install flask opencv-python tensorflow numpy

Note: you may need to restart the kernel to use updated packages.


In [2]:
from flask import Flask, render_template, request, redirect, Response , send_from_directory , session, jsonify
import cv2
import threading 
import json
import queue as q
from tkinter import filedialog,Tk
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, Dropout, LSTM, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.models import Sequential
import time

In [3]:
model_path = 'model/thunder.tflite'
interpreter = tf.lite.Interpreter(model_path=model_path)
input_details = interpreter.get_input_details()
interpreter.resize_tensor_input(input_details[0]['index'], np.array([1, 256, 256, 3]), strict=True)
output_details = interpreter.get_output_details()
interpreter.allocate_tensors()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [4]:
EDGES = {
    (0, 1): 'm',
    (0, 2): 'c',
    (1, 3): 'm',
    (2, 4): 'c',
    (0, 5): 'm',
    (0, 6): 'c',
    (5, 7): 'm',
    (7, 9): 'm',
    (6, 8): 'c',
    (8, 10): 'c',
    (5, 6): 'y',
    (5, 11): 'm',
    (6, 12): 'c',
    (11, 12): 'y',
    (11, 13): 'm',
    (13, 15): 'm',
    (12, 14): 'c',
    (14, 16): 'c'
}

In [5]:
def draw(frame, keypoints, confidence_threshold) :
    y, x, c = frame.shape
    person = np.squeeze(np.multiply(keypoints,[y,x,1]))

    for kp in person:
        ky, kx, kp_conf = kp
        if kp_conf > confidence_threshold:
            cv2.circle(frame, (int(kx), int(ky)), 4, (0,255,0), -1) 

    for edge, color in EDGES.items():
        p1, p2 = edge
        y1, x1, c1 = person[p1]
        y2, x2, c2 = person[p2]
        
        if (c1 > confidence_threshold) & (c2 > confidence_threshold):      
            cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0,0,255), 2)

In [6]:
actions = np.array(['jab', 'hook', 'uppercut', 'block', 'none'])

In [7]:
model = Sequential([
        Conv1D(64, 3, activation='relu', input_shape=(13,3)),
        MaxPooling1D(2),
        Conv1D(128, 3, activation='relu'),
        MaxPooling1D(2),
        Flatten(),
        Dense(64, activation='relu'),
        Dropout(0.5),
        Dense(actions.shape[0], activation='softmax')
])

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [8]:
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [9]:
model.load_weights('model/trained_advanced.h5')

In [10]:
app = Flask(__name__)
app.secret_key = 'my_secret_key '

data_file = os.path.join(os.getcwd(), 'user_data/data.json')

users = {}

with open(data_file,"r") as file :
    users = json.load(file) 

current_user = None
cap = None
result_queue = q.Queue()
data_queue = q.Queue()
notification = ""

In [11]:
def distance(point1, point2):
    return np.linalg.norm(np.array(point1) - np.array(point2))

def vertical_distance(point1, point2):
    return abs(point1[1] - point2[1])  

def angular_distance(point1, point2):
    return np.arctan2(point2[1] - point1[1], point2[0] - point1[0])

In [12]:
def process_input(selected_input):
    global cap
    if cap is not None:
        cap.release()
        cap = None

    if selected_input == "Live Input":
        threading.Thread(target=capture_video, args=(0,)).start()

    elif selected_input == "Video Input":
        root = Tk()
        root.attributes('-topmost', True)
        root.withdraw()  
        video_path = filedialog.askopenfilename(filetypes=[("Video files", "*.mp4;*.avi;*.mkv")])
        root.destroy()
        
        if video_path:
            threading.Thread(target=capture_video, args=(video_path,)).start()

    return generate_frames()

def generate_frames():
    while True:
        frame = result_queue.get()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
        
def generate_data():
    while True:
        data = data_queue.get()
        yield f"data:{json.dumps(data)}\n\n"
        
    
def capture_video(video_source):
    global cap

    prevRight_fist = []
    prevLeft_fist = []
    prevAction = 'none'
    action_count = acceleration = prev_time = current_time = 0
    #force = 0
    #mass_of_fist = 1

    cap = cv2.VideoCapture(video_source)
    #cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)

    '''speed_factor = 1.5

    original_frame_rate = cap.get(cv2.CAP_PROP_FPS)
    new_frame_rate = original_frame_rate * speed_factor
    cap.set(cv2.CAP_PROP_FPS, new_frame_rate)'''

    while True:
        ret, frame = cap.read()
        black_screen = np.zeros((768, 1029, 3), dtype=np.uint8) 

        frame = cv2.resize(frame,(1029  ,768))
        if video_source==0 :
            frame = cv2.flip(frame, 1)

        img = frame.copy()
        img = tf.image.resize_with_pad(np.expand_dims(img, axis=0),256,256)
        image_input = tf.cast(img, dtype=tf.uint8)

        interpreter.set_tensor(input_details[0]['index'], np.array(image_input))

        interpreter.invoke()

        keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])

        draw(black_screen,keypoints_with_scores,0.3)

        keypoints = np.squeeze(keypoints_with_scores)
        keypoints = keypoints.reshape(1, -1, 3)[:,:13,:]
        
        res = model.predict(keypoints)
        
        action_performed = actions[np.argmax(res)]
        left_fist = keypoints_with_scores[0][0][9]
        right_fist = keypoints_with_scores[0][0][10]

        if action_performed in ['jab', 'uppercut', 'hook'] and len(prevLeft_fist) > 0 and len(prevRight_fist) > 0 and prevAction == 'none' and action_count < 4:
            current_time = time.time()
            time_interval = current_time - prev_time

            if action_performed == 'jab':
                dist = distance(left_fist, prevLeft_fist) if distance(left_fist, prevLeft_fist) > distance(right_fist, prevRight_fist) else distance(right_fist, prevRight_fist)
            elif action_performed == 'hook':
                dist = angular_distance(left_fist, prevLeft_fist) if angular_distance(left_fist, prevLeft_fist) > angular_distance(right_fist, prevRight_fist) else angular_distance(right_fist, prevRight_fist)
            elif action_performed == 'uppercut':
                dist = vertical_distance(left_fist, prevLeft_fist) if vertical_distance(left_fist, prevLeft_fist) > vertical_distance(right_fist, prevRight_fist) else vertical_distance(right_fist, prevRight_fist)


            if dist < 1e-3 or action_count < 3:
                #prev_time = current_time
                #acceleration = 0
                #force = 0
                action_count += 1

            else:
                acceleration = dist / time_interval if time_interval > 0 else 0
                #force = mass_of_fist * acceleration
                action_count += 1

        elif action_performed in ['jab', 'uppercut', 'hook'] and action_count > 2:
            action_performed = 'none'
            #acceleration = force = 0
        
        elif action_performed == 'block':
            acceleration = force = 0
            

        else:
            prevAction = 'none'
            prevLeft_fist = left_fist
            prevRight_fist = right_fist
            prev_time = time.time()
            action_count = 0


        border_color = (255 , 255 ,255)
        border_thickness = 10

        cv2.rectangle(frame, (0, 0), (1029 - 1, 768 - 1), border_color, border_thickness)
        
        cv2.rectangle(black_screen, (0, 0), (1029 - 1, 768 - 1), border_color, border_thickness)
        

        #data = {'action': action_performed ,'acceleration' : '{:.2f}'.format(acceleration), 'force' : '{:.2f}'.format(force)}       
        data = {'action': action_performed ,'acceleration' : '{:.2f}'.format(acceleration)}

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)    
        
        merged = np.concatenate((black_screen, frame), axis=1)

        merged = cv2.cvtColor(merged, cv2.COLOR_RGB2BGR)
        ret, buffer = cv2.imencode('.jpg', merged, [cv2.IMWRITE_JPEG_QUALITY, 95])
        merged = buffer.tobytes()

        result_queue.put(merged)    
        data_queue.put(data)  


def release_cap():
    global cap
    if cap is not None :
        cap.release()
        cap = None

def notify(note):
    global notification
    notification = note


In [13]:
@app.route('/bg_image')
def bg_image():
    return send_from_directory('images', 'background_image.jpg')

@app.route('/home_page')
def home_page():
    return send_from_directory('images','home_page.jpg')

@app.route('/profile_image')
def profile_image():
    return send_from_directory('images','profile_image.jpg')

@app.route('/')
def index():
    session['previous'] = '/'
    return render_template('Registration.html', notification=notification)

@app.route('/home')
def home():
    session['previous'] = '/home'
    return render_template('Home_Page.html', notification=notification)

@app.route('/profile')
def profile():
    user_data = users.get(current_user)
    session['previous'] = '/profile'
    return render_template('Profile.html', user=user_data, notification=notification)

@app.route('/help')
def help() :
    return render_template("Help.html")

@app.route('/about_us')
def about_us():
    return render_template("About_us.html")

@app.route('/register' , methods = ['POST'])
def register():
    global users
    username = request.form['username']
    password = request.form['password']
    email = request.form['email']
    phone = request.form['phone']
    
    if username in users:
        notify('Username already taken. Please choose a different one.')
        return redirect('/')
    
    users[username] = {'username': username, 'password': password, 'email': email, 'phone': phone}

    with open(data_file,"w") as file:
        json.dump(users,file)

    notify('Registration successful.')
    return redirect('/')

@app.route('/login', methods = ['POST'])
def login():
    global current_user

    username = request.form['login-username']
    password = request.form['login-password']

    if username in users and users[username]['password'] == password :
        if current_user is None :
            notify('Login successful. Welcome, ' + username + '!')
            current_user = username
        else :
            notify("")

        return redirect('/home')
    
    else:
        notify('Invalid username or password')
        return redirect('/')

@app.route('/videostream', methods = ["GET"])
def videostream():
    selected_input = request.args.get('inputSelect')
    img_str = process_input(selected_input)
    return Response(img_str, mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/update', methods = ['GET'])
def update():
    return Response(generate_data(), mimetype='text/event-stream')

@app.route('/release')
def release():
    release_cap()
    notify("")
    return redirect('/home')

@app.route('/update_profile', methods = ["POST"])
def update_profile():
    global users, current_user

    username = request.form['login-username']
    password = request.form['login-password']
    email = request.form['email']
    phone = request.form['phone']

    if username == current_user :
        notify("Profile updated successfully!")
        users[username] = {'username': username, 'password': password, 'email': email, 'phone': phone}
        with open(data_file,"w") as file:
            json.dump(users,file)

    elif username not in users :
        users.pop(current_user)
        current_user = username
        notify("Profile updated successfully!")
        users[username] = {'username': username, 'password': password, 'email': email, 'phone': phone}
        with open(data_file,"w") as file:
            json.dump(users,file)
    
    else :
        notify("Error updating profile. Username already taken!")
        
    return redirect('/profile')

@app.route('/delete_account')
def delete_account():
    users.pop(current_user)
    with open(data_file,"w") as file:
        json.dump(users,file)
    notify("Account Deleted Successfully")
    return redirect('/')

@app.route('/goback')
def goback():
    previous_page = session.get('previous')
    notify("")
    return redirect(previous_page)

@app.route('/logout')
def logout():
    global current_user
    release_cap()
    notify("Logged Out Successfully")
    current_user = None
    return redirect('/')

In [None]:
if __name__ == '__main__':
    app.run(debug=True, use_reloader=False)

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [11/Feb/2025 18:10:21] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [11/Feb/2025 18:10:21] "[36mGET /bg_image HTTP/1.1[0m" 304 -
127.0.0.1 - - [11/Feb/2025 18:10:45] "[32mPOST /register HTTP/1.1[0m" 302 -
127.0.0.1 - - [11/Feb/2025 18:10:45] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [11/Feb/2025 18:10:45] "[36mGET /bg_image HTTP/1.1[0m" 304 -
127.0.0.1 - - [11/Feb/2025 18:10:54] "[32mPOST /login HTTP/1.1[0m" 302 -
127.0.0.1 - - [11/Feb/2025 18:10:54] "GET /home HTTP/1.1" 200 -
127.0.0.1 - - [11/Feb/2025 18:10:54] "[36mGET /home_page HTTP/1.1[0m" 304 -


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 111ms/step


127.0.0.1 - - [11/Feb/2025 18:11:05] "GET /videostream?inputSelect=Live%20Input HTTP/1.1" 200 -
127.0.0.1 - - [11/Feb/2025 18:11:05] "GET /update HTTP/1.1" 200 -


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
[

Exception in thread Thread-14 (capture_video):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/tl/hsc0jgns65l2qj3w8_vx7pnw0000gn/T/ipykernel_18789/3641206848.py", line 57, in capture_video
cv2.error: OpenCV(4.9.0) /Users/xperience/GHA-OpenCV-Python2/_work/opencv-python/opencv-python/opencv/modules/imgproc/src/resize.cpp:4152: error: (-215:Assertion failed) !ssize.empty() in function 'resize'

127.0.0.1 - - [11/Feb/2025 18:11:27] "[32mGET /release HTTP/1.1[0m" 302 -
127.0.0.1 - - [11/Feb/2025 18:11:27] "GET /home HTTP/1.1" 200 -
