#### BERT

1. Load the tokenizer and pre-trained model.

2. Load and preprocess the dataset.

3. Randomly split the dataset into training and evaluation sets.

4. Define training arguments, train the model, and save it.

5. Classify questions using the trained model.

6. Load the model and classify more questions.



In [None]:
# Import necessary libraries
import torch
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np
from datasets import Dataset

# --- Tokenizer and model ---------------------------------------------------
# Both are created from the 'bert-base-uncased' checkpoint; the classification
# head is sized for 2 labels (logistics and course-specific).
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# --- Labelled questions ----------------------------------------------------
df = pd.read_csv('/content/drive/MyDrive/Final Project/backup - ques_cat.csv')

# Replace the string values of the 'label' column with integer class ids.
label_encoder = LabelEncoder()
df['label'] = label_encoder.fit_transform(df['label'])

# Print which integer id each original label received.
label_mapping = {name: index for index, name in enumerate(label_encoder.classes_)}
print(label_mapping)

# Define the preprocessing function to tokenize the text
def preprocess_function(examples):
    """Tokenize the "question" entry of a batch of dataset rows.

    Args:
        examples: Mapping that exposes the questions under the "question" key.

    Returns:
        Whatever the module-level ``tokenizer`` returns for those questions,
        requested with ``padding="max_length"`` and ``truncation=True``.
    """
    questions = examples["question"]
    return tokenizer(questions, padding="max_length", truncation=True)

# Wrap the DataFrame in a Hugging Face Dataset and tokenize it in batches.
dataset = Dataset.from_pandas(df)
encoded_dataset = dataset.map(preprocess_function, batched=True)

# Hold out 20% of the rows, picked at random, for evaluation.
train_test_split = encoded_dataset.train_test_split(test_size=0.2)
train_dataset, eval_dataset = train_test_split['train'], train_test_split['test']

# Hyper-parameters and output locations for the run.
# NOTE(review): newer transformers releases appear to rename
# `evaluation_strategy` to `eval_strategy` - confirm against the installed version.
training_args = TrainingArguments(
    output_dir="./results",
    logging_dir="./logs",
    num_train_epochs=5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    evaluation_strategy="epoch",
)

# The Trainer drives both the training loop and the per-epoch evaluation.
trainer = Trainer(
    args=training_args,
    model=model,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)
trainer.train()

# Persist only the weights (state dict) of the trained model.
model_save_path = '/content/drive/MyDrive/Final Project/bert_model.bin'
torch.save(trainer.model.state_dict(), model_save_path)

In [None]:
# (tokenizer, model) pairs keyed by checkpoint path, so the BERT weights are
# built and read from disk once instead of on every call to bert_mod.
_BERT_CACHE = {}


def _load_bert(checkpoint_path):
    """Return the (tokenizer, model) pair for checkpoint_path, loading it on first use."""
    if checkpoint_path not in _BERT_CACHE:
        tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
        model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
        # Inference only: switch off dropout and other training-time behaviour.
        model.eval()
        _BERT_CACHE[checkpoint_path] = (tokenizer, model)
    return _BERT_CACHE[checkpoint_path]


def bert_mod(user_question):
    """Classify a question with the fine-tuned BERT checkpoint.

    The tokenizer and model are loaded the first time a given checkpoint path
    is seen and reused afterwards. If the checkpoint file is overwritten,
    re-run this cell (which resets the cache) to pick up the new weights.

    NOTE(review): `bert_model_path` is not defined in this cell; it must be set
    elsewhere before this function is called - confirm where.

    Args:
        user_question: Question text to classify.

    Returns:
        The value returned by ``classify_question`` for the question.
    """
    tokenizer, model = _load_bert(bert_model_path)
    return classify_question(user_question, model, tokenizer)

# Function to classify new questions
def classify_question(question, model, tokenizer, device='cpu'):
    """Predict the class index of a single question.

    Args:
        question: Question text to classify.
        model: Callable model whose output exposes ``.logits``; it is moved
            to ``device`` before inference.
        tokenizer: Callable that turns the question into a mapping of tensors.
        device: Device the model and inputs are moved to (default 'cpu').

    Returns:
        Index of the highest-scoring class, as returned by ``np.argmax``.
    """
    model.to(device)

    # A lone question has nothing to be padded against, so only truncation is
    # requested; padding to the maximum length would make the model process a
    # full-length sequence of pad tokens for every call.
    inputs = tokenizer(question, return_tensors="pt", truncation=True)
    inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

    # Inference only: no gradients needed.
    with torch.no_grad():
        outputs = model(**inputs)

    # Softmax is monotonic, so the arg-max of the logits is already the
    # most probable class.
    category = np.argmax(outputs.logits.cpu().numpy())

    return category

