# Comic Genre Classifier by Zach & Brooke

## Scraping Webtoons for URLs of Comic Episodes

In [None]:
import os
import pandas as pd
import httpx
import asyncio
import aiofiles
from bs4 import BeautifulSoup
import requests
import re
import nest_asyncio
import time
import random

# Patch asyncio via nest_asyncio; this file later calls asyncio.run(), which
# presumably has to work inside an already-running loop (notebook) — confirm.
nest_asyncio.apply()

# Shared semaphore passed to download_image(); it wraps each image request,
# so at most 100 of them are in flight at once.
semaphore = asyncio.Semaphore(100)

# Set the pandas 'display.width' option to 100.
pd.set_option('display.width', 100)

In [None]:
def create_page_url_with_genres_dict():
    """Scrape the Webtoons originals listing and map each comic page URL to its genre.

    Two parts of the page are read: the links inside the ``daily_section on``
    block, then every ``<li>`` inside the ``daily_section _list_*`` blocks.
    The genre is the text of the first ``<p>`` found inside each entry.

    Returns:
        dict: comic page URL -> genre text. When a URL appears more than
        once, the genre seen last wins.

    Raises:
        requests.HTTPError: if the listing page returns an error status.
        requests.Timeout: if the listing page does not answer in time.
    """
    originals_url = "https://www.webtoons.com/en/originals"
    # A timeout keeps a stalled connection from hanging the scrape forever.
    r = requests.get(originals_url, timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.content, 'html.parser')

    url_to_genre = {}

    def add_entry(link, genre_holder):
        # Skip entries lacking a link or a <p>; indexing None would
        # otherwise abort the whole scrape.
        genre_tag = genre_holder.find('p')
        if link is None or genre_tag is None:
            return
        url_to_genre[link['href']] = genre_tag.text

    active_section = soup.find('div', class_='daily_section on')
    if active_section is not None:
        for a in active_section.find_all('a', href=True):
            add_entry(a, a)

    # Raw string: '\s' and '\w' are invalid escapes in a plain string literal.
    day_section_class = re.compile(r'daily_section\s+_list_\w*')
    for ongoing_comic_day in soup.find_all('div', class_=day_section_class):
        for li in ongoing_comic_day.find_all('li'):
            add_entry(li.find('a', href=True), li)

    return url_to_genre

def create_episode_url_with_genres_dict(page_url_genre_dict):
    """Map every episode URL listed on each comic page to that comic's genre.

    Args:
        page_url_genre_dict: mapping of comic page URL -> genre.

    Returns:
        dict: episode URL -> genre, built from the first link of every
        ``<li class="_episodeItem">`` on each fetched comic page.
    """
    episode_genres = {}
    for page_url in page_url_genre_dict:
        page_genre = page_url_genre_dict[page_url]
        page_html = requests.get(page_url).content
        episode_items = BeautifulSoup(page_html, 'html.parser').find_all(
            'li', class_='_episodeItem'
        )
        # Every episode of a comic inherits the comic's genre.
        episode_genres.update(
            (item.find('a', href=True)['href'], page_genre)
            for item in episode_items
        )
    return episode_genres

# Build the lookup tables when this cell runs: comic page URL -> genre first,
# then episode URL -> genre. Both calls issue HTTP requests.
page_url_genre_dict = create_page_url_with_genres_dict()
episode_url_genre_dict = create_episode_url_with_genres_dict(page_url_genre_dict)

In [None]:
# Downloading JPG files of comic page images to disk sorted into directories based on genre

import random
async def download_image(url, directory, filename, semaphore):
    """Download one image and write it to ``directory/filename``.

    Args:
        url: address of the image to fetch.
        directory: existing directory that receives the file.
        filename: name of the file to create inside ``directory``.
        semaphore: asyncio semaphore bounding how many downloads run at once.

    Nothing is written when the response status is not 200; the skipped URL
    is printed instead of being dropped silently.
    """
    # The original also built a {"type": "q90"} query dict that was never
    # sent with the request; it is dropped here, so the request is unchanged.
    headers = {"referer": "https://www.webtoons.com/"}

    # Take a semaphore slot *before* creating the client, so the number of
    # live clients is bounded by the semaphore rather than by the task count.
    async with semaphore:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers=headers)
        if response.status_code != 200:
            print(f"skipped {url}: HTTP {response.status_code}")
            return
        async with aiofiles.open(os.path.join(directory, filename), "wb") as f:
            await f.write(response.content)

