In [1]:
import numpy as np
import pandas as pd
import re
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, TFBertModel
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GRU, Dense, Bidirectional, Input, Lambda, GlobalMaxPool1D, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report

# 한국어 불용어 리스트 정의
korean_stopwords = [
    'ifg', '이', '그', '저', '의', '가', '에', '를', '은', '는', '을', '도', '와', '과', '에서', '하고',
    '가까스로', '가령', '각', '각각', '각자', '각종', '같다', '같이', '거의', '것', '결과', '결국', '고로', '곧', 
    '과연', '관계', '관한', '관련', '그래서', '그러나', '그러니까', '그러면', '그러므로', '그런데', '그럼', '그리고', 
    '그치지', '근거', '기준', '기타', '까지', '나', '너', '너희', '다만', '다시', '다음', '단지', '당신', '대해', '더구나', 
    '더라도', '도달', '동안', '두번째', '든간에', '때문', '또한', '마저', '마치', '만', '만약', '만큼', '말하자면', '매번', 
    '모두', '물론', '반대로', '반드시', '비교적', '뿐만', '사람들', '상대', '생각', '설령', '셋', '소생', '시간', '심지어', 
    '아니', '아래', '아무', '약간', '어느', '어떻게', '언제', '얼마', '없이', '에도', '에게', '여전히', '역시', '예를', 
    '오로지', '오직', '왜냐하면', '요컨대', '우리', '위해', '은', '이', '이것', '이때', '이라면', '이렇다', '이렇게', '이런', 
    '이만큼', '이번', '이상', '있다', '자신', '잠시', '전부', '정도', '제각기', '조차', '주저', '줄', '중에서', '즉', '지든지', 
    '진짜로', '차라리', '참', '첫번째', '총적으로', '통하여', '하면', '하지마', '하지만', '한마디', '한적이', '한켠', '할', 
    '할수록', '함께', '했어요', '향하여', '허걱', '형식으로', '혹시', '혼자', '훨씬', '키키', '편입', '수', '비', '김영', '발신',
    '국외', '거', '어요', '요', '그리고', '입니다', '제가', '저희', '더', '등', '혹은', '위', '밑', '따라', '하지만', 
    '각', '사실', '먼저', '이제', '동안', '경우', '바로', '이후', '바로', '때', '있음', '동안', '것도', '같이', '누구', 
    '매우', '모두', '바로', '여기', '우리들', '대해서', '대해', '있고', '있으며', '있는', '해서', '그러고', '이하', '이전','로','로서',
    '로써','월','화','수','목','금','토','일','시','분','김동희','석정한','최민관'
]

# 불용어 제거 함수 정의
def remove_stopwords(text):
    words = text.split()
    filtered_words = [word for word in words if word not in korean_stopwords]
    return ' '.join(filtered_words)

# 데이터 전처리 함수 정의
def remove_non_korean(text):
    return re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', text)

# 데이터 파일 불러오기 및 병합
files = [
    'C:/Users/master/Downloads/금융_categorized-2.csv',
    'C:/Users/master/Downloads/기관_categorized-2.csv',
    'C:/Users/master/Downloads/기타_categorized.csv',
    'C:/Users/master/Downloads/이벤트_categorized-2.csv',
    'C:/Users/master/Downloads/지인_categorized-2.csv',
    'C:/Users/master/Downloads/채용_categorized-2.csv',
    'C:/Users/master/Downloads/택배_categorized-2.csv'
]

# 각 카테고리 이름
categories = ["금융", "기관", "기타", "이벤트", "지인", "채용", "택배"]

# 데이터프레임을 하나로 결합
dataframes = []
for file, category in zip(files, categories):
    df = pd.read_csv(file)
    df['category'] = category  # 카테고리 추가
    dataframes.append(df)

# 모든 데이터를 하나로 결합
all_data = pd.concat(dataframes, ignore_index=True)

# 필요한 컬럼 선택
all_data = all_data[['v1', 'v2', 'category']]  # 'v1' = 라벨, 'v2' = 메시지 텍스트

# 라벨 값 변환 ('ham' -> 0, 'spam' -> 1) 및 개행 문자 제거
all_data['v1'] = all_data['v1'].str.strip()  # 개행 문자 제거
all_data['v1'] = all_data['v1'].replace(['ham', 'spam'], [0, 1])

# 데이터 전처리 및 불용어 제거
all_data['cleaned_text'] = all_data['v2'].apply(remove_non_korean).apply(remove_stopwords)

