In [1]:
!git clone https://github.com/2runo/Curse-detection

fatal: destination path 'Curse-detection' already exists and is not an empty directory.


In [2]:
# !jupytext --to notebook /aiffel/aiffel/workplace/GiTi-4/GiTi-4/Curse-detection/src/model.py
# !jupytext --to notebook /aiffel/aiffel/workplace/GiTi-4/GiTi-4/Curse-detection/src/curse_detector.py
# !jupytext --to notebook /aiffel/aiffel/workplace/GiTi-4/GiTi-4/Curse-detection/src/embedding.py
# !jupytext --to notebook /aiffel/aiffel/workplace/GiTi-4/GiTi-4/Curse-detection/src/text_preprocessing.py

### Package Download

In [3]:
# !pip install jupytext
# !pip install text_preprocessing
!pip install youtube-transcript-api



In [4]:
!pwd

/aiffel/aiffel/workplace/GiTi-4/GiTi-4/Curse-detection/src


In [5]:
from youtube_transcript_api import YouTubeTranscriptApi
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential


from keras.models import Model
from keras.layers import Input, Dense, Flatten, Dropout, BatchNormalization, Embedding
from keras.layers import LeakyReLU, TimeDistributed, GRU, Bidirectional, MaxPooling1D, AveragePooling1D, concatenate
# from keras.layers.convolutional import MaxPooling1D, AveragePooling1D
# from keras.layers.merge import concatenate
from keras.constraints import max_norm
from tensorflow.keras.optimizers import Adam
import keras.backend as K
import pandas as pd
import numpy as np
import joblib

In [6]:
# model.py

INPUTSHAPE = (256, 4,)

def custom_acc(y_true, y_pred):
    # accuracy 함수
    return K.mean(K.round(y_true) == K.round(y_pred))

def np_custom_acc(y_true, y_pred):
    # accuracy 함수 (numpy 버전)
    return np.mean(np.round(y_true) == np.round(y_pred))

def time_distributed_layer(pool):
    # time distributed dense
    pool = Dropout(0.3)(pool)
    pool = TimeDistributed(Dense(512, kernel_constraint=max_norm(5.)))(pool)
    return pool

def layer(units, inter):
    # fully connected layer (BN, leaky-relu 사용)
    inter = Dense(units, kernel_constraint=max_norm(5.))(inter)
    inter = BatchNormalization()(inter)
    inter = LeakyReLU()(inter)
    inter = Dropout(0.2)(inter)
    return inter


def build_model():
    # 모델을 반환한다. (v4.2.2)

    inputs1 = Input(shape=INPUTSHAPE)
    # GRU
    inter = Bidirectional(GRU(512, return_sequences=True, reset_after=False), merge_mode='concat')(inputs1)
    # pooling
    avg_pool = AveragePooling1D(pool_size=3)(inter)
    avg_pool = time_distributed_layer(avg_pool)
    max_pool = MaxPooling1D(pool_size=3)(inter)
    max_pool = time_distributed_layer(max_pool)
    inter = concatenate([avg_pool, max_pool], axis=-1)
    
    # Flatten 레이어 추가
    inter = Flatten()(inter)
    # fully connected layers
    inter = layer(1024, inter)
    inter = layer(256, inter)
    inter = layer(64, inter)
    inter = Flatten()(inter)
    inter = layer(1024, inter)
    inter = Dense(64, kernel_constraint=max_norm(5.))(inter)
    inter = LeakyReLU(alpha=0.01)(inter)
    outputs = Dense(2, activation='softmax')(inter)

    model = Model(inputs=inputs1, outputs=outputs)

    optimizer = Adam(lr=0.001)
    model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=[custom_acc])

    return model

In [7]:
import numpy as np
import joblib


def char2vec(char):
    # (사전 예측된 dict를 통한) 글자 임베딩 수행
    if char == '~':
        # 빈 데이터 -> [0., 0., .., 0.]
        return np.array([0.] * len(vecdict['ㄱ']))
    return vecdict[char]


def embedding(x):
    # 데이터에 대해 임베딩을 수행
    return np.array([[char2vec(e) for e in ele] for ele in x])


def padding(x, length=256, pad=None):
    # 패딩을 수행
    result = []
    for n, ele in enumerate(x):
        if len(ele) == length:
            result.append(ele)
            continue
        if pad is None:
            pad = [0.] * len(ele[0])

        a, b = np.array(ele), np.array([pad] * (length - len(ele)))
        try:
            mid = np.concatenate((a, b))
        except:
            continue

        result.append(mid)
    return np.array(result)


