## 유튜브 랭킹 크롤링

In [None]:
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from tqdm import tqdm
import time
import pandas as pd

# 웹 드라이버 설정 및 페이지 접속
options = webdriver.ChromeOptions()
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--no-sandbox")
driver = webdriver.Chrome(options=options)
driver.set_window_size(1200, 2000)

# 빈 리스트 초기화
collected_data = []

# 페이지 1부터 84까지 순회
for page_num in tqdm(range(1, 84), desc="Scraping pages"):
    # 페이지 URL 업데이트
    page_url = f"https://youtube-rank.com/board/bbs/board.php?bo_table=youtube&page={page_num}"
    driver.get(page_url)
    time.sleep(1)  # 웹 페이지 로드를 위해 대기

    # 맨 아래까지 스크롤
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    time.sleep(1)  # 페이지가 완전히 로드될 때까지 기다립니다

    # 각 페이지의 제목과 카테고리 추출
    for num in range(1, 101):  # 1부터 100까지
        try:
            titles_xpath = f'//*[@id="list-skin"]/form[1]/table/tbody/tr[{num}]/td[3]/h1/a'
            category_xpath = f'//*[@id="list-skin"]/form[1]/table/tbody/tr[{num}]/td[3]/h1/p'

            title_element = driver.find_element(By.XPATH, titles_xpath)
            category_element = driver.find_element(By.XPATH, category_xpath)

            title_text = title_element.text
            category_text = category_element.text

            # 수집된 데이터 저장
            collected_data.append({'title': title_text, 'category': category_text})

        except NoSuchElementException:
            # 요소가 페이지에 없는 경우
            continue

# 리스트를 DataFrame으로 변환
news_df = pd.DataFrame(collected_data)

# 웹 드라이버 종료
driver.quit()

# 결과 출력
print(news_df)


## 데이터 수집 크롤링

In [None]:
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from isodate import parse_duration
import requests
import csv
import os

# API 키 리스트
api_keys = [
    'AIzaSyCVUXop1pTF7x4sP1dIRJk9gVtn5d7MfKQ',
    'AIzaSyCwBiqWbx-_0kmU5bi20YgsmijbyhTgi_M',
    'AIzaSyDkQaL5xk6FYsTGHv8o5so-Aulmyf5tW3A'
]
key_index = 0  # 현재 사용 중인 API 키의 인덱스

# 현재 사용 중인 API 키 가져오기
def get_current_api_key():
    return api_keys[key_index]

# 다음 API 키로 전환
def switch_to_next_api_key():
    global key_index
    key_index = (key_index + 1) % len(api_keys)

# YouTube build 객체 생성
youtube = build('youtube', 'v3', developerKey=get_current_api_key())

# 'id3.csv'에서 채널명과 카테고리 리스트를 가져오기
with open('id3.csv', mode='r', encoding='utf-8-sig') as file:
    csv_reader = csv.DictReader(file)
    channels_info = [(row['title'], row['category']) for row in csv_reader]

# CSV 파일 존재 여부 확인 및 헤더 작성
csv_file_path = 'youtube_videos.csv'
file_exists = os.path.isfile(csv_file_path)

# ISO 8601 기간을 초로 변환하는 함수
def iso8601_duration_to_seconds(duration):
    return int(parse_duration(duration).total_seconds())