# BERT 토크나이저 및 모델 불러오기
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')
bert_model = TFBertModel.from_pretrained('bert-base-multilingual-cased', output_attentions=True)

# BERT 입력 형식으로 변환
max_length = 100  # 최대 시퀀스 길이
input_ids_list = []
attention_masks_list = []

for sent in all_data['cleaned_text']:
    encoded_dict = tokenizer.encode_plus(
                        sent,
                        add_special_tokens=True,
                        max_length=max_length,
                        padding='max_length',
                        truncation=True,
                        return_attention_mask=True,
                        return_tensors='tf',
                   )

    input_ids_list.append(encoded_dict['input_ids'])
    attention_masks_list.append(encoded_dict['attention_mask'])

input_ids = tf.concat(input_ids_list, axis=0)
attention_masks = tf.concat(attention_masks_list, axis=0)

# 레이블 데이터
labels = all_data['v1'].values.astype(np.float32)  # 레이블을 float32로 변환

# 데이터 분리
X_train, X_test, y_train, y_test, mask_train, mask_test = train_test_split(
    input_ids.numpy(), labels, attention_masks.numpy(), test_size=0.2, random_state=42)

# 학습 데이터에서 다시 검증 데이터 분리
X_train, X_val, y_train, y_val, mask_train, mask_val = train_test_split(
    X_train, y_train, mask_train, test_size=0.2, random_state=25)

# 입력 레이어 정의
input_ids_layer = Input(shape=(max_length,), dtype=tf.int32, name="input_ids")
attention_mask_layer = Input(shape=(max_length,), dtype=tf.int32, name="attention_mask")

# BERT 출력
bert_inputs = {'input_ids': input_ids_layer, 'attention_mask': attention_mask_layer}
bert_output = Lambda(lambda x: bert_model(x, output_attentions=True))(bert_inputs)
sequence_output, attentions = bert_output[0], bert_output[-1]

# GRU 레이어
gru_output = Bidirectional(GRU(128, return_sequences=True))(sequence_output)
pooled_output = GlobalMaxPool1D()(gru_output)


# 드롭아웃 및 출력 레이어
dropout = Dropout(0.3)(pooled_output)
dense_output = Dense(64, activation='relu')(dropout)
output = Dense(1, activation='sigmoid')(dense_output)

# 모델 정의
model = Model(inputs=[input_ids_layer, attention_mask_layer], outputs=output)

# 모델 컴파일 및 학습
optimizer = Adam(learning_rate=2e-5, epsilon=1e-08)
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

# 라벨을 float32로 변환
y_train = y_train.astype(np.float32)
y_val = y_val.astype(np.float32)
y_test = y_test.astype(np.float32)

# 모델 훈련
history = model.fit(
    x={'input_ids': X_train, 'attention_mask': mask_train},
    y=y_train,
    epochs=50,
    batch_size=64,
    validation_data=({'input_ids': X_val, 'attention_mask': mask_val}, y_val)
)

# 모델 평가
y_pred = model.predict({'input_ids': X_test, 'attention_mask': mask_test})
y_pred = (y_pred > 0.5).astype(int)

# 다양한 평가 지표 출력
print(classification_report(y_test, y_pred))
# 정확도 및 다양한 평가 지표 출력
loss, accuracy = model.evaluate({'input_ids': X_test, 'attention_mask': mask_test}, y_test)
print(f'Loss: {loss}, Accuracy: {accuracy}')

# 다양한 평가 지표 출력
print(classification_report(y_test, y_pred))










Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions w

The following Variables were used a Lambda layer's call (lambda), but
are not present in its tracked objects:
  <tf.Variable 'tf_bert_model/bert/embeddings/word_embeddings/weight:0' shape=(119547, 768) dtype=float32>
  <tf.Variable 'tf_bert_model/bert/embeddings/token_type_embeddings/embeddings:0' shape=(2, 768) dtype=float32>
  <tf.Variable 'tf_bert_model/bert/embeddings/position_embeddings/embeddings:0' shape=(512, 768) dtype=float32>
  <tf.Variable 'tf_bert_model/bert/embeddings/LayerNorm/gamma:0' shape=(768,) dtype=float32>
  <tf.Variable 'tf_bert_model/bert/embeddings/LayerNorm/beta:0' shape=(768,) dtype=float32>
  <tf.Variable 'tf_bert_model/bert/encoder/layer_._0/attention/self/query/kernel:0' shape=(768, 768) dtype=float32>
  <tf.Variable 'tf_bert_model/bert/encoder/layer_._0/attention/self/query/bias:0' shape=(768,) dtype=float32>
  <tf.Variable 'tf_bert_model/bert/encoder/layer_._0/attention/self/key/kernel:0' shape=(768, 768) dtype=float32>
  <tf.Variable 'tf_bert_model/bert