async def main():
    """Queue a sample of images from every second episode, then download them.

    For every second entry of ``episode_url_genre_dict`` the episode page is
    fetched, every 8th ``<img class="_images">`` among the first 100 is
    selected, and a ``download_image`` coroutine targeting
    ``images/<genre>/`` is queued. All queued downloads are awaited together
    at the end. Progress and elapsed seconds are printed along the way.
    """
    start_time = time.time()
    image_directory = 'images'
    # exist_ok avoids the check-then-create race of os.path.exists().
    os.makedirs(image_directory, exist_ok=True)
    # Raw string: '\?' and '\w' are invalid escapes in a plain string literal.
    type_query = re.compile(r"\?type=\w*")
    # Download images for each episode URL asynchronously
    tasks = []
    counter = 0
    # One client is reused for all episode pages instead of one per page.
    async with httpx.AsyncClient() as client:
        for episode_url, genre in list(episode_url_genre_dict.items())[::2]:
            genre_directory = f"{image_directory}/{genre}"
            os.makedirs(genre_directory, exist_ok=True)
            r = await client.get(episode_url)
            soup = BeautifulSoup(r.content, 'html.parser')
            for img in soup.find_all('img', class_='_images')[0:100:8]:
                # Strip the "?type=..." suffix from the image address.
                img_url = type_query.sub("", img['data-url'])
                filename = img_url.split('/')[-1]
                tasks.append(download_image(img_url, genre_directory, filename, semaphore))
            counter += 1
            print(f"finished {episode_url} {counter} {time.time() - start_time}")
    print(time.time() - start_time)
    await asyncio.gather(*tasks)

# Entry point: run the download coroutine to completion.
# NOTE(review): nest_asyncio.apply() is called near the top of this file,
# presumably so asyncio.run() works inside an already-running loop — confirm.
if __name__ == "__main__":
    asyncio.run(main())

## Loading Images From the Images Directory For OCR & Writing Features and Labels to Disk After Formatting

In [None]:
import os
from PIL import Image
import pytesseract
import numpy as np
import matplotlib.pyplot as plt
import time

# Location of the tesseract binary used by pytesseract (machine-specific path).
pytesseract.pytesseract.tesseract_cmd = r'/opt/local/bin/tesseract'

# Parallel lists: entry i of each list describes the same image.
img_data = []
text_data = []
genre_data = []

start_time = time.time()

# Each sub-directory of 'images' is named after a genre and holds its images.
Path = 'images'
for genre in os.listdir(Path):
    genre_start_time = time.time()
    if genre == '.DS_Store':
        continue
    path = os.path.join(Path, genre)
    if not os.path.isdir(path):
        # Stray files next to the genre directories would make listdir fail.
        continue
    for images in os.listdir(path):
        if images == '.DS_Store':
            continue
        path1 = os.path.join(path, images)
        # The context manager closes the image file even if OCR raises.
        with Image.open(path1) as image:
            text = pytesseract.image_to_string(image).strip()
            # Only images that yield OCR text are kept, so the three lists
            # stay aligned.
            if text:
                text_data.append(text)
                genre_data.append(genre)
                thumbnail = image.resize((100, 125)).convert('RGB')
                img_data.append(np.array(thumbnail))
    print(f"finished {genre} in {time.time()-genre_start_time}")

import pickle

# write list to binary file
def write_list(filename, input_list):
    """Pickle ``input_list`` into the file at ``filename``.

    The file is opened in binary write mode, so existing content is replaced.
    """
    with open(filename, 'wb') as sink:
        pickle.dump(input_list, file=sink)