def padding_x(x, length=256, pad=None):
    # 하나의 input 값에만 padding 수행
    if len(x) > length:
        return None
    if len(x) == length:
        return x
    if pad == None:
        pad = [0.] * len(x[0])
    a, b = np.array(x), np.array([pad] * (length - len(x)))
    try:
        mid = np.concatenate((a, b))
    except:
        None
    return mid


vecdict = joblib.load('models/char2vec.dic')  # 각 글자에 대응하는 vector가 담긴 dictionary

In [8]:
# text-preprocessing.py

'''댓글 데이터를 전처리한다. Embedding이 아니라 긴 댓글은 제거하거나 특수문자를 제거한다.'''

import re
import itertools

BASE_CODE, CHOSUNG, JUNGSUNG = 44032, 588, 28
CHOSUNG_LIST = ['ㄱ', 'ㄲ', 'ㄴ', 'ㄷ', 'ㄸ', 'ㄹ', 'ㅁ', 'ㅂ', 'ㅃ', 'ㅅ', 'ㅆ', 'ㅇ', 'ㅈ', 'ㅉ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']
JUNGSUNG_LIST = ['ㅏ', 'ㅐ', 'ㅑ', 'ㅒ', 'ㅓ', 'ㅔ', 'ㅕ', 'ㅖ', 'ㅗ', 'ㅘ', 'ㅙ', 'ㅚ', 'ㅛ', 'ㅜ', 'ㅝ', 'ㅞ', 'ㅟ', 'ㅠ', 'ㅡ', 'ㅢ',
                 'ㅣ']
JONGSUNG_LIST = ['~', 'ㄱ', 'ㄲ', 'ㄳ', 'ㄴ', 'ㄵ', 'ㄶ', 'ㄷ', 'ㄹ', 'ㄺ', 'ㄻ', 'ㄼ', 'ㄽ', 'ㄾ', 'ㄿ', 'ㅀ', 'ㅁ', 'ㅂ', 'ㅄ', 'ㅅ',
                 'ㅆ', 'ㅇ', 'ㅈ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']
GYUP2CHO = {'ㄳ': 'ㄱㅅ', 'ㄵ': 'ㄴㅈ', 'ㄶ': 'ㄴㅎ', 'ㄺ': 'ㄹㄱ', 'ㄻ': 'ㄹㅁ', 'ㄽ': 'ㄹㅅ', 'ㄾ': 'ㄹㅌ', 'ㄿ': 'ㄹㅍ',
            'ㅄ': 'ㅂㅅ'}  # 겹자음을 자음으로 변환



def remain_char(x):
    # 오직 한글 글자만 남기기 (띄어쓰기, 숫자, 특수문자, 영어 등은 삭제)
    return [''.join(re.findall(r'[ㄱ-ㅎㅏ-ㅣ각-힣]', i)) for i in x]  # 숫자도 삭제 (숫자 보존하려면 표현식 뒤에 '0-9' 추가)


def long2short(x):
    # 연속적으로 긴 단어는 간추리기
    # ef) f('ㅋㅋㅋㅋㅋㅋㅋ앜ㅋㅋㅋ') -> f('ㅋ앜ㅋ')
    result = []
    keep = True
    for ele in x:
        while True:
            candidates = set(re.findall(r'(\w)\1', ele))
            repeats = itertools.chain(*[re.findall(r"({0}{0}+)".format(c), ele) for c in candidates])

            keep = False
            for org in [i for i in repeats if len(i) >= 2]:
                ele = ele.replace(org, org[0])
                keep = True
            if not keep:
                break
        result.append(ele)
    return result


