In [None]:

# Body detection: the model is trained with myself as the character; see the vision99 code for the body files and the capturing/training procedures.
# Lip detection:
#   Character-based videos and alignments are available at https://spandh.dcs.shef.ac.uk//gridcorpus/ ; you can choose and download data from this website.
#   Download the lip-detection model-weight checkpoint file and extract it as follows:

import gdown  # imported in this cell so it also runs on a freshly restarted kernel

# Name of the checkpoint archive; the relative path means it is looked up in the working directory.
output = 'checkpoints.zip'
# Unpack the archive into the 'models' directory.
gdown.extractall(output, 'models')

In [3]:
# Save this cell as a .py file so the Streamlit code can import from it
%%writefile vision-loadmodelweight.py

import os 
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten

def load_model(weights_path: str = os.path.join('models', 'checkpoint')) -> Sequential:
    """Build the lip-reading network and load trained weights into it.

    Architecture (in order): three Conv3D -> ReLU -> MaxPool3D stages with
    128, 256 and 75 filters, a per-timestep Flatten, two bidirectional LSTM
    layers (128 units each, each followed by 50% dropout) and a softmax
    Dense layer with 41 outputs.

    Args:
        weights_path: Location of the weights to load. Defaults to
            ``models/checkpoint``, the path that was previously hard-coded,
            so existing ``load_model()`` callers are unaffected.

    Returns:
        The ``Sequential`` model with the weights loaded.
    """
    model = Sequential()

    # First stage carries the input shape; pool size (1,2,2) halves only the
    # last two spatial axes and leaves the leading (75) axis untouched.
    model.add(Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'))
    model.add(Activation('relu'))
    model.add(MaxPool3D((1,2,2)))

    # Remaining convolutional stages share the same layout.
    for filters in (256, 75):
        model.add(Conv3D(filters, 3, padding='same'))
        model.add(Activation('relu'))
        model.add(MaxPool3D((1,2,2)))

    # Flatten each timestep independently so the LSTMs receive a sequence.
    model.add(TimeDistributed(Flatten()))

    for _ in range(2):
        model.add(Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)))
        model.add(Dropout(.5))

    # NOTE(review): 41 presumably covers the character vocabulary plus the
    # extra tokens used for decoding -- confirm against the training code.
    model.add(Dense(41, kernel_initializer='he_normal', activation='softmax'))

    model.load_weights(weights_path)

    return model

Writing vision-loadmodelweight.py


In [4]:
# Save this cell as a .py file so the Streamlit code can import from it
%%writefile vision-loadfunctions.py

import tensorflow as tf
from typing import List
import cv2
import os 

# Characters that can appear in an alignment label, one list entry per character.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

# Forward lookup: character -> integer id (out-of-vocabulary token is the empty string).
char_to_num = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    oov_token="",
)

# Inverse lookup built from the forward layer's vocabulary: integer id -> character.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