# The images were converted to RGB before np.array(), so pixel values span
# 0-255. Store them as unsigned 8-bit: 'int8' would wrap every value above
# 127 to a negative number in the saved file.
np.save('img_data_new', np.array(img_data).astype('uint8'))
write_list('text_data_new.bin', text_data)
write_list('genre_data_new.bin', genre_data)

print(f"finished everything in {time.time()-start_time}")

### Note: The files written to disk above were uploaded to Google Drive so that we could work with them on Google Colab

## Preprocessing & Training Neural Network

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from transformers import DistilBertModel, DistilBertTokenizer
import numpy as np
import time
import os
import pickle
from PIL import Image
import torchvision.transforms as transforms
import torchvision.models as models
from sklearn.preprocessing import LabelEncoder
from google.colab import drive
from google.colab import userdata
# Look up the 'HuggingFace' entry in Colab userdata; the returned value is
# not stored or used on this line.
userdata.get('HuggingFace')

def create_data_folder(user): # Different path to folder depending on user
    """Return the Google Drive folder path for ``user``.

    The lookup is case-insensitive. Any name other than 'zach' (including
    'brooke') resolves to the default 'Final Project' folder.
    """
    default_folder = '/content/drive/MyDrive/Final Project'
    folders_by_user = {
        'brooke': default_folder,
        'zach': '/content/drive/MyDrive/Year 2/Spring 2024/DS340/FinalProject',
    }
    return folders_by_user.get(user.lower(), default_folder)

# Define the path to the folder containing the data
# Mount Google Drive at /content/drive, then pick the per-user data folder.
drive.mount('/content/drive')
data_folder = create_data_folder('zach') #change to 'brooke' when you use it

# neural network classifier
class GenreClassifier(nn.Module):
    """Two-branch classifier over separate text and image feature vectors.

    Each input goes through its own linear layer followed by ReLU; the two
    hidden vectors are concatenated and mapped to class scores by one final
    linear layer.
    """

    def __init__(self, text_input_size, image_input_size, hidden_size, num_classes):
        super().__init__()
        # One projection per modality, both into `hidden_size` units.
        self.text_fc1 = nn.Linear(text_input_size, hidden_size)
        self.image_fc1 = nn.Linear(image_input_size, hidden_size)
        self.relu = nn.ReLU()
        # The output layer sees both hidden vectors side by side.
        self.fc2 = nn.Linear(2 * hidden_size, num_classes)

    def forward(self, text_x, image_x):
        """Return class scores for a batch of text and image features."""
        branches = [
            self.relu(self.text_fc1(text_x)),
            self.relu(self.image_fc1(image_x)),
        ]
        return self.fc2(torch.cat(branches, dim=1))

# DistilBERT model
# Load the tokenizer and the model from the 'distilbert-base-uncased'
# checkpoint; both are used as module-level globals by the text-feature code.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
model = DistilBertModel.from_pretrained('distilbert-base-uncased')

# NN layer that does nothing (used to replace output layer of ResNet50)
class Identity(nn.Module):
    """Pass-through layer: ``forward`` hands back its input untouched."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``x`` unchanged."""
        return x

# Load ResNet50 model
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour
# of `weights=` — confirm against the installed version.
resnet = models.resnet50(pretrained=True)
# Put the network in evaluation mode.
resnet.eval()
# Turn off gradient tracking for every parameter.
for param in resnet.parameters():
    param.requires_grad = False
# Replace the final fully connected layer with the pass-through Identity
# layer, so resnet(x) returns whatever was fed into that layer.
resnet.fc = Identity()

# preprocessing for text
def get_tokens(text):
    """Tokenize ``text`` with the module-level DistilBERT tokenizer.

    Padding and truncation are enabled and PyTorch tensors are requested
    (``return_tensors="pt"``).
    """
    tokenizer_options = {"return_tensors": "pt", "padding": True, "truncation": True}
    return tokenizer(text, **tokenizer_options)