#### Video Transcription: OpenAI Whisper

In [None]:
from googleapiclient.discovery import build
import pandas as pd
import json
import whisper
from pytube import YouTube
from moviepy.editor import *
import os
from PIL import Image

from google.colab import drive
# Mount Google Drive at /content/drive so files stored there can be read and written by path.
drive.mount('/content/drive')

##### Create Dataset of Video URLs.

In [None]:
# The API key is read from the YOUTUBE_API_KEY environment variable instead of
# being committed in the notebook. Any key that was previously hard-coded here
# has been exposed and should be revoked.
import os

api_key = os.environ.get('YOUTUBE_API_KEY')
if not api_key:
    raise RuntimeError(
        "Set the YOUTUBE_API_KEY environment variable to a YouTube Data API v3 key."
    )

# Replace with the ID of the playlist to index.
playlist_id = 'PLLssT5z_DsK9JDLcT8T62VtzwyW9LNepV'

# Client for the YouTube Data API v3, used by the functions below.
youtube = build('youtube', 'v3', developerKey=api_key)

def convert_seconds_to_timestamp(seconds):
    """Format a number of seconds as an ``HH:MM:SS.000`` timestamp.

    Args:
        seconds: Anything ``float()`` accepts (number or numeric string).

    Returns:
        The timestamp string. The fractional part of ``seconds`` is discarded;
        the millisecond field is always ``000``.
    """
    total = float(seconds)
    hours, within_hour = divmod(total, 3600)
    minutes = within_hour // 60
    secs = total % 60
    return f"{int(hours):02}:{int(minutes):02}:{int(secs):02}.000"

def get_playlist_video_details(playlist_id):
    """Collect the title and watch URL of the videos in a playlist.

    Pages through the playlist with the module-level ``youtube`` client. For
    each page, the video IDs are looked up in one ``videos().list`` call to
    obtain the titles.

    Args:
        playlist_id: ID of the YouTube playlist to read.

    Returns:
        A list of ``{'title': ..., 'url': ...}`` dicts, one per item returned
        by the videos lookup.
    """
    video_details = []
    next_page_token = None

    while True:
        pl_response = youtube.playlistItems().list(
            part='contentDetails',
            playlistId=playlist_id,
            # Request the largest page the API allows, so a long playlist
            # costs far fewer requests (and quota) than the small default page.
            maxResults=50,
            pageToken=next_page_token,
        ).execute()

        video_ids = [item['contentDetails']['videoId'] for item in pl_response['items']]

        # Skip the lookup for an empty page rather than sending an empty id list.
        if video_ids:
            videos_response = youtube.videos().list(
                part="snippet",
                id=','.join(video_ids),
            ).execute()

            for item in videos_response['items']:
                video_details.append({
                    'title': item['snippet']['title'],
                    'url': f"https://www.youtube.com/watch?v={item['id']}"
                })

        # No token means this was the last page.
        next_page_token = pl_response.get('nextPageToken')
        if not next_page_token:
            break

    return video_details

# Collect the title and URL of every video in the playlist.
video_details = get_playlist_video_details(playlist_id)

# Write them to a CSV file, one row per video and no index column.
csv_filename = 'lecture_video_urls.csv'
df = pd.DataFrame(video_details)
df.to_csv(csv_filename, index=False)
print(f"Saved playlist data to {csv_filename}")


##### Download each video, transcribe it with the Whisper model, and save the segment timestamps. Then iterate over the timestamps and capture a screenshot from the video at each one, so that when timestamps are returned we can also fetch the matching screenshot for visual reference.

In [None]:
# Download YouTube video
def download_video(url, path='videos/'):
    """Download a YouTube video into ``path``.

    Args:
        url: URL of the video.
        path: Directory the file is saved into; created if it does not exist.

    Returns:
        ``(title, filename)``: the video title and the value returned by the
        stream's ``download`` call.

    Raises:
        ValueError: If ``get_highest_resolution()`` finds no stream.
    """
    yt = YouTube(url)
    stream = yt.streams.get_highest_resolution()
    # Fail with a clear message instead of an AttributeError on None.
    if stream is None:
        raise ValueError(f"No downloadable stream found for {url}")

    title = yt.title
    print("Downloading...", title)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(path, exist_ok=True)
    filename = stream.download(output_path=path)
    print("Download completed\n")
    return title, filename