def analchar(test_keyword):
    # 글자 -> 초성, 중성, 종성 분리 (한글 아니면 그대로 반환)
    # ex) f('아녕ㅕㄴ') -> 'ㅇㅏ~ㄴㅕㅇㅕ~~ㄴ~~'
    split_keyword_list = list(test_keyword)

    result = []
    for keyword in split_keyword_list:
        # 한글 여부 확인 후 초성, 중성, 종성 분리
        if re.match(r'.*[가-힣]+.*', keyword) is not None:
            char_code = ord(keyword) - BASE_CODE
            char1 = int(char_code / CHOSUNG)
            result.append(CHOSUNG_LIST[char1])
            char2 = int((char_code - (CHOSUNG * char1)) / JUNGSUNG)
            result.append(JUNGSUNG_LIST[char2])
            char3 = int((char_code - (CHOSUNG * char1) - (JUNGSUNG * char2)))  # 종성 없으면 char3 = 0 = '~'
            result.append(JONGSUNG_LIST[char3])
        elif re.match(r'[ㄱ-ㅎ]', keyword) is not None:
            result.append(keyword + '~~')
        elif re.match(r'[ㅏ-ㅣ]', keyword) is not None:
            result.append('~' + keyword + '~')
        else:
            result.append(keyword)

    return ''.join(result)


def data2anal(x):
    # 글자 -> 초성, 중성, 종성 분리 (한글 아니면 그대로 반환)
    return [analchar(i) for i in x]


def replace_gyup(x):
    # 겹자음을 자음으로 변환한다.
    # ex) 'ㅄ새끼' -> 'ㅂㅅ새끼'
    result = []
    for ele in x:
        for gyup, cho in GYUP2CHO.items():
            ele = ele.replace(gyup, cho)
        result.append(ele)
    return result


def preprocess(texts):
    texts = remain_char(texts)  # 특수문자, 영어 등 제거
    texts = long2short(texts)   # 연속적인 글자 단축 (ㅋㅋㅋㅋ->ㅋ)
    texts = data2anal(texts)    # 초성, 중성, 종성 분리
    return texts

In [10]:
# curse_detector.py

class CurseDetector():
    max_sequence_length = None  # 클래스 멤버 변수로 max_sequence_length 정의

    def __init__(self, path=None):
        # 토크나이저 초기화
        self.tokenizer = Tokenizer()
        self.model = self.build_model()
        if path:
            self.load_weights(path)

    def build_model(self):
        model = Sequential()
        model.add(Embedding(input_dim=len(self.tokenizer.word_index) + 1, output_dim=100, input_length=self.max_sequence_length))
        model.add(Bidirectional(GRU(128)))
        model.add(Dense(1, activation='sigmoid'))
        model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
        print(len(self.tokenizer.word_index))
        return model

    def load_weights(self, path):
        try:
            self.model.load_weights(path)
        except OSError:
            raise Exception("학습된 모델을 불러오는 데 실패했습니다. 학습된 모델(weights.h5)을 models 폴더에 옮겨 주세요.")

    def train_model(self, sentences, labels):
        # 토크나이저 피팅
        self.tokenizer.fit_on_texts(sentences)

        # 텍스트를 시퀀스로 변환
        sequences = self.tokenizer.texts_to_sequences(sentences)
        
        # 시퀀스의 길이를 맞추기 위한 패딩
        padded_sequences = pad_sequences(sequences, maxlen=self.max_sequence_length)  
        # maxlen을 추가하여 시퀀스 길이를 맞춰줍니다.

        # 최대 시퀀스 길이 저장
        self.max_sequence_length = padded_sequences.shape[1]

        # 학습 데이터와 레이블을 훈련 및 검증 세트로 분할
        X_train, X_val, y_train, y_val = train_test_split(padded_sequences, labels, test_size=0.2, random_state=42)

        # 모델 학습
        # y_train, y_val을 np.array로 변환
        self.model.fit(X_train, np.array(y_train), epochs=10, validation_data=(X_val, np.array(y_val)), verbose=1)  
        
        # 학습된 모델 저장
        self.model.save_weights('curse_detection_weights.h5')
                            
#     def embedding(self, texts):
#         # 전처리, 임베딩 수행
#         texts = preprocess(texts)
#         embed = embedding(texts)
#         embed = padding(embed)
#         return embed
            
#     def predict(self, texts):
#         # 욕설 분류 수행
#         one = False
#         if isinstance(texts, str):
#             texts = [texts]
#             one = True

#         # 예측
#         embed = self.embedding(texts)
#         pred = self.model.predict(embed)

#         if one:
#             pred = pred[0]
#         return pred

    def predict(self, sentences):
        # 텍스트를 시퀀스로 변환
        sequences = self.tokenizer.texts_to_sequences([sentences])

        # 시퀀스의 길이를 맞추기 위한 패딩
        padded_sequences = pad_sequences(sequences, maxlen=self.max_sequence_length)
        
        score = self.model.predict(padded_sequences)
        
        return score

