In [1]:
import pandas as pd
import torch
import re
from transformers import AutoTokenizer, AutoModel

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# 전처리된 데이터 불러오기기
df = pd.read_csv("../data/video_sample_add_pre.csv", encoding="utf-8-sig", index_col=0)
df.head() 

Unnamed: 0,video_id,channel_id,video_title,video_description,video_tags,video_duration,video_published,video_category,video_info_card,video_with_ads,video_end_screen,video_cluster,crawled_date,year,month,day,pre_text
0,JCqgRM-2_GU,UCU-wl-PZYxpWeDl1OIiE_hQ,어머니와 함께 방문 한 15 세 중학생 환자분의 포경수술 영상 (포경 소매법) C...,수술 적응증 상담 : 010-9190-7575\n카카오톡 상담 : gnjurolog...,"['강남비뇨기과', '박천진', '포경', '포경수술', '귀두포피', '위생관리']",75,2024-10-06,Education,0,0,0,-1,2024-10-07 17:37:39,2024,10,6,어머니와 함께 15 세 중학생 환자분의 포경수술 영상 포경 소매법 Circumcis...
1,R0pRp-3S5Yo,UCm69xuysxVgNuRpmXCiXlPw,YURIAN (유리안) - FUCKING MY CITY 2024 ver. (Ft. ...,#YURIAN #유리안 #FUCKING_MY_CITY_2024 #Rredrain #...,[],177,2024-10-06,Music,0,0,0,-1,2024-10-07 17:37:57,2024,10,6,YURIAN 유리안 FUCKING MY CITY 2024 ver Ft. Rredra...
2,RsVKZrt2DSo,UCVcLszfUBN9rTmc0A0uDRSQ,대리모 사건이 소환한 합법화 논쟁 / KBC뉴스,불임이나 난임 부부가 브로커에게 돈을 주고 출산을 의뢰한 '대리모 범죄'가 14년 ...,"['kbc', 'kbc광주방송', '깨비씨', 'kbc 뉴스']",158,2024-10-06,News & Politics,0,0,0,-1,2024-10-07 17:38:23,2024,10,6,대리모 사건이 소환한 합법화 논쟁 KBC뉴스 kbc kbc광주방송 깨비씨 kbc 뉴스
3,2CpgC1K7jNI,UCxCsxXsJPDMfjcXEEV09m9w,대포죽순이요????푸바오 핸펀몰카사건? 10.6 푸바오 이야기 FUBAO PANDA...,,[],400,2024-10-06,Travel & Events,0,0,0,-1,2024-10-07 17:38:24,2024,10,6,대포죽순이요 푸바오 핸펀몰카사건 푸바오 이야기 FUBAO PANDA CUTE ANI...
4,C-jOF1sgdOg,UCYCb_mX7P1xKrB5iaT8y0Wg,"일본에 남겨진 한국의 문화유산【아시아의 숨은 혼, 백제를 가다 Part 2】",0:00 2012년 6월 18일 다카이다야마 고분\n1:43 2012년 6월 25일...,"['TJB', 'tjb', 'tjb대전방송', '엑스포로131', '엑스포로', '...",572,2024-10-07,Entertainment,0,0,0,-1,2024-10-07 17:38:24,2024,10,7,일본에 한국의 문화유산 아시아의 숨은 혼 백제를 가다 Part 2 TJB tjb t...


In [None]:
# 한글여부 판독기
def calculate_korean_ratio(text, ratio=.0):
    pattern = '([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)' # E-mail제거
    text = re.sub(pattern=pattern, repl='', string=text)
    pattern = '(http|ftp|https)://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+' # URL제거
    text = re.sub(pattern=pattern, repl=' ', string=text)
    clean_text = re.sub(r"[^\w\s]", " ", text) # 특수문자 제거
    words = clean_text.split()
    total_words = len(words)  # 전체 단어 개수    
    korean_words = 0  # 한글 단어 개수

    for word in words:
        for char in word:
            if '\uAC00' <= char <= '\uD7A3':
                korean_words += 1
    try:
        korean_ratio = korean_words / total_words
        if korean_ratio >= ratio:
            return True
        else:
            return False
    except ZeroDivisionError as e:
        return False

In [None]:

class ZED():
    def __init__(self, category_df,
                 feature_extract_model='BM-K/KoSimCSE-roberta-multitask', 
                 cache_dir='../../models/huggingface',
                 use_cluster = None,
                 use_cuda = True):
        
        # Check if category_df is a string (file path) or a DataFrame
        if isinstance(category_df, str):
            self.category_df = pd.read_csv(category_df, encoding="utf-8-sig", index_col=0)
        elif isinstance(category_df, pd.DataFrame):
            self.category_df = category_df
        else:
            raise ValueError("category_df must be a file path or a pandas DataFrame")


        if use_cluster:
            self.category_df = self.category_df[self.category_df.cluster.isin(use_cluster)]
        self.category_df["category"] = self.category_df['대분류']+ " " + self.category_df['소분류']
        
        self.first_category_df = self.category_df[self.category_df.status==1]
        self.second_category_df = self.category_df[self.category_df.status==2]

        category_df_for_index = self.category_df.drop_duplicates(["category"])
        self.category_index = {i:(int(j) if pd.isna(k) else int(k)) for i,j,k in zip(category_df_for_index["category"], category_df_for_index["cluster"], category_df_for_index["new_cluster"])}
        self.category_index["기타"] = 0

        self.first_category = (self.first_category_df['대분류'] + " " + self.first_category_df['소분류']).tolist()
        self.second_category = (self.second_category_df['대분류'] + " " + self.second_category_df['소분류']).tolist()

        self.first_category_keyword = [i.split("/") for i in self.first_category_df["검색키워드"]]
        self.second_category_keyword = [" ".join(i.split("/")) for i in self.second_category_df["검색키워드"]]
        
        self.feature_model = AutoModel.from_pretrained(feature_extract_model, cache_dir=cache_dir) 
        self.feature_tokenizer = AutoTokenizer.from_pretrained(feature_extract_model, cache_dir=cache_dir)
    
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.use_cuda = False
        if use_cuda:
            if self.device == "cuda":
                self.use_cuda = True
                # 모델을 CUDA로 이동
                self.feature_model.to(self.device)
            else:
                print("cuda를 지원하지 않습니다.")

        self.inputs = self.feature_tokenizer(self.second_category_keyword, padding=True, truncation=True, return_tensors="pt")
        
        if self.use_cuda:
            self.inputs = {key: tensor.to(self.device) for key, tensor in self.inputs.items()}  # 모든 입력 텐서를 같은 장치로 이동합니다.
        self.label_embedding, _ = self.feature_model(**self.inputs, return_dict=False)


    def classification(self, text, etc_score=39):
        if isinstance(text, str):

            ### 외국어 분류
            if not calculate_korean_ratio(text, ratio=.0):
                return "해외채널", 9999, 0
            
            ### 선분류
            content = text.split('[SEP]')
            youtube_category = content[-1].strip()
            if youtube_category in ["Gaming"]:
                _content = " ".join(content).split()
            else:
                _content = content[0].strip().split()

            for i, keywords in enumerate(self.first_category_keyword):
                if len(set(_content) & set(keywords)) > 0:
                    return self.first_category[i], self.category_index[self.first_category[i]], 0

            ### 후분류
            ### 텍스트 임베딩
            try:
                inputs = self.feature_tokenizer(text, padding=True, truncation=True, return_tensors="pt")
                if self.use_cuda:
                    inputs = {key: tensor.to(self.device) for key, tensor in inputs.items()}  # 모든 입력 텐서를 같은 장치로 이동합니다.
                embedding = self.feature_model(**inputs, return_dict=False)
            except ValueError as e:
                return "기타", 0, 0
            sentences_similarity = dict()
            for i, ce in enumerate(self.label_embedding):
                sentences_similarity[self.second_category[i]] = float(self.cal_score(embedding[0][0][0], ce[0])[0][0].item())
            sentences_similarity = self.add_score(youtube_category, sentences_similarity)
            max_value = max(sentences_similarity.values())  # 가장 큰 값을 찾음

            ### etc_score 보다 낮으면 후분류 키워드가 제목안에 들어있으면 그걸로
            ### 아니면 기타로
            if max_value < etc_score:
                _content = content[0].strip().split()
                for i, keywords in enumerate(self.second_category_keyword):
                    if len(set(_content) & set(keywords)) > 0:
                        return self.second_category[i], self.category_index[self.second_category[i]], 0
                classification = "기타"
            else:
                classification = [key for key, value in sentences_similarity.items() if value == max_value][0]  # 가장 큰 값과 일치하는 모든 키를 찾음   
            return classification, self.category_index[classification], max_value
        
        if isinstance(text, list):
            results = dict()
            not_class_texts = list()
            not_class_categorys = list()

            for index, t in enumerate(text):
                class_status = False
                ### 외국어 분류
                if not calculate_korean_ratio(t, ratio=.0):
                    class_status = True
                    results[index] = ("해외채널", 9999, 0)
                
                ### 선분류
                content = t.split('[SEP]')
                youtube_category = content[-1].strip()
                if youtube_category in ["Gaming"]:
                    _content = " ".join(content).split()
                else:
                    _content = content[0].strip().split()

                for i, keywords in enumerate(self.first_category_keyword):
                    if len(set(_content) & set(keywords)) > 0:
                        class_status = True
                        results[index] = (self.first_category[i], self.category_index[self.first_category[i]], 0)
                        break
                if not class_status:
                    results[index] = ""
                    not_class_texts.append(t)
                    not_class_categorys.append(youtube_category)
                    
            if len(not_class_texts) == 0:
                return [value for value in results.values()]

            inputs = self.feature_tokenizer(not_class_texts, padding=True, truncation=True, return_tensors="pt")
            if self.use_cuda:
                inputs = {key: tensor.to(self.device) for key, tensor in inputs.items()}  # 모든 입력 텐서를 같은 장치로 이동합니다.
            embedding = self.feature_model(**inputs, return_dict=False)
            
            for i, e in enumerate(embedding[0]):
                sentences_similarity = dict()
                for j, ce in enumerate(self.label_embedding):
                    sentences_similarity[self.second_category[j]] = float(self.cal_score(e[0], ce[0])[0][0].item())
                sentences_similarity = self.add_score(not_class_categorys[i], sentences_similarity)
                max_value = max(sentences_similarity.values())  # 가장 큰 값을 찾음
                ### etc_score 보다 낮으면 후분류 키워드가 제목안에 들어있으면 그걸로
                ### 아니면 기타로

                if max_value < etc_score:
                    etc_status = True
                    _content = content[0].strip().split()
                    for j, keywords in enumerate(self.second_category_keyword):
                        if len(set(_content) & set(keywords)) > 0:
                            etc_status = False
                            for key, value in results.items():
                                if value == "":
                                    results[key] = (self.second_category[j], self.category_index[self.second_category[j]], 0)
                                    break
                            break
                    if etc_status:
                        for key, value in results.items():
                            if value == "":
                                results[key] = ("기타", 0, 0)
                                break
                else:
                    classification = [key for key, value in sentences_similarity.items() if value == max_value][0]  # 가장 큰 값과 일치하는 모든 키를 찾음   
                    for key, value in results.items():
                        if value == "":
                            results[key] = (classification, self.category_index[classification], max_value)
                            break

            return [value for value in results.values()]

    ### 거리계산 함수
    def cal_score(self, a, b):
        if len(a.shape) == 1: a = a.unsqueeze(0)
        if len(b.shape) == 1: b = b.unsqueeze(0)

        a_norm = a / a.norm(dim=1)[:, None]
        b_norm = b / b.norm(dim=1)[:, None]
        return torch.mm(a_norm, b_norm.transpose(0, 1)) * 100 


    ### 카테고리에 따른 추가점수
    def add_score(self, youtube_category, sentences_similarity):
        
        if youtube_category == "Music":
            for key in sentences_similarity:
                first_word = key.split()[0]  # 키를 split하여 첫 번째 원소를 추출
                if first_word == "음악":
                    sentences_similarity[key] += 5  # 값에 5를 더함

        elif youtube_category == "Gaming":
            for key in sentences_similarity:
                first_word = key.split()[0]  # 키를 split하여 첫 번째 원소를 추출
                if (first_word == "게임"):
                    sentences_similarity[key] += 5  # 값에 5를 더함

        elif youtube_category == "Sports":
            for key in sentences_similarity:
                first_word = key.split()[0]  # 키를 split하여 첫 번째 원소를 추출
                if first_word == "스포츠":
                    sentences_similarity[key] += 5  # 값에 5를 더함

        elif youtube_category == "News & Politics":
            for key in sentences_similarity:
                first_word = key.split()[0]  # 키를 split하여 첫 번째 원소를 추출
                if first_word == "뉴스":
                    sentences_similarity[key] += 5  # 값에 5를 더함
                    
        else:
            for key in sentences_similarity:
                first_word = key.split()[0]  # 키를 split하여 첫 번째 원소를 추출
                if first_word == "게임":
                    sentences_similarity[key] -= 3  # 값에 -3을 뺌

        return sentences_similarity