# preprocessing for images
# Pipeline, in order: resize to 256, center-crop to 224, convert to a tensor,
# then normalize the three channels with the listed mean/std.
# NOTE(review): the mean/std look like the usual ImageNet statistics — confirm.
preprocess_image = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# extract image features using ResNet50
def extract_image_features_resnet(img_arr):
    """Return the ResNet50 feature vector for one image array.

    ``img_arr`` is cast to uint8, turned into a PIL image, run through
    ``preprocess_image`` and given a leading batch dimension of 1. The
    network output is passed through ReLU and squeezed, which removes the
    batch dimension.
    """
    pil_image = Image.fromarray(np.uint8(img_arr))
    single_image_batch = preprocess_image(pil_image).unsqueeze(0)
    # Inference only: no autograd graph is needed for the frozen network.
    with torch.no_grad():
        embedding = resnet(single_image_batch)
    activated = torch.nn.functional.relu(embedding)
    return activated.squeeze()

# helper function for load_data() for reading binary files
def read_bin(filename):
    """Unpickle and return the object stored in the file at ``filename``."""
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(filename, 'rb') as source:
        return pickle.load(source)

# load data
def load_data(folder=None):
    """Load the saved image array, OCR texts and genre labels.

    Args:
        folder: directory holding ``img_data2.npy``, ``text_data.bin`` and
            ``genre_data.bin``. Defaults to the module-level ``data_folder``,
            so the per-user folder selected there is honoured instead of one
            hard-coded absolute path.

    Returns:
        tuple: ``(img_data, text_data, genre_data)``.
    """
    if folder is None:
        folder = data_folder
    # NOTE: allow_pickle=True and pickle-based loading can execute arbitrary
    # code; only point this at files you produced yourself.
    img_data = np.load(os.path.join(folder, 'img_data2.npy'), allow_pickle=True)
    text_data = read_bin(os.path.join(folder, 'text_data.bin'))
    genre_data = read_bin(os.path.join(folder, 'genre_data.bin'))
    return img_data, text_data, genre_data

# combine features and labels
# def combine_features_and_labels(img_data, text_data):
#     since = time.time()
#     text_features_list = []
#     image_features_list = []
#     counter = 0
#     for img in img_data:
#         image_features = extract_image_features_resnet(img).detach().numpy()
#         image_features_list.append(image_features)
#         if counter%100 == 0:
#             print(f"{counter/len(img_data)*100}% done with images")
#         counter += 1
#     print(f"images finished: {time.time()-since}")
#     since = time.time()
#     counter = 0
#     for text in text_data:
#         tokens = get_tokens(text)
#         outputs = model(**tokens)
#         text_features = outputs.last_hidden_state.mean(dim=1).squeeze().detach().numpy()
#         text_features_list.append(text_features)
#         if counter%100 == 0:
#             print(f"{counter/len(text_data)*100}% done with text")
#         counter += 1
#     print(f"text finished: {time.time()-since}")

#     text_features_array = np.array(text_features_list)
#     image_features_array = np.array(image_features_list)

#     print(f"{text_features_array.nbytes} bytes in text_array")
#     print(f"{image_features_array.nbytes} bytes in image_array")

#     # flatten
#     text_features_flat = text_features_array.reshape(-1)
#     image_features_flat = image_features_array.reshape(-1)

#     # concatenate
#     combined_features = np.concatenate((text_features_flat, image_features_flat), axis=0)

#     return combined_features