try:
    # 각 채널에 대해 정보 수집
    for channel_name, channel_category in channels_info:
        while True:
            try:
                # 채널 검색 및 채널 ID 추출
                search_response = youtube.search().list(
                    q=channel_name,
                    order='relevance',
                    part='snippet',
                    maxResults=1
                ).execute()

                channel_id = search_response['items'][0]['id']['channelId'] if search_response['items'] else None

                if channel_id:
                    # 채널 구독자 수 가져오기
                    api_key = get_current_api_key()
                    channel_url = f'https://www.googleapis.com/youtube/v3/channels?part=statistics&id={channel_id}&key={api_key}'
                    channel_response = requests.get(channel_url)
                    channel_data = channel_response.json()
                    subscriber_count = channel_data['items'][0]['statistics']['subscriberCount'] if 'items' in channel_data and channel_data['items'] else "Unknown"

                    # 모든 동영상 정보 가져오기
                    page_token = None
                    while True:
                        search_url = f'https://www.googleapis.com/youtube/v3/search?key={api_key}&channelId={channel_id}&part=snippet,id&order=date&maxResults=50&publishedAfter=2021-01-01T00:00:00Z{f"&pageToken={page_token}" if page_token else ""}'
                        search_response = requests.get(search_url)
                        search_data = search_response.json()

                        # 동영상 정보 추출 및 저장
                        with open(csv_file_path, 'a', newline='', encoding='utf-8') as file:
                            writer = csv.writer(file)

                            if not file_exists:
                                writer.writerow(['Category', 'Video Title', 'View Count', 'Subscriber Count', 'Upload Date', 'Thumbnail URL'])
                                file_exists = True

                            for video in search_data['items']:
                                if video['id']['kind'] == 'youtube#video':
                                    video_id = video['id']['videoId']
                                    video_title = video['snippet']['title']
                                    upload_date = video['snippet']['publishedAt']
                                    thumbnail_url = video['snippet']['thumbnails']['high']['url']

                                    # 조회수 및 비디오 기간 가져오기
                                    video_url = f'https://www.googleapis.com/youtube/v3/videos?part=contentDetails,statistics&id={video_id}&key={api_key}'
                                    video_response = requests.get(video_url)
                                    video_data = video_response.json()
                                    video_duration = video_data['items'][0]['contentDetails']['duration']
                                    video_duration_seconds = iso8601_duration_to_seconds(video_duration)
                                    view_count = video_data['items'][0]['statistics']['viewCount']

                                    # 60초 이상의 동영상만 저장
                                    if video_duration_seconds >= 60:
                                        writer.writerow([channel_category, video_title, view_count, subscriber_count, upload_date, thumbnail_url])

                        page_token = search_data.get('nextPageToken')
                        if not page_token:
                            break

                    print(f"CSV 파일 {channel_name} 저장 완료")
                    break
                else:
                    print(f"Error: Unable to retrieve channel ID for {channel_name}.")
                    break

            except KeyError:
                print(f"Channel ID not found for {channel_name}. Skipping to next channel.")
                break

            except HttpError as e:
                if e.resp.status == 403:
                    print("API 사용 제한이 감지되었습니다. 다른 API 키로 전환합니다.")
                    switch_to_next_api_key()
                    youtube = build('youtube', 'v3', developerKey=get_current_api_key())
                else:
                    print(f"YouTube API Error: {e}")
                    break

    print("Processing completed.")

except Exception as e:
    print(f"An error occurred: {e}")



## 이미지 다운로드

In [None]:
import os
import requests
import pandas as pd

# CSV 파일 경로
csv_file_path = 'file.csv'

# CSV 파일 불러오기
data = pd.read_csv(csv_file_path)

def download_image(image_url, folder_name, file_id):
    # 폴더가 없으면 생성
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
    
    # 파일 경로 생성 (폴더명_id.jpg)
    file_path = os.path.join(folder_name, f'{folder_name}_{file_id}.jpg')
    
    # 이미지 다운로드 및 파일에 저장
    response = requests.get(image_url)
    if response.status_code == 200:
        with open(file_path, 'wb') as file:
            file.write(response.content)

for index, row in data.iterrows():
    # 카테고리 번호와 클래스 접미사로 폴더명 생성
    category_prefix = f'{row["Category Number"]:02d}'
    class_suffix = '1'
    if row['class_a'] == 1:
        class_suffix = '1'
    elif row['class_b'] == 1:
        class_suffix = '2'
    elif row['class_c'] == 1:
        class_suffix = '3'
    elif row['class_d'] == 1:
        class_suffix = '4'
    folder_name = f'{category_prefix}{class_suffix}'  # 예: '011'
    
    # 특정 카테고리 번호 범위에 대해서만 이미지 다운로드
    if folder_name.startswith('01'):  # '01'로 시작하는 폴더에 대해서만 작업 실행
        # 이미지 URL
        image_url = row['Thumbnail URL']
        
        # 데이터프레임의 고유 식별자(ID)를 파일명에 사용
        file_id = row['ID']
        
        # 이미지 다운로드 함수 호출
        download_image(image_url, folder_name, file_id)

print("이미지 다운로드 완료")

## 이미지 리사이즈

In [None]:
import os
import cv2
import numpy as np
from matplotlib import pyplot as plt
import matplotlib.cm as cm
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
import pickle
from tqdm.notebook import tqdm
import splitfolders

folder_path = 'img'
label_names = os.listdir(folder_path)
label_names

os.mkdir('resize')
for label in label_names:
    dir_path = 'resize/'+ label
    os.mkdir(dir_path)