In [14]:
import joblib
# 모델 저장
model.save('D:/논문/nonurl/spam_detection_model2.h5')
print("모델이 저장되었습니다.")

# 토크나이저 저장
tokenizer.save_pretrained('D:/논문/nonurl/tokenizer')
print("토크나이저가 저장되었습니다.")

# 전처리 함수 저장
joblib.dump(remove_non_korean, 'D:/논문/nonurl/remove_non_korean_feat.pkl')
print("전처리 함수가 저장되었습니다.")

모델이 저장되었습니다.
토크나이저가 저장되었습니다.
전처리 함수가 저장되었습니다.


In [13]:
# 사용자로부터 문장을 입력받아 분류하는 함수
def classify_user_input(user_input):
    # 입력받은 문장을 전처리
    cleaned_input = remove_non_korean(user_input)
    cleaned_input = remove_stopwords(cleaned_input)

    # 입력받은 문장을 BERT 토크나이저로 변환
    encoded_dict = tokenizer.encode_plus(
        cleaned_input,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='tf',
    )

    input_ids = encoded_dict['input_ids']
    attention_mask = encoded_dict['attention_mask']

    # 모델 예측
    prediction = model.predict({'input_ids': input_ids, 'attention_mask': attention_mask})
    prediction_label = 'spam' if prediction[0][0] > 0.5 else 'ham'
    
    # 결과 출력
    print(f"입력한 문장: {user_input}")
    print(f"분류 결과: {prediction_label} (스팸 확률: {prediction[0][0]:.4f})")

# 사용자 입력을 테스트하는 코드
while True:
    user_input = input("테스트할 문장을 입력하세요 (종료하려면 'exit' 입력): ")
    if user_input.lower() == 'exit':
        break
    classify_user_input(user_input)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  안녕하세요


입력한 문장: 안녕하세요
분류 결과: ham (스팸 확률: 0.0013)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  엄경용고객님 상품이 발송될 예정입니다 배송조회  http://salewiz.co.kr/infobell_idx.html


입력한 문장: 엄경용고객님 상품이 발송될 예정입니다 배송조회  http://salewiz.co.kr/infobell_idx.html
분류 결과: spam (스팸 확률: 0.9763)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  엄마 나 교통사고가 났는데 핸드폰은 잃어버렸고 내 친구가 입원비 대신 내줬거든   100124-56-094740 여기로 10만원만 보내줘 친구가 대신 내줬어


입력한 문장: 엄마 나 교통사고가 났는데 핸드폰은 잃어버렸고 내 친구가 입원비 대신 내줬거든   100124-56-094740 여기로 10만원만 보내줘 친구가 대신 내줬어
분류 결과: spam (스팸 확률: 0.5559)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  지금 온라인으로 파손보험금 신청중인데 나 폰 인증이 안돼서 아빠명의로 먼저 신청 해도될까?


입력한 문장: 지금 온라인으로 파손보험금 신청중인데 나 폰 인증이 안돼서 아빠명의로 먼저 신청 해도될까?
분류 결과: spam (스팸 확률: 0.8679)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  나 아직 밥 안먹었는데 이 수업 끝나고 밥 먹자


입력한 문장: 나 아직 밥 안먹었는데 이 수업 끝나고 밥 먹자
분류 결과: ham (스팸 확률: 0.0003)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  [쿠팡] 2박스 문 앞(으)로 배송 했습니다.(주문: 김동희)주문번호: 27100072718371


입력한 문장: [쿠팡] 2박스 문 앞(으)로 배송 했습니다.(주문: 김동희)주문번호: 27100072718371
분류 결과: ham (스팸 확률: 0.0274)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  결제되었습니다.


입력한 문장: 결제되었습니다.
분류 결과: ham (스팸 확률: 0.0012)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  대학원 입시설명회


입력한 문장: 대학원 입시설명회
분류 결과: ham (스팸 확률: 0.0320)


테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  exit


In [2]:
import pandas as pd
import numpy as np
import re
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import joblib

# URL 전처리 함수
def preprocess_url(url):
    # URL에서 유의미한 정보 추출 (프로토콜, 도메인, 경로 등)
    url = url.lower()
    url = re.sub(r'http[s]?://', '', url)  # 프로토콜 제거
    url = re.sub(r'[^a-zA-Z0-9]', ' ', url)  # 특수문자 제거
    return url