def load_video(path:str) -> List[float]: 
    """Read a video, crop a fixed region from every frame and normalise it.

    Each frame is converted to a single channel with
    ``tf.image.rgb_to_grayscale`` and cropped to rows 190:236 and columns
    80:220. The stacked frames are then centred and scaled by their standard
    deviation.

    Args:
        path: Path of the video file to read.

    Returns:
        A float32 tensor holding the normalised, cropped frames.

    Raises:
        ValueError: If no frame could be read from ``path``.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count is not guaranteed to match the
                # number of decodable frames; stop instead of passing None on.
                break
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[190:236,80:220,:])
    finally:
        # Always free the capture handle, even if decoding raised.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # NOTE(review): the mean is taken over the un-cast frames while the std is
    # taken over float32 values. This is kept exactly as before because the
    # trained weights presumably expect this preprocessing -- confirm before changing.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std
    
def load_alignments(path:str) -> List[str]: 
    """Read an alignment file and encode its words as character ids.

    Every line is split on whitespace and its third field is taken as the
    token. Tokens equal to ``'sil'`` are dropped; the remaining tokens are
    joined with single spaces, split into characters and mapped through
    ``char_to_num``.

    Args:
        path: Path of the ``.align`` file.

    Returns:
        The result of ``char_to_num`` for the character sequence, without the
        leading separator space.
    """
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            if len(fields) < 3:
                # Blank or truncated line: there is no token field to read.
                continue
            if fields[2] != 'sil':
                # A separator is added before every token; the first one is
                # sliced off after encoding ([1:] below).
                tokens.extend([' ', fields[2]])
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

def load_data(path: str): 
    """Load the frames and the encoded alignment for one clip.

    Only the base file name (text before the first ``.``) of ``path`` is
    used; the video is read from ``data/s1/<name>.mpg`` and the alignment
    from ``data/alignments/s1/<name>.align``.

    Args:
        path: String tensor holding the clip path (``.numpy()`` is called on it).

    Returns:
        Tuple ``(frames, alignments)`` as produced by ``load_video`` and
        ``load_alignments``.
    """
    path = bytes.decode(path.numpy())
    # Accept both Windows ('\\') and POSIX ('/') separators. Splitting on '\\'
    # alone left the directory part in the name on non-Windows systems and
    # produced a path such as data/s1/data/s1/<name>.mpg.
    file_name = path.replace('\\', '/').split('/')[-1].split('.')[0]
    video_path = os.path.join('data','s1',f'{file_name}.mpg')
    alignment_path = os.path.join('data','alignments','s1',f'{file_name}.align')
    frames = load_video(video_path) 
    alignments = load_alignments(alignment_path)
    
    return frames, alignments

Writing vision-loadfunctions.py


In [1]:
%%writefile vision9c.py
import subprocess
from subprocess import run
import time
from typing import List
import gdown
import streamlit as st
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import mediapipe as mp
import tensorflow as tf
from PIL import Image
import os 
import imageio 
# The helper modules are saved with hyphens in their file names, which are not
# valid in an `import`/`from ... import` statement (SyntaxError). Load them by
# name through importlib instead and bind the same names as before.
import importlib
_vision_loadfunctions = importlib.import_module('vision-loadfunctions')
load_data = _vision_loadfunctions.load_data
num_to_char = _vision_loadfunctions.num_to_char
load_model = importlib.import_module('vision-loadmodelweight').load_model

# Initialize MediaPipe holistic
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

def convert_video_to_compatible_format(input_video_path):
    """Re-encode a video as H.264 / yuv420p MP4 using the ``ffmpeg`` CLI.

    The output is written next to the input with the suffix
    ``_compatible.mp4`` and overwrites any existing file of that name.

    Args:
        input_video_path: Path of the video to convert.

    Returns:
        The output path on success, or ``None`` if ffmpeg failed or could not
        be started (an error is shown through ``st.error`` in that case).
    """
    output_video_path = os.path.splitext(input_video_path)[0] + '_compatible.mp4'
    command = [
        'ffmpeg',
        '-y',  # global overwrite flag; placed first so it is not a trailing option after the output file
        '-i', input_video_path,
        '-c:v', 'libx264',
        '-preset', 'fast',
        '-pix_fmt', 'yuv420p',
        output_video_path,
    ]
    try:
        run(command, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # CalledProcessError: ffmpeg exited with a non-zero status.
        # FileNotFoundError: the ffmpeg executable is not available on PATH.
        st.error(f"FFmpeg error: {e}")
        return None
    return output_video_path

def mediapipe_detection(image, model):
    """Run ``model.process`` on a BGR frame and return the frame with the results.

    The frame is converted BGR -> RGB, marked read-only while the model
    processes it, made writeable again and converted back RGB -> BGR.

    Returns:
        Tuple ``(image, results)``: the round-tripped BGR frame and whatever
        ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results
def draw_landmarks(image, results):
    """Draw face, pose and both hand landmark sets onto ``image`` with default styling."""
    # (landmark list, connection set) pairs, drawn in this order.
    landmark_groups = (
        (results.face_landmarks, mp_holistic.FACEMESH_TESSELATION),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )
    for landmark_list, connections in landmark_groups:
        mp_drawing.draw_landmarks(image, landmark_list, connections)