# Get video transcript with timestamps
def get_transcript(video_path, model, output_path='transcripts/'):
    """Transcribe a video and save its segments as JSON.

    The segments are written to ``<output_path>/<video name>_transcript.json``,
    where the video name is the file name of ``video_path`` without its
    extension.

    Args:
        video_path: Path of the video file to transcribe.
        model: Object with a ``transcribe(path)`` method returning a mapping
            with a ``'segments'`` list; each segment provides ``'id'``,
            ``'start'``, ``'end'`` and ``'text'``.
        output_path: Directory for the JSON file; created if missing.

    Returns:
        The full result returned by ``model.transcribe``.
    """
    os.makedirs(output_path, exist_ok=True)

    print("Transcribing video..")
    transcription = model.transcribe(video_path)

    segments = {
        segment['id']: {
            'start': segment['start'],
            'end': segment['end'],
            'text': segment['text'],
        }
        for segment in transcription['segments']
    }

    # basename/splitext work for paths without a "/" and for extensions of any
    # length; os.path.join does not need output_path to end with a separator.
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    transcript_file = os.path.join(output_path, f"{video_name}_transcript.json")
    with open(transcript_file, 'w') as json_file:
        json.dump(segments, json_file, indent=4)
    print("Transcription saved\n")

    return transcription

# Take screenshots at given timestamps
def capture_screenshots(video_path, timestamps, output_path='screenshots/'):
    """Save one PNG frame of a video for each given timestamp.

    Files are named ``<video name>_screenshot_<timestamp>.png``, where the
    video name is the file name of ``video_path`` without its extension.

    Args:
        video_path: Path of the video file.
        timestamps: Iterable of times passed to ``save_frame`` as ``t``.
        output_path: Directory for the images; created if missing.
    """
    os.makedirs(output_path, exist_ok=True)
    print("Taking screenshots..")

    # Computed outside the f-string: reusing the same quote type inside an
    # f-string expression is a syntax error before Python 3.12.
    video_name = os.path.splitext(os.path.basename(video_path))[0]

    clip = VideoFileClip(video_path)
    try:
        for timestamp in timestamps:
            imgpath = os.path.join(output_path, f"{video_name}_screenshot_{timestamp}.png")
            clip.save_frame(imgpath, t=timestamp)
    finally:
        # Release the clip even when a frame cannot be saved.
        clip.close()
    print("Screenshots saved\n")


videos = pd.read_csv('lecture_video_urls.csv')['url']
transcription_dict = dict()

# Load the Whisper model once; it does not depend on the video, so reloading
# it for every URL only repeats the same work.
# NOTE(review): this rebinds the global name `model`, which the BERT cells
# also use - confirm no later cell relies on the BERT model under that name.
model = whisper.load_model("base.en")

for video_url in videos:
    title, video_path = download_video(video_url)
    transcription = get_transcript(video_path, model)
    transcription_dict[title] = transcription['text']

    # One screenshot per transcript segment, taken at the segment's start time.
    timestamps = [seg['start'] for seg in transcription['segments']]
    try:
        capture_screenshots(video_path, timestamps)
    except Exception as exc:
        # Best effort: a failed screenshot run must not stop the remaining
        # videos, but the cause is reported and Ctrl-C is no longer swallowed.
        print(f"Error occurred while capturing screenshots for {title}: {exc}")

# One row per video: its title and the full transcript text.
transcription_df = pd.DataFrame(list(transcription_dict.items()), columns=['Topic', 'Answer'])
transcription_df.to_csv('transcriptions.csv', index=False)

##### Video Screenshots every 1 Second.

In [None]:
def download_video(url, path='videos/'):
    """Download a YouTube video into ``path``.

    Args:
        url: URL of the video.
        path: Directory the file is saved into; created if it does not exist.

    Returns:
        ``(title, filename)``: the video title and the value returned by the
        stream's ``download`` call.

    Raises:
        ValueError: If ``get_highest_resolution()`` finds no stream.
    """
    yt = YouTube(url)
    stream = yt.streams.get_highest_resolution()
    # Fail with a clear message instead of an AttributeError on None.
    if stream is None:
        raise ValueError(f"No downloadable stream found for {url}")

    title = yt.title
    print("Downloading...", title)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(path, exist_ok=True)
    filename = stream.download(output_path=path)
    print("Download completed\n")
    return title, filename

def capture_screenshots(video_path, output_path='screenshots_one_second/'):
    """Save one PNG frame for every whole second of a video.

    Frames are taken at t = 0, 1, ..., int(duration) - 1 and named
    ``<video name>_screenshot_<second>.png``, where the video name is the file
    name of ``video_path`` without its extension.

    Args:
        video_path: Path of the video file.
        output_path: Directory for the images; created if missing.
    """
    os.makedirs(output_path, exist_ok=True)
    print("Taking screenshots..")

    # splitext handles extensions of any length, not only three characters.
    video_name = os.path.splitext(os.path.basename(video_path))[0]

    clip = VideoFileClip(video_path)
    try:
        for timestamp in range(int(clip.duration)):
            imgpath = os.path.join(output_path, f"{video_name}_screenshot_{timestamp}.png")
            clip.save_frame(imgpath, t=timestamp)
    finally:
        # Release the clip even when a frame cannot be saved.
        clip.close()

    print("Screenshots saved\n")