dataset = {} # 카테고리명 : [이미지1경로, 이미지2경로, ...]

for label in os.listdir(folder_path):
    sub_path = folder_path+'/'+label+'/'
    dataset[label] = []
    for filename in os.listdir(sub_path):
        dataset[label].append(sub_path+filename)

def resize_img(img_path, img_size=224):
    img = cv2.imread(img_path)

    if(img.shape[1] > img.shape[0]) :
        ratio = img_size/img.shape[1]
    else :
        ratio = img_size/img.shape[0]

    img = cv2.resize(img, dsize=(0, 0), fx=ratio, fy=ratio, interpolation=cv2.INTER_LINEAR)

    # 그림 주변에 검은색으로 칠하기
    w, h = img.shape[1], img.shape[0]

    dw = (img_size-w)/2 # img_size와 w의 차이
    dh = (img_size-h)/2 # img_size와 h의 차이

    M = np.float32([[1,0,dw], [0,1,dh]])  #(2*3 이차원 행렬)
    img_re = cv2.warpAffine(img, M, (224, 224)) #이동변환
    cv2.imwrite('resize/{0}/{1}'.format(label, img_path.split("/")[-1]) , img_re)

for label, img_paths in dataset.items():
    for img_path in img_paths:
        resize_img(img_path, img_size=224)



## 데이터 스플릿

In [None]:
splitfolders.ratio('resize', output='dataset', seed=77, ratio=(0.6, 0.2, 0.2))

## 데이터 분석 및 증강

In [None]:
def count_dataset_files(directory):
    category_counts = {}
    for root, dirs, files in os.walk(directory):
        if 'train' in root: 
            for dir in dirs:
                path = os.path.join(root, dir)
                num_files = len([name for name in os.listdir(path) if os.path.isfile(os.path.join(path, name))])
                category_counts[dir] = num_files
    return category_counts

dataset_directory = 'dataset' 
counts = count_dataset_files(dataset_directory)
print(counts)


In [None]:
import albumentations as A
from albumentations.core.composition import Compose
from skimage import io
from skimage.transform import resize
import os

# 데이터 증강 파이프라인 정의
augmentations = Compose([
    A.RandomBrightnessContrast(p=0.5),  # 밝기와 대비를 무작위로 조정
    A.Resize(224, 224)  # 224x224로 이미지 크기 조정
])

def augment_and_save(image_path, save_path, augmentations, num_augmented_images=10):
    image = io.imread(image_path)
    os.makedirs(save_path, exist_ok=True)
    
    # 원본 이미지 이름
    image_name = os.path.basename(image_path)
    
    for i in range(num_augmented_images):
        # 데이터 증강 적용
        augmented = augmentations(image=image)
        augmented_image = augmented["image"]
        
        # 증강된 이미지 저장
        augmented_image_path = os.path.join(save_path, f"aug_{i}_{image_name}")
        io.imsave(augmented_image_path, augmented_image)

def augment_images_in_directory(source_dir, save_dir_base, augmentations):
    for category in os.listdir(source_dir):
        category_dir = os.path.join(source_dir, category)
        if os.path.isdir(category_dir):
            save_dir = os.path.join(save_dir_base, category)
            for image_name in os.listdir(category_dir):
                image_path = os.path.join(category_dir, image_name)
                augment_and_save(image_path, save_dir, augmentations)

# 데이터 증강을 수행할 각 카테고리별 디렉토리의 기본 경로
source_dir = 'dataset/train'
save_dir_base = 'dataset/augmented'

# 모든 카테고리에 대해 데이터 증강 수행
augment_images_in_directory(source_dir, save_dir_base, augmentations)


## 이미지 만개로 통일

In [None]:
import os
import random

def trim_images_in_folder(folder, max_images=10000):
    files = [os.path.join(folder, f) for f in os.listdir(folder) if os.path.isfile(os.path.join(folder, f))]
    
    if len(files) > max_images:
        #파일삭제
        num_files_to_remove = len(files) - max_images
        files_to_remove = random.sample(files, num_files_to_remove)
        
        # 선택된 파일 삭제
        for file in files_to_remove:
            os.remove(file)
            print(f"Removed: {file}")


augmented_folder = 'dataset/augmented'
for category_folder in os.listdir(augmented_folder):
    folder_path = os.path.join(augmented_folder, category_folder)
    if os.path.isdir(folder_path):
        print(f"Trimming images in {folder_path}...")
        trim_images_in_folder(folder_path, max_images=10000)