def draw_styled_landmarks(image, results):    
    """Draw face, pose and hand landmarks onto ``image`` with per-part colours.

    Each body part gets two ``DrawingSpec`` objects, passed positionally to
    ``mp_drawing.draw_landmarks`` after the connection set: the first for
    the landmarks, the second for the connections.
    """
    spec = mp_drawing.DrawingSpec
    # (landmark list, connection set, first spec, second spec), drawn in this order.
    styled_groups = (
        (results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
         spec(color=(80,110,10), thickness=1, circle_radius=1),
         # Green channel was 256, which is outside the 0-255 channel range;
         # 255 is the nearest valid value.
         spec(color=(80,255,121), thickness=1, circle_radius=1)),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         spec(color=(80,22,10), thickness=2, circle_radius=4),
         spec(color=(80,44,121), thickness=2, circle_radius=2)),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(121,22,76), thickness=2, circle_radius=4),
         spec(color=(121,44,250), thickness=2, circle_radius=2)),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(245,117,66), thickness=2, circle_radius=4),
         spec(color=(245,66,230), thickness=2, circle_radius=2)),
    )
    for landmark_list, connections, first_spec, second_spec in styled_groups:
        mp_drawing.draw_landmarks(image, landmark_list, connections, first_spec, second_spec)
def extract_keypoints(results):
    """Flatten all landmark sets of ``results`` into one 1-D feature vector.

    Order of the concatenated parts: pose (x, y, z, visibility per landmark),
    face, left hand, right hand (x, y, z per landmark). A part that is
    missing (falsy) is replaced by zeros of length 33*4, 468*3, 21*3 and
    21*3 respectively.
    """
    def _flatten(landmarks, fields, fallback_size):
        # Missing detection -> zero block of the expected length.
        if not landmarks:
            return np.zeros(fallback_size)
        rows = [[getattr(point, field) for field in fields] for point in landmarks.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    parts = [
        _flatten(results.pose_landmarks, xyz + ('visibility',), 33*4),
        _flatten(results.face_landmarks, xyz, 468*3),
        _flatten(results.left_hand_landmarks, xyz, 21*3),
        _flatten(results.right_hand_landmarks, xyz, 21*3),
    ]
    return np.concatenate(parts)
def prob_viz(res, actions, input_frame, colors):
    """Return a copy of ``input_frame`` with one labelled probability bar per entry of ``res``.

    Bar ``num`` is a filled rectangle starting at x=0 whose width is
    ``int(prob*100)`` pixels, drawn in ``colors[num]`` and labelled with
    ``actions[num]``; consecutive bars are 40 pixels apart vertically.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        row_top = 60 + num*40
        bar_corner = (int(prob*100), row_top + 30)
        cv2.rectangle(output_frame, (0, row_top), bar_corner, colors[num], -1)
        cv2.putText(
            output_frame, actions[num], (0, row_top + 25),
            cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA,
        )

    return output_frame

# Load the body-action model; 'body.h5' is a relative path, so it is resolved
# against the working directory the app is started from.
model = tf.keras.models.load_model('body.h5')
# Actions that we try to detect. Position i is used as the label for the i-th
# value of a prediction (prob_viz indexes `actions` with the prediction index).
actions = np.array(['neutral', 'shifting', 'pausing'])

# Process video and apply action recognition
def process_video(video_file):
    """Annotate a video with landmarks and action-probability bars.

    The file named ``video_file.name`` is first converted with
    ``convert_video_to_compatible_format``. Every frame is run through
    MediaPipe Holistic and its keypoints are appended to a sliding window of
    the last 30 frames. Once the window is full, each frame is classified by
    the module-level ``model`` and written out with the probability bars
    from ``prob_viz``; frames seen before the window fills are not written.

    Changes from the previous implementation:
      * frames are streamed to the ``VideoWriter`` and no longer held in a list;
      * the output uses the source frame rate (15 fps only as a fallback);
      * the final ffmpeg call is checked and ``-y`` is passed as a leading option;
      * the intermediate ``*_compatible.mp4`` file is removed afterwards;
      * the sentence/threshold bookkeeping was removed because its result was
        never drawn or returned.

    Args:
        video_file: Object whose ``.name`` is the path of the video on disk.

    Returns:
        Path of the H.264 output video, or ``None`` if no frame was written
        or the final encoding step failed.
    """
    # Convert the uploaded video to a compatible format
    converted_video_path = convert_video_to_compatible_format(video_file.name)
    if not converted_video_path:
        st.error("Video conversion failed. Please check the video format.")
        st.stop()

    colors = [(245,117,16), (117,245,16), (16,117,245)]
    processed_video_path = 'processed_video_with_actions.mp4'
    final_video_path = 'final_video_with_actions.mp4'
    sequence = []
    out = None  # created lazily once the first annotated frame fixes the frame size

    cap = cv2.VideoCapture(converted_video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        # Frame rate unavailable (0 or NaN): keep the previous fixed rate.
        fps = 15

    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Make detections and draw landmarks
                image, results = mediapipe_detection(frame, holistic)
                draw_styled_landmarks(image, results)

                # Keep the keypoints of the last 30 frames
                sequence.append(extract_keypoints(results))
                sequence = sequence[-30:]
                if len(sequence) < 30:
                    continue

                res = model.predict(np.expand_dims(sequence, axis=0))[0]

                # Viz probabilities and add action labels to the frame
                image_with_labels = prob_viz(res, actions, image, colors)

                if out is None:
                    height, width = image_with_labels.shape[:2]
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    out = cv2.VideoWriter(processed_video_path, fourcc, fps, (width, height))
                out.write(image_with_labels)
    finally:
        cap.release()
        if out is not None:
            out.release()
        # The converted copy is only an intermediate input; do not leave it behind.
        if os.path.exists(converted_video_path):
            os.remove(converted_video_path)

    if out is None:
        st.error("No frames were processed. Please check the video file and try again.")
        return None

    # Convert the processed video to a compatible format using ffmpeg
    try:
        result = run(['ffmpeg', '-y', '-i', processed_video_path, '-vcodec', 'libx264', final_video_path])
    finally:
        # Remove the temporary processed video file
        if os.path.exists(processed_video_path):
            os.remove(processed_video_path)

    if result.returncode != 0:
        st.error(f"FFmpeg error: exit status {result.returncode}")
        return None

    return final_video_path
#############################################################################################################################################        
st.set_page_config(layout='wide')

# Sidebar header: application title and a short description.
st.sidebar.title('ROBOTIC-VISION')
st.sidebar.info('This application is originally for developing smart analyses systems.')

# Make sure both per-tab session entries exist before they are read or written.
if 'tab1_state' not in st.session_state:
    st.session_state['tab1_state'] = None
if 'tab2_state' not in st.session_state:
    st.session_state['tab2_state'] = None

# Mode selector shown in the sidebar.
sidebar_options = ["Lip detection", "Body detection"]
selected_option = st.sidebar.selectbox("Choose an option:", sidebar_options)

# Record whether the live-capture button was pressed (True) or not (False).
st.session_state['Live Body Detection'] = bool(st.sidebar.button("Live Body Detection"))

if selected_option == "Lip detection":
    st.session_state['tab1_state'] = "active"
    st.session_state['tab2_state'] = None
    st.title('Lip Reading and Analysis')  
    videos_dir = os.path.join('data', 's1')
    # Offer only the source .mpg clips. The conversion step writes its
    # '*_compatible.mp4' output into this same directory, and load_data()
    # always opens '<name>.mpg', so any other entry could not be loaded.
    if os.path.isdir(videos_dir):
        options = sorted(name for name in os.listdir(videos_dir) if name.endswith('.mpg'))
    else:
        options = []
    selected_video = st.selectbox('Choose video for lip reading', options, key='video_selection_tab1')
    if options:
        st.info('The video below displays the converted video in mp4 format for lip reading')
        file_path = os.path.join('data', 's1', selected_video)
        converted_video_path = convert_video_to_compatible_format(file_path)
        if converted_video_path:
            # Context manager closes the file even if reading fails.
            with open(converted_video_path, 'rb') as video_file:
                video_bytes = video_file.read()
            st.video(video_bytes)

        st.info('This is all the machine learning model sees when making a prediction for lip reading')
        video, annotations = load_data(tf.convert_to_tensor(file_path))
        
        # 'animation.gif' is not produced anywhere in this script, so only
        # show it when it is actually present instead of failing the page.
        if os.path.exists('animation.gif'):
            st.image('animation.gif', width=400) 
        else:
            st.warning("Preview 'animation.gif' was not found.")
        st.info('This is the output of the machine learning model as tokens for lip reading')
        # Separate name so the module-level body-action `model` is not rebound.
        lip_model = load_model()
        yhat = lip_model.predict(tf.expand_dims(video, axis=0))
        # Greedy CTC decoding over an input length of 75 steps.
        decoder = tf.keras.backend.ctc_decode(yhat, [75], greedy=True)[0][0].numpy()
        st.text(decoder)
        # Convert prediction to text
        st.info('Decode the raw tokens into words for lip reading')
        converted_prediction = tf.strings.reduce_join(num_to_char(decoder)).numpy().decode('utf-8')
        st.text(converted_prediction)
    else:
        st.warning(f"No .mpg videos found in '{videos_dir}'.")

elif selected_option == "Body detection":
    st.session_state['tab1_state'] = None
    st.session_state['tab2_state'] = "active"
    st.info('The video below displays the processed video with landmarks and actions')
    # "mpeg..." was not a real extension; the intended value is "mpeg".
    uploaded_video = st.file_uploader("Upload a video", type=["mp4", "mpg", "mpeg"])
    if uploaded_video is not None:
        processed_video_path = None
        try:
            # Save the uploaded video to a temporary file
            with open(uploaded_video.name, "wb") as f:
                f.write(uploaded_video.getbuffer())

            # Process video and display processed video with landmarks and actions
            processed_video_path = process_video(uploaded_video)
            if processed_video_path:
                st.video(processed_video_path)
        finally:
            # Clean up temporary files, also when processing stops early or raises.
            if os.path.exists(uploaded_video.name):
                os.remove(uploaded_video.name)
            if processed_video_path and os.path.exists(processed_video_path):
                os.remove(processed_video_path)
            
if st.session_state['Live Body Detection']:
    # Live mode: read frames from camera 0, classify a sliding window of 30
    # frames of keypoints and show the annotated feed in an OpenCV window.
    # Press 'q' in that window to stop.
    def ld_model(model_path):
        """Load and return the Keras model stored at ``model_path``."""
        model = tf.keras.models.load_model(model_path)
        return model
    
    model_path = 'body.h5'  # Specify the correct path to your model weights
    loaded_model = ld_model(model_path)
    # Actions that we try to detect
    actions = np.array(['neutral', 'shifting', 'pausing'])
    colors = [(245,117,16), (117,245,16), (16,117,245)]
    sequence = []
    sentence = []
    predictions = []
    threshold = 0.5  
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        st.error("Could not open the camera for live body detection.")
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened():
    
                # Read feed; stop instead of passing a missing frame to OpenCV.
                ret, frame = cap.read()
                if not ret:
                    break
    
                # Make detections
                image, results = mediapipe_detection(frame, holistic)
                
                # Draw landmarks
                draw_styled_landmarks(image, results)
    
                # 2. Prediction logic
                keypoints = extract_keypoints(results)
                sequence.append(keypoints)
                sequence = sequence[-30:]
                
                if len(sequence) == 30:
                    res = loaded_model.predict(np.expand_dims(sequence, axis=0))[0]
                    predictions.append(np.argmax(res))
    
                    # 3. Viz logic
                    # np.unique(...)[0] is the smallest class index among the
                    # last 10 predictions.
                    # NOTE(review): the probability bars are only drawn inside
                    # this condition, whereas process_video draws them for every
                    # classified frame -- confirm which behaviour is intended.
                    if len(predictions) >= 10 and np.unique(predictions[-10:])[0] == np.argmax(res): 
                        if res[np.argmax(res)] > threshold: 
                            
                            # Map the model's action names to the on/off wording shown on screen.
                            action = actions[np.argmax(res)]
                            if action == 'pausing':
                                display_action = 'off'
                            elif action == 'shifting':
                                display_action = 'on'
                            else:
                                display_action = action
    
                            # Append only when the label changes.
                            if len(sentence) > 0: 
                                if display_action != sentence[-1]:
                                    sentence.append(display_action)
                            else:
                                sentence.append(display_action)
        
                        if len(sentence) > 5: 
                            sentence = sentence[-5:]
        
                        # Viz probabilities
                        image = prob_viz(res, actions, image, colors)
                        
                    cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
                    cv2.putText(image, ' '.join(sentence), (3,30), 
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                    
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
    
                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
    finally:
        # Release the camera and close the window even if an error interrupts the loop.
        cap.release()
        cv2.destroyAllWindows()
    

Overwriting vision9c.py


In [2]:
# Launch the Streamlit app from the vision9c.py file via a shell command.
! streamlit run vision9c.py

^C