# CSV 파일 읽기 (v1 = 라벨, v2 = URL)
data = pd.read_csv('C:/Users/master/Downloads/URL_Data.csv')

# 데이터 전처리
data['v2'] = data['v2'].apply(preprocess_url)

# 라벨 인코딩 ('ham' -> 0, 'spam' -> 1)
data['v1'] = data['v1'].replace(['ham', 'spam'], [0, 1])

# URL 벡터화
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(data['v2'])

# 라벨 추출
y = data['v1'].values

# 데이터 분할 (훈련/테스트)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# RandomForest 분류기 모델 학습
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
rf_classifier.fit(X_train, y_train)

# 테스트 데이터에 대한 예측 수행
y_pred = rf_classifier.predict(X_test)

# 모델 성능 평가
print("정확도:", accuracy_score(y_test, y_pred))
print("\n분류 리포트:\n", classification_report(y_test, y_pred))

# 모델 및 벡터화기 저장
joblib.dump(rf_classifier, 'rf_classifier.pkl')
joblib.dump(vectorizer, 'url_vectorizer.pkl')

print("모델과 벡터화기가 저장되었습니다.")


정확도: 0.9230688497061293

분류 리포트:
               precision    recall  f1-score   support

           0       0.86      0.99      0.92      4542
           1       0.99      0.86      0.92      4986

    accuracy                           0.92      9528
   macro avg       0.93      0.93      0.92      9528
weighted avg       0.93      0.92      0.92      9528

모델과 벡터화기가 저장되었습니다.


In [None]:
def analyze_url(url):
    processed_url = preprocess_url(url)  # URL 전처리
    url_features = vectorizer.transform([processed_url])  # URL 벡터화
    url_prediction = rf_classifier.predict_proba(url_features)[0][1]  # 스팸 확률
    return url_prediction  # 악성 URL 확률 반환

# 사용자로부터 문장을 입력받아 URL과 문장을 각각 분석하는 함수
def classify_user_input(user_input):
    # URL 추출 정규식
    url_pattern = re.compile(r'https?://\S+|www\.\S+')
    urls_in_input = url_pattern.findall(user_input)

    # URL이 포함된 경우
    if urls_in_input:
        print(f"입력된 URL: {urls_in_input}")
        
        for url in urls_in_input:
            url_spam_prob = analyze_url(url)  # URL 스팸 확률 계산
            print(f"URL 분석 결과: {url} -> 스팸 확률: {url_spam_prob:.4f}")
            
            # 스팸 확률이 높으면 URL이 악성이라고 판단하고 종료
            if url_spam_prob > 0.5:
                print(f"분류 결과: URL이 스팸일 가능성이 높습니다.")
                return
    
    # URL이 없거나 안전하면 텍스트 분석 진행
    cleaned_input = remove_non_korean(user_input)
    cleaned_input = remove_stopwords(cleaned_input)

    # 입력받은 문장을 BERT 토크나이저로 변환
    encoded_dict = tokenizer.encode_plus(
        cleaned_input,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='tf',
    )

    input_ids = encoded_dict['input_ids']
    attention_mask = encoded_dict['attention_mask']

    # 모델 예측
    prediction = model.predict({'input_ids': input_ids, 'attention_mask': attention_mask})
    prediction_label = 'spam' if prediction[0][0] > 0.5 else 'ham'
    
    # 결과 출력
    print(f"입력한 문장: {user_input}")
    print(f"문장 분석 결과: {prediction_label} (스팸 확률: {prediction[0][0]:.4f})")

# 사용자 입력을 테스트하는 코드
while True:
    user_input = input("테스트할 문장을 입력하세요 (종료하려면 'exit' 입력): ")
    if user_input.lower() == 'exit':
        break
    classify_user_input(user_input)

테스트할 문장을 입력하세요 (종료하려면 'exit' 입력):  엄경용고객님 상품이 발송될 예정입니다 배송조회  http://salewiz.co.kr/infobell_idx.html


입력된 URL: ['http://salewiz.co.kr/infobell_idx.html']
URL 분석 결과: http://salewiz.co.kr/infobell_idx.html -> 스팸 확률: 0.4200
입력한 문장: 엄경용고객님 상품이 발송될 예정입니다 배송조회  http://salewiz.co.kr/infobell_idx.html
문장 분석 결과: spam (스팸 확률: 0.9980)