def combine_features_and_labels(img_data, text_data, batch_size):
    """Build one combined feature vector per group of ``batch_size`` samples.

    Samples are taken in consecutive groups of ``batch_size``. Within a group
    the ResNet image features are averaged, the DistilBERT text features
    (token-averaged ``last_hidden_state``) are averaged, and the two means
    are concatenated as ``[text | image]``.

    Args:
        img_data: sequence of image arrays.
        text_data: sequence of OCR strings, aligned with ``img_data``.
        batch_size: number of consecutive samples merged into one row.

    Returns:
        numpy.ndarray: one row per group of samples.
    """
    combined_features_list = []
    for i in range(0, len(img_data), batch_size):
        img_batch = img_data[i:i+batch_size]
        text_batch = text_data[i:i+batch_size]

        image_features_batch = [
            extract_image_features_resnet(img).detach().numpy()
            for img in img_batch
        ]

        text_features_batch = []
        # Inference only: without no_grad() every DistilBERT call would build
        # an autograd graph that is immediately thrown away.
        with torch.no_grad():
            for text in text_batch:
                tokens = get_tokens(text)
                outputs = model(**tokens)
                text_features = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
                text_features_batch.append(text_features)

        mean_image_features = np.mean(image_features_batch, axis=0)
        mean_text_features = np.mean(text_features_batch, axis=0)

        # Text features come first; downstream slicing relies on this order.
        combined_features = np.concatenate((mean_text_features, mean_image_features))
        combined_features_list.append(combined_features)

        print(f"{i / len(img_data) * 100}% done with images and text")

    combined_features_array = np.array(combined_features_list)

    print(f"{combined_features_array.nbytes} bytes in combined features array")

    return combined_features_array

# Load and combine data
batch_size = 4
img_data, text_data, genre_data = load_data()
# One label per group of `batch_size` consecutive samples: the genre of the
# group's first sample.
labels = genre_data[::batch_size]
# Encode each distinct genre as its index in the sorted list of genres.
genre_names = np.unique(labels)
label_encodings = {
    name: code for name, code in zip(genre_names, np.arange(len(genre_names)))
}
labels = [label_encodings.get(name) for name in labels]
features = combine_features_and_labels(img_data, text_data, batch_size)

# Split data into train/test sets (80/20, fixed seed)
X_train, X_test, y_train, y_test = train_test_split(
    features,
    labels,
    test_size=0.2,
    random_state=42,
)

# Quick look at the training split.
for preview in (X_train, X_train.shape, y_train):
    print(preview)

# Convert data to tensors: float32 features, int64 class indices
X_train_tensor, X_test_tensor = (
    torch.tensor(split, dtype=torch.float32) for split in (X_train, X_test)
)
y_train_tensor, y_test_tensor = (
    torch.tensor(split, dtype=torch.long) for split in (y_train, y_test)
)

# params
# Each feature row is laid out as [text | image] (the text mean is
# concatenated first). The two parts are NOT the same width: the text part is
# DistilBERT's hidden size, the image part is whatever remains. Halving the
# row would hand part of the image features to the text branch.
input_size_text = model.config.dim
input_size_image = X_train.shape[1] - input_size_text
hidden_size = 128
num_classes = len(np.unique(labels))
num_epochs = 10
batch_size = 64
learning_rate = 0.001
train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
classifier = GenreClassifier(input_size_text, input_size_image, hidden_size, num_classes)

# loss function
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(classifier.parameters(), lr=learning_rate)

# training
start_time = time.time()
for epoch in range(num_epochs):
    total_loss = 0
    # The loop target is `batch_labels`, not `labels`, so the module-level
    # `labels` list is not overwritten by a mini-batch tensor.
    for inputs, batch_labels in train_loader:
        optimizer.zero_grad()
        text_inputs = inputs[:, :input_size_text]  # Extract text features
        image_inputs = inputs[:, input_size_text:]  # Extract image features
        outputs = classifier(text_inputs, image_inputs)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Report the mean mini-batch loss for this epoch.
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}")

print(f"Training finished in {time.time() - start_time} seconds.")

# evaluation: accuracy on the held-out test split, with gradients disabled
with torch.no_grad():
    test_text_inputs = X_test_tensor[:, :input_size_text]
    test_image_inputs = X_test_tensor[:, input_size_text:]
    outputs = classifier(test_text_inputs, test_image_inputs)
    # Index of the highest score per row is the predicted class.
    predicted = torch.max(outputs, 1)[1]
    correct = (predicted == y_test_tensor).sum().item()
    test_accuracy = correct / len(y_test_tensor)
    print(f"Test Accuracy: {test_accuracy:.4f}")