

---


### ***Initialisation cells (run these cells to install the required libraries and mount the drive)***


---



In [None]:
# Mount Google Drive under /content/drive; the cells below read the BLIP project,
# the input video and the output folders from paths beneath this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# install the necessary requirements

!pip install -r '/content/drive/My Drive/My_Software_Projects/BLIP/requirements.txt'
!pip install SpeechRecognition
!pip install moviepy

In [None]:
# Switch the working directory to the BLIP checkout on Drive
# (presumably so the `models` package resolves on import in later cells - confirm).
%cd /content/drive/MyDrive/My_Software_Projects/BLIP
# ipdb is installed and imported for interactive debugging sessions.
!pip install -Uqq ipdb
import ipdb

In [None]:
# Debugging aid: with automatic post-mortem debugging enabled, an uncaught exception
# in a later cell opens the debugger prompt and waits for input.
# Skip this cell (or run `%pdb off`) for unattended runs.
%pdb on



---


### ***Utility code that needs to be run before executing the code in the main section***


---



#### Code to load images and preprocess them

In [3]:
import cv2
import requests
import torch
from PIL import Image
from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode

# Default compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def loadImage(imageSize, device, rawImage):
    """
    Convert an OpenCV frame into a normalised image tensor and preview it inline.

    :param imageSize: side length in pixels of the square image produced by the resize step
    :param device: torch device (or device string) the resulting tensor is moved to
    :param rawImage: frame in OpenCV's BGR channel order (e.g. a frame read from cv2.VideoCapture)
    :return: the transformed image as a tensor with a leading batch dimension of 1, on `device`
    """

    # OpenCV delivers BGR; PIL and the transforms below expect RGB
    rgbFrame = cv2.cvtColor(rawImage, cv2.COLOR_BGR2RGB)
    pilImage = Image.fromarray(rgbFrame)

    # inline preview at one fifth of the frame size; clamp to 1 px so that frames
    # smaller than 5 px on a side do not request a zero-sized resize
    w, h = pilImage.size
    display(pilImage.resize((max(1, w // 5), max(1, h // 5))))

    # resize to a square, convert to a tensor and normalise each channel
    # with the fixed mean / std constants below
    transform = transforms.Compose([
        transforms.Resize((imageSize, imageSize), interpolation=InterpolationMode.BICUBIC),
        transforms.ToTensor(),
        transforms.Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711))
        ])

    # unsqueeze(0) adds the batch dimension expected by the models
    return transform(pilImage).unsqueeze(0).to(device)

#### Utility function to create directories


In [5]:
import os

def makeDirectory(path):
    """
    Create the directory `path` (including missing parents) and report the outcome.

    The function never raises: a failure to create the directory is printed
    together with the underlying OS error so that it is not mistaken for success.

    :param path: directory path to create
    :return: None
    """

    # remember the prior state so the message can tell "created" from "already there";
    # with exist_ok=True os.makedirs does not raise for an existing directory
    alreadyExisted = os.path.isdir(path)

    try:
        os.makedirs(path, exist_ok=True)
    except OSError as error:
        # e.g. permission denied, or a regular file is in the way
        print("Directory '%s' could not be created: %s" % (path, error))
        return

    if alreadyExisted:
        print("Directory '%s' already exists" % path)
    else:
        print("Directory '%s' created successfully" % path)

#### Util function to write CSV File

In [8]:
import csv

def csvWriteRow(pathToCSVFile, rowData):
    """
    Append a single row to a CSV file, creating the file if it does not exist.

    :param pathToCSVFile: path of the CSV file to append to
    :param rowData: sequence of values that make up the row
    """
    # newline='' leaves line-ending handling to the csv module
    with open(pathToCSVFile, 'a', newline='') as csvHandle:
        csv.writer(csvHandle).writerow(rowData)

#### Function to transcribe the audio of a video file to text

In [14]:
import speech_recognition as sr
import moviepy.editor as mp
from moviepy.editor import VideoFileClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip


def videoToText(pathToVideo, intermediateFolder="/content/drive/My Drive/My_Software_Projects/Intermediate_Folder/"):
  """
  Util Function that converts the speech of a video to text, one entry per second,
  using the Google speech recognition call of the SpeechRecognition package.

  For every second of the video a short sub-clip is cut out, its audio track is
  written to a wav file and that file is sent to the recogniser. The intermediate
  cut{N}.mp4 / converted{N}.wav files are left in `intermediateFolder`.

  :param pathToVideo : Path to the video file
  :param intermediateFolder : Folder for the intermediate clip / wav files (created if missing)
  :return: List with one transcript string per second of video (a trailing partial
           second gets its own entry); 'No transcript' where nothing was recognised
  """
  import math
  import os

  print ("Converting speech to Text from Video..")

  os.makedirs(intermediateFolder, exist_ok=True)

  # only the duration is needed from the full clip; close it right away so the
  # underlying reader is not leaked
  clip = VideoFileClip(pathToVideo)
  try:
    duration = clip.duration
  finally:
    clip.close()

  numSecondsVideo = int(duration)
  print("The video is {} seconds".format(numSecondsVideo))

  # round up so that frames in a trailing partial second also have a transcript entry
  numChunks = math.ceil(duration)

  listText = []
  for second in range(numChunks):

    print("time = " + str(second))

    chunkId = second + 1
    cutPath = os.path.join(intermediateFolder, "cut{}.mp4".format(chunkId))
    wavPath = os.path.join(intermediateFolder, "converted{}.wav".format(chunkId))

    # each chunk reaches back up to 2 seconds before the current second;
    # clamp at 0 so the second chunk does not ask for a negative start time
    startTime = max(0, second - 2)
    ffmpeg_extract_subclip(pathToVideo, startTime, second + 1, targetname=cutPath)

    # separating the audio from video
    chunkClip = mp.VideoFileClip(cutPath)
    try:
      if chunkClip.audio is None:
        # chunk has no audio track, so there is nothing to transcribe
        listText.append('No transcript')
        continue
      chunkClip.audio.write_audiofile(wavPath)
    finally:
      chunkClip.close()

    r = sr.Recognizer()
    with sr.AudioFile(wavPath) as source:
      r.adjust_for_ambient_noise(source)
      audioData = r.record(source)

    try:
      # feeding the audio to the google speech to text API
      result = r.recognize_google(audioData)
    except sr.UnknownValueError:
      # store a placeholder if the audio has not enough data to convert to text
      result = 'No transcript'

    listText.append(result)

  return listText





---


### ***Main code section to run BLIP inference in various modes and to generate the dataset. (All the code cells above need to be executed before the cells below.)***


---


#### Code to run an inference on the BLIP image captioning network from a video file

In [None]:
from models.blip import blip_decoder
from PIL import Image
import requests
import torch
from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode
import cv2
import time

# Caption every frame of the input video with the BLIP captioning model.
# Each frame is saved as an image and its caption is appended to a text file.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
currentTime = time.strftime("%H_%M_%S")

#path to input Video File
pathToInputVideoFile = '/content/drive/My Drive/My_Software_Projects/Input_Video/InputVideo2.mp4'

#path to the output Folder
pathToOutputFramesFolder = '/content/drive/My Drive/My_Software_Projects/Output/Output_Caption_' + str(currentTime) + '/'
makeDirectory(pathToOutputFramesFolder)

# path to the output text File (the folder must exist before the file is opened for appending)
pathToOutputTxtFolder = '/content/drive/My Drive/My_Software_Projects/Output_File/'
makeDirectory(pathToOutputTxtFolder)
pathToOutputTxtFile = pathToOutputTxtFolder + 'CaptionTextFile_' + str(currentTime) + '.txt'

# set image size
imageSize = 384

#Loading the model file
modelUrl = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_capfilt_large.pth'

model = blip_decoder(pretrained=modelUrl, image_size=imageSize, vit='base')
model.eval()
model = model.to(device)
print("Model Successfully Loaded.")

print("Loading Input Video File.....")
inputVideo = cv2.VideoCapture(pathToInputVideoFile)
# VideoCapture does not raise on a missing / unreadable file, so check explicitly
if not inputVideo.isOpened():
  raise IOError("Could not open input video file: " + pathToInputVideoFile)
print("Successfully Loaded input File")


# Calculate the Frames per second (FPS)
print("Calculating Frames Per Second...")
fps = round(inputVideo.get(cv2.CAP_PROP_FPS))
print('Fps = ' + str(fps))

# frameNumber counts frames within the current second (1..fps), timeStamp counts whole seconds
frameNumber = 0
timeStamp = 0

print('Processing Frames...')

try:

  while True:
    # Processing Frames
    success, imageFrame = inputVideo.read()

    if not success:
      # no more frames
      break

    # increase the frame by 1
    frameNumber += 1

    image = loadImage(imageSize=imageSize, device=device, rawImage=imageFrame)

    with torch.no_grad():
      # beam search
      caption = model.generate(image, sample=False, num_beams=3, max_length=20, min_length=5)

      # nucleus sampling
      #caption = model.generate(image, sample=True, top_p=0.9, max_length=20, min_length=5)

    print('caption: '+caption[0])

    # time and frame are part of the name so frames with the same caption do not overwrite each other
    outputFrameFilePath = pathToOutputFramesFolder + str(caption[0]) + '_time_' + str(timeStamp) + '_frame_' + str(frameNumber) + '.jpg'

    # write the frame (imwrite reports failure through its return value)
    frameSaved = cv2.imwrite(outputFrameFilePath, imageFrame)

    # write the captions to an output file
    writingText = "TimeStamp = " + str(timeStamp) +  " Frame = " + str(frameNumber) + " caption : " + str(caption[0])

    with open(pathToOutputTxtFile, 'a') as testwritefile:
      testwritefile.write(writingText + '\n')

    if frameSaved:
      print('Time = ' + str(timeStamp) + ' secs Frame = ' + str(frameNumber) + ' saved successfully')
    else:
      print('Time = ' + str(timeStamp) + ' secs Frame = ' + str(frameNumber) + ' could not be saved to ' + outputFrameFilePath)

    # every time you hit the last frame increase the time by 1 and reset the frames to 0
    if frameNumber == fps:

      timeStamp += 1
      frameNumber = 0

  print("Frame successfully Processed.")

finally:
  # release the capture even if processing stops with an error
  inputVideo.release()

#### Inference code for VQA (Visual Question Answering)

In [None]:
from models.blip_vqa import blip_vqa
from models.blip import blip_decoder
from PIL import Image
import requests
import torch
from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode
import cv2
import time

# Ask the BLIP VQA model one fixed question about every frame of the input video.
# Each frame is saved as an image and the answer is appended to a text file.

# question to be Asked (Text Prompt)
question = 'What is the facial expression of the person?'

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
currentTime = time.strftime("%H_%M_%S")

#path to input Video File
pathToInputVideoFile = '/content/drive/My Drive/My_Software_Projects/Input_Video/InputVideo2.mp4'

#path to the output Folder
pathToOutputFramesFolder = '/content/drive/My Drive/My_Software_Projects/Output/Output_VQA_' + str(currentTime) + '/'
makeDirectory(pathToOutputFramesFolder)

# path to the output text File (the folder must exist before the file is opened for appending)
pathToOutputTxtFolder = '/content/drive/My Drive/My_Software_Projects/Output_File/'
makeDirectory(pathToOutputTxtFolder)
pathToOutputTxtFile = pathToOutputTxtFolder + 'CaptionTextFile_' + str(currentTime) + '.txt'

imageSize = 480

modelUrl = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_vqa_capfilt_large.pth'

print("Loading the Pre-Trained Model...")

model = blip_vqa(pretrained=modelUrl, image_size=imageSize, vit='base')
model.eval()
model = model.to(device)
print("Model Successfully Loaded.")

print("Loading Input Video File.....")
inputVideo = cv2.VideoCapture(pathToInputVideoFile)
# VideoCapture does not raise on a missing / unreadable file, so check explicitly
if not inputVideo.isOpened():
  raise IOError("Could not open input video file: " + pathToInputVideoFile)
print("Successfully Loaded input File")


# Calculate the Frames per second (FPS)
print("Calculating Frames Per Second...")
fps = round(inputVideo.get(cv2.CAP_PROP_FPS))
print('Fps = ' + str(fps))

# frameNumber counts frames within the current second (1..fps), timeStamp counts whole seconds
frameNumber = 0
timeStamp = 0

print('Processing Frames...')

try:

  while True:
    # Processing Frames
    success, imageFrame = inputVideo.read()

    if not success:
      # no more frames
      break

    # increase the frame by 1
    frameNumber += 1

    image = loadImage(imageSize=imageSize, device=device, rawImage=imageFrame)

    with torch.no_grad():
      answer = model(image, question, train=False, inference='generate')

    print('answer: '+answer[0])

    # separators keep (time, frame) pairs distinct: without them time 1 / frame 12
    # and time 11 / frame 2 would both map to "...112.jpg"
    outputFrameFilePath = pathToOutputFramesFolder + str(answer[0]) + '_time_' + str(timeStamp) + '_frame_' + str(frameNumber) + '.jpg'

    # write the frame (imwrite reports failure through its return value)
    frameSaved = cv2.imwrite(outputFrameFilePath, imageFrame)

    # write the answers to the output text file
    writingText = "TimeStamp = " + str(timeStamp) +  " Frame = " + str(frameNumber) +  " Emotion : " + str(answer[0])

    with open(pathToOutputTxtFile, 'a') as testwritefile:
      testwritefile.write(writingText + '\n')

    if frameSaved:
      print('Time = ' + str(timeStamp) + ' secs Frame = ' + str(frameNumber) + ' saved successfully')
    else:
      print('Time = ' + str(timeStamp) + ' secs Frame = ' + str(frameNumber) + ' could not be saved to ' + outputFrameFilePath)

    # every time you hit the last frame increase the time by 1 and reset the frames to 0
    if frameNumber == fps:

      timeStamp += 1
      frameNumber = 0

  print("Frame successfully Processed.")

finally:
  # release the capture even if processing stops with an error
  inputVideo.release()



#### Code to generate a dataset containing Timestamp, FrameID, FileName, CaptionedText, QAResult and AudioToText

In [None]:
from models.blip_vqa import blip_vqa
from models.blip import blip_decoder
from PIL import Image
import requests
import torch
from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode
import cv2
import time
import csv

# Build a per-frame dataset from the input video: every frame is saved as an image and
# one CSV row is written with its timestamp, frame id, file name, caption, VQA answer
# and the speech transcript of the second the frame belongs to.

# question to be Asked (Text Prompt)
question = 'What is the facial expression of the person?'

# header row of the csv File
csvWrite = ['Timestamp', 'FrameID', 'FileName', 'CaptionedText', 'QAResult', 'AudioToText']

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
currentTime = time.strftime("%H_%M_%S")

#path to input Video File
pathToInputVideoFile = '/content/drive/My Drive/My_Software_Projects/Input_Video/InputVideo2.mp4'

#path to the output Folder
pathToOutputFramesFolder = '/content/drive/My Drive/My_Software_Projects/Output/Output_Dataset_' + str(currentTime) + '/'
makeDirectory(pathToOutputFramesFolder)

# paths to the output text / csv Files (the folder must exist before the csv header is appended)
pathToOutputFileFolder = '/content/drive/My Drive/My_Software_Projects/Output_File/'
makeDirectory(pathToOutputFileFolder)
pathToOutputTxtFile = pathToOutputFileFolder + 'CaptionTextFile_' + str(currentTime) + '.txt'
pathToOutputCSVFile = pathToOutputFileFolder + 'CaptionCSVFile_' + str(currentTime) + '.csv'

csvWriteRow(pathToCSVFile=pathToOutputCSVFile, rowData=csvWrite)

imageSizeVQA = 480
imageSizeIC = 384


# Url to the model file
modelUrlVQA = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_vqa_capfilt_large.pth'
modelUrlIC = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_capfilt_large.pth'

# Video to text conversion
audioToTextList = videoToText(pathToVideo = pathToInputVideoFile)
print ("Conversion from speech to text successful.")


print("Loading the Pre-Trained Image captioning Model...")
modelIC = blip_decoder(pretrained=modelUrlIC, image_size=imageSizeIC, vit='base')
modelIC.eval()
# the captioning model stays on the CPU (presumably to limit GPU memory use - confirm)
modelIC = modelIC.to('cpu')
print("Image cpationing Model Successfully Loaded.")

print("Loading the Pre-Trained VQA Model...")
modelVQA = blip_vqa(pretrained=modelUrlVQA, image_size=imageSizeVQA, vit='base')
modelVQA.eval()
# use the detected device instead of a hard-coded 'cuda' so the cell also runs without a GPU
modelVQA = modelVQA.to(device)
print("VQA Model Successfully Loaded.")

print("Loading Input Video File.....")
inputVideo = cv2.VideoCapture(pathToInputVideoFile)
# VideoCapture does not raise on a missing / unreadable file, so check explicitly
if not inputVideo.isOpened():
  raise IOError("Could not open input video file: " + pathToInputVideoFile)
print("Successfully Loaded input File")


# Calculate the Frames per second (FPS)
print("Calculating Frames Per Second...")
fps = round(inputVideo.get(cv2.CAP_PROP_FPS))
print('Fps = ' + str(fps))

# frameNumber counts frames within the current second (1..fps), timeStamp counts whole seconds
frameNumber = 0
timeStamp = 0

print('Processing Frames...')

try:

  while True:
    # Processing Frames
    success, imageFrame = inputVideo.read()

    if not success:
      # no more frames
      break

    # increase the frame by 1
    frameNumber += 1

    # each model gets the frame at its own input size and on its own device
    imageVQA = loadImage(imageSize=imageSizeVQA, device=device, rawImage=imageFrame)
    imageIC = loadImage(imageSize=imageSizeIC, device='cpu', rawImage=imageFrame)

    with torch.no_grad():
      answer = modelVQA(imageVQA, question, train=False, inference='generate')
      # nucleus sampling
      caption = modelIC.generate(imageIC, sample=True, top_p=0.9, max_length=20, min_length=5)

    print('answer: '+answer[0])

    outputFrameFileName = 'time_' + str(timeStamp) + 'frame_' + str(frameNumber) + '.jpg'
    outputFrameFilePath = pathToOutputFramesFolder + outputFrameFileName

    # write the frame (imwrite reports failure through its return value)
    frameSaved = cv2.imwrite(outputFrameFilePath, imageFrame)

    # the frame-derived timestamp can run past the transcript list (trailing partial
    # second, rounded fps); fall back to the placeholder instead of raising IndexError
    if timeStamp < len(audioToTextList):
      audioText = audioToTextList[timeStamp]
    else:
      audioText = 'No transcript'

    # append a row to the csv File
    csvWrite = [str(timeStamp), str(frameNumber), outputFrameFileName, str(caption[0]), str(answer[0]), str(audioText)]
    csvWriteRow(pathToCSVFile=pathToOutputCSVFile, rowData=csvWrite)

    if frameSaved:
      print('Time = ' + str(timeStamp) + ' secs Frame = ' + str(frameNumber) + ' saved successfully')
    else:
      print('Time = ' + str(timeStamp) + ' secs Frame = ' + str(frameNumber) + ' could not be saved to ' + outputFrameFilePath)

    # every time you hit the last frame increase the time by 1 and reset the frames to 0
    if frameNumber == fps:

      timeStamp += 1
      frameNumber = 0

  print("Frame successfully Processed.")

finally:
  # release the capture even if processing stops with an error
  inputVideo.release()