## 데이터 학습

In [None]:
from keras.models import Model
from keras.layers import Dense, GlobalAveragePooling2D, Dropout
from keras.regularizers import l2  # 여기에 l2를 임포트합니다
from tensorflow.keras.applications import ResNet50
from keras.callbacks import EarlyStopping
from keras.optimizers import Adam  # Adam 옵티마이저를 임포트합니다
from keras.preprocessing.image import ImageDataGenerator

# 데이터 증강을 위한 설정 (필요에 따라 조정)
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True
)

val_datagen = ImageDataGenerator(rescale=1./255)

for category in range(1, 17):  # 16개 카테고리
    # ResNet50 모델 초기화, Fully Connected Layer 제외
    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

    # 모델 커스터마이징
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dropout(0.5)(x)  
    x = Dense(1024, activation='relu', kernel_regularizer=l2(0.001))(x)  
    predictions = Dense(4, activation='softmax', kernel_regularizer=l2(0.001))(x)  

    model = Model(inputs=base_model.input, outputs=predictions)

    # 모델 컴파일
    model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

    # 조기 종료 콜백 설정
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min')

    # 카테고리별 폴더를 직접 지정
    category_folders = [f'{category:02d}{score}' for score in range(1, 5)]

    # 훈련 및 검증 데이터 생성기 설정
    train_generator = train_datagen.flow_from_directory(
        'dataset/combined',
        target_size=(224, 224),
        batch_size=32,
        classes=category_folders,
        class_mode='categorical')

    val_generator = val_datagen.flow_from_directory(
        'dataset/val',
        target_size=(224, 224),
        batch_size=32,
        classes=category_folders,
        class_mode='categorical')

    # 모델 학습
    print(f"Training model for Category {category:02d}")
    model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=20,
        steps_per_epoch=train_generator.samples // 32,
        validation_steps=val_generator.samples // 32,
        callbacks=[early_stopping])  

    # 학습된 모델 저장
    model_save_path = f'saved_models/category_{category:02d}_new_model.h5'
    model.save(model_save_path)
    print(f"Trained model for Category {category:02d} saved at {model_save_path}")

## 결과 확인

In [None]:
import os
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import load_model

# 'img' 폴더 내의 이미지 파일 이름 가져오기
image_folder = 'img'
image_files = [f for f in os.listdir(image_folder) if os.path.isfile(os.path.join(image_folder, f))]

# 이미지 파일이 있을 경우 첫 번째 이미지 사용
if image_files:
    image_path = os.path.join(image_folder, image_files[0])  # 첫 번째 이미지의 경로
    input_image = load_img(image_path, target_size=(224, 224))  # 모델에 맞는 이미지 크기로 로드
    input_image = img_to_array(input_image)  # 이미지를 배열로 변환
    input_image = np.expand_dims(input_image, axis=0)  # 모델 예측을 위해 차원 추가
    input_image /= 255.0  # 이미지 정규화

    # 모든 모델의 예측 확률을 저장할 배열 초기화
    all_predictions = np.zeros((16, 4))  # 16개 모델, 각 모델은 4개 클래스에 대한 확률을 예측

    # 16개 모델에 대해 반복
    for i in range(1, 17):
        # 모델 로드
        model_path = f'saved_models/category_{i:02d}_new_model.h5'
        model = load_model(model_path)

        # 이미지에 대한 예측 수행
        pred = model.predict(input_image)

        # 예측 확률 저장
        all_predictions[i-1] = pred

        # 모델별 가장 높은 확률을 가진 클래스 출력 
        predicted_class = np.argmax(pred) + 1
        print(f"Model {i:02d} predicted class: {predicted_class}")

    # 모든 모델의 예측 확률의 평균 계산
    average_predictions = np.mean(all_predictions, axis=0)

    # 평균 확률이 가장 높은 클래스 선택
    final_prediction = np.argmax(average_predictions) + 1 

    if final_prediction == 1:
        print("썸네일 주목도가 '매우 높음'입니다")
    elif final_prediction == 2:
        print("썸네일 주목도가 '높음' 입니다")
    elif final_prediction == 3:
        print("썸네일 주목도가 '보통' 입니다")
    elif final_prediction == 4:
        print("썸네일 주목도가 '낮음' 입니다")
    else:
        print("에러")

else:
    print("Error")