# NOTE(review): the [89:] slice skips the first 89 URLs - presumably to resume
# an interrupted run; confirm before re-running on the full list.
videos = pd.read_csv('lecture_video_urls.csv')['url'][89:]
for video_url in videos:
    title, video_path = download_video(video_url)
    try:
        capture_screenshots(video_path)
    except Exception as exc:
        # Best effort: keep going with the next video, but report the cause and
        # let KeyboardInterrupt through so the run can be stopped.
        print(f"Error occurred while capturing screenshots for {title}: {exc}")

##### Compare each screenshot.

In [None]:
lecture_title = "Lecture 1 — Distributed File Systems"
# File names contain "screenshot_<timestamp>.png"; the timestamp is the text
# between this marker and the 4-character ".png" suffix.
marker = 'screenshot_'

# --- Screenshots taken at the transcript timestamps -------------------------
ss_dir = "screenshots_timestamps"
ss_files = os.listdir(ss_dir)
l1_ss = [os.path.join(ss_dir, file) for file in ss_files if lecture_title in file]
print("Number of screenshots captured at transcript timestamps:", len(l1_ss))
print("Eg:", l1_ss[0])

# timestamp -> path, ordered by timestamp.
l1_ss_dict = {float(ss[ss.find(marker) + len(marker):-4]): ss for ss in l1_ss}
l1_ss_dict = dict(sorted(l1_ss_dict.items()))

# --- Screenshots taken every second -----------------------------------------
ss1_dir = "screenshots_one_second"
ss1_files = os.listdir(ss1_dir)
l1_ss1 = [os.path.join(ss1_dir, file) for file in ss1_files if lecture_title in file]
print("Number of screenshots captured at every second:", len(l1_ss1))
print("Eg:", l1_ss1[0])

# second -> path, ordered by second.
l1_ss1_dict = {float(ss[ss.find(marker) + len(marker):-4]): ss for ss in l1_ss1}
l1_ss1_dict = dict(sorted(l1_ss1_dict.items()))

# Compare the earliest timestamp screenshot with the one-second screenshot of
# the same (truncated) second; the loop stops after that first pair.
for time, ss in l1_ss_dict.items():
    img_timestamp = Image.open(ss)
    print(ss)

    closest_second = int(time)
    one_second_path = l1_ss1_dict[closest_second]
    img_one_second = Image.open(one_second_path)
    print(one_second_path)

    pixels_timestamp = list(img_timestamp.getdata())
    pixels_one_second = list(img_one_second.getdata())

    # True only when every pixel of the two images is identical.
    print(pixels_timestamp == pixels_one_second)

    break

##### Compare pixels and remove redundant images.

In [None]:
import numpy as np


def _load_pixels(path):
    """Return the pixels of the image at ``path`` as a signed integer array."""
    with Image.open(path) as image:
        # Signed type so that subtracting two frames cannot wrap around.
        return np.asarray(image).astype(np.int32)


def _frames_differ(pixels_a, pixels_b, threshold):
    """Return True if any channel's mean absolute difference exceeds ``threshold``.

    Both arrays must have the same shape (frames of the same video).
    """
    diff = np.abs(pixels_a - pixels_b)
    # Collapse the two spatial axes; a single-band image becomes one channel.
    per_channel = diff.reshape(diff.shape[0] * diff.shape[1], -1).mean(axis=0)
    return bool((per_channel > threshold).any())


def _select_distinct_frames(frames, threshold=50):
    """Keep only frames that differ from the previously kept frame.

    Walks the frames in timestamp order. The first frame is always kept; each
    later frame is kept only when it differs from the most recently kept one,
    and then becomes the new reference. A run of near-identical frames
    therefore contributes exactly one entry, and the last frame is kept
    whenever it differs.

    Args:
        frames: Mapping of timestamp -> image path.
        threshold: Mean absolute per-channel difference above which two
            frames count as different.

    Returns:
        Dict of the kept ``timestamp -> path`` entries, using the keys of
        ``frames`` in ascending order.
    """
    retained = {}
    reference_pixels = None
    for timestamp, path in sorted(frames.items()):
        pixels = _load_pixels(path)
        if reference_pixels is None or _frames_differ(reference_pixels, pixels, threshold):
            retained[timestamp] = path
            reference_pixels = pixels
    return retained


times = l1_ss1_dict.keys()
retained_ss = _select_distinct_frames(l1_ss1_dict)
print(f"Retained {len(retained_ss)} of {len(times)} screenshots")