In [11]:
!pwd

/aiffel/aiffel/workplace/GiTi-4/GiTi-4/Curse-detection/src


In [12]:
# 텍스트 파일을 DataFrame으로 불러오기
file_path = 'dataset.txt'  # 파일 경로를 적절히 수정하세요

# 파일 불러오기
df1 = pd.read_csv(file_path, sep='|', header=None, names=['Text', 'Label'], encoding='utf-8')

# DataFrame 출력
# print(df1.tail())

df1.tail()

Unnamed: 0,Text,Label
5821,좌우 헬파이어 3개씩 6개 장착에 아파치보다 약하지만 20mm 기관포 장착임,0
5822,"세금 내놓으라고 데모질 중 ㅋㅋ간첩, 도둑놈 새끼들이 대통령 해처먹으니까 나도 같...",1
5823,너가 한 말 중에,0
5824,제갈대중 ㅇㅂ,0
5825,우리나라교회는 악마들이모여 주뎅이 처벌리고,1


In [13]:
df1 = pd.DataFrame({'Text':df1.Text, 'Label':df1.Label})
print(df1.tail())

                                                   Text  Label
5821         좌우 헬파이어 3개씩 6개 장착에 아파치보다 약하지만 20mm 기관포 장착임      0
5822  세금 내놓으라고 데모질 중 ㅋㅋ간첩, 도둑놈 새끼들이 대통령 해처먹으니까  나도 같...      1
5823                                          너가 한 말 중에      0
5824                                            제갈대중 ㅇㅂ      0
5825                           우리나라교회는 악마들이모여 주뎅이 처벌리고       1


In [14]:
df1.Label

0       1
1       0
2       1
3       0
4       1
       ..
5821    0
5822    1
5823    0
5824    0
5825    1
Name: Label, Length: 5826, dtype: int64

In [15]:
unique_values = df1['Label'].unique()
print(unique_values)

# 인덱스 값을 확인해서 그냥 그 줄을 삭제

[1 0]


In [16]:
df1['Label'].astype(float)

0       1.0
1       0.0
2       1.0
3       0.0
4       1.0
       ... 
5821    0.0
5822    1.0
5823    0.0
5824    0.0
5825    1.0
Name: Label, Length: 5826, dtype: float64

In [17]:
# CurseDetector 인스턴스 생성
curse_detector = CurseDetector()

# 모델 학습
# curse_detector.train_model(df1.Text, df1.Label)
curse_detector.train_model(df1.Text, df1.Label)

0
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [18]:
curse_detector.predict('씨발')

array([[0.2631239]], dtype=float32)

In [19]:
# data - YouTube 동영상의 자막 가져오기
video_id = "J8DQsTI2bvU"
srt = YouTubeTranscriptApi.get_transcript(video_id, languages=["ko"])

# 학습 데이터 생성
sentences = [entry['text'] for entry in srt]
labels = [1 if '새끼' in entry['text'] 
          or '돼지' in entry['text'] 
          or '좆' in entry['text'] 
          or '엿' in entry['text']
          else 0 for entry in srt]


# print(sentences)
# print(sentences.split)
# print(labels)

In [20]:
df = pd.DataFrame({'Text':sentences, 'Label':labels})
print(df.tail())

                    Text  Label
87  만나서 줄게 수고하십시오 그 유튜버분      0
88      아닙니까 그니까 만나서 준다고      0
89              어디냐고 아 됐      0
90  지금겠습니까한테 언니한테 한 번만 더      0
91                   연락해      0


In [21]:
# CurseDetector 인스턴스 생성
curse_detector = CurseDetector()

# 모델 학습
curse_detector.train_model(sentences, labels)

0
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [22]:
# 새로운 문장에 대한 예측

print(curse_detector.predict('씨발아'))
# print(curse_detector.predict('대이지 새끼야'))
# print(curse_detector.predict('좆가 짱 든가 그러면 너도 고소해라'))
# print(curse_detector.predict('인간새끼가'))
# print(curse_detector.predict('새끼야 너 어떻이 새끼야 뭐 언니분이'))

[[0.01278533]]


In [23]:
!pip install openai



In [24]:
from openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model ="gpt-3.5-turbo",
    
)

ImportError: cannot import name 'OpenAI' from 'openai' (/opt/conda/lib/python3.9/site-packages/openai/__init__.py)