# 패키지 임포트

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
import nltk
import os
import string
import re
import json
import keras
from data_preprocessing import *
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
from sklearn.feature_extraction.text import CountVectorizer

# 데이터 로드

In [2]:
TRAIN_DATA_PATH = "./IMDB_dataset/train"   #데이터 경로 설정
TEST_DATA_PATH = "./IMDB_dataset/test"     #데이터 경로 설정

def read_text_file(path):
    labels = ['neg','pos']
    if os.path.exists(path):
        text=[]
        text_label =[]
        for directory_name in os.listdir(path):
            if directory_name in labels:
                label_index = labels.index(directory_name)
                data_path = os.path.join(path,directory_name)
                for file in os.listdir(data_path):
                    with open(os.path.join(data_path,file),'r', encoding='utf-8') as f:
                        text.append(f.read())
                        text_label.append(label_index)
        return pd.DataFrame(text,columns =['texts']),pd.DataFrame(text_label,columns =['label'])
    
x_train,y_train = read_text_file(TRAIN_DATA_PATH) 
x_test,y_test = read_text_file(TEST_DATA_PATH)

#불용어 불러오기
with open('./english_stopwords.txt', 'r', encoding='utf-8') as file:
    stopwords = [line.strip() for line in file]

# 전처리

In [3]:
train = pd.concat([x_train, y_train], axis=1)
test = pd.concat([x_test, y_test], axis=1)

train.drop_duplicates(inplace=True)
test.drop_duplicates(inplace=True)

train[['texts']] = train[['texts']].applymap(lambda x:remove_punctuation(x))
train[['texts']] = train[['texts']].applymap(lambda x:x.lower())
test[['texts']] = test[['texts']].applymap(lambda x:remove_punctuation(x))
test[['texts']] = test[['texts']].applymap(lambda x:x.lower())

X_train = train.drop(columns=['label'])
X_test = test.drop(columns=['label'])
y_train = train['label']
y_test = test['label']

X_train = X_train['texts'].apply(clean_text)
X_test = X_test['texts'].apply(clean_text)

pattern = '[^a-z ]'
Clean_X_train=[]
Clean_X_test=[]

for sen in X_train:
    Clean_X_train.append(re.sub(pattern, '', str(sen)))
    
for sen in X_test:
    Clean_X_test.append(re.sub(pattern, '', str(sen)))

y_train=list(y_train)
y_test=list(y_test)

train_df = pd.DataFrame({'X_train': Clean_X_train, 'y_train': y_train})
test_df = pd.DataFrame({'X_test': Clean_X_test, 'y_test': y_test})

# 레이블 값에 따라 데이터프레임을 그룹화하고 각 그룹에서 8000개의 샘플을 랜덤하게 추출
train_df = train_df.groupby('y_train').apply(lambda x: x.sample(n=8000, random_state=42))

# 레이블 값에 따라 데이터프레임을 그룹화하고 각 그룹에서 2000개의 샘플을 랜덤하게 추출
test_df = test_df.groupby('y_test').apply(lambda x: x.sample(n=2000, random_state=42))

# 인덱스를 재설정합니다. drop=True 옵션을 사용하여 기존 인덱스를 제거합니다.
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

x_train = train_df['X_train'].tolist()
y_train = train_df['y_train'].tolist()
x_test = test_df['X_test'].tolist()
y_test = test_df['y_test'].tolist()

to_txt=x_train+x_test
y=y_train+y_test

encoder=LabelEncoder()

encoder.fit(y)

label=encoder.transform(y)

y_train=list(label[:16000])
y_test=list(label[16000:])



# tensor space model 생성

In [5]:
#문서 분류에 도움되는 10,000개 단어 선별
vect = CountVectorizer(stop_words=stopwords)
X_dtm = vect.fit_transform(to_txt)
X_dtm = X_dtm.toarray()
X_new = SelectKBest(chi2, k=10000).fit(X_dtm, y)
TorF = X_new.get_support()
TorF
word_view=np.array(vect.get_feature_names_out())
feature_lst10000=word_view[TorF]



In [None]:
doc_lst=[]
for sen in x_train:
    doc_lst.append(sen.split())

test_lst=[]
for sen in x_test:
    test_lst.append(sen.split())
    
#Train_data에서 문서가 갖고 있는 선별한 feauture의 수 확인
count_lst=[]
for i in range(16000):
    total_feature_cnt=0
    for j in range(10000):
        if feature_lst10000[j] in doc_lst[i]:
            total_feature_cnt+=1
    count_lst.append(total_feature_cnt)
    
print('Train_data에서 가장 많은 feature를 가진 문서의 경우 feature',max(count_lst),' 개를 가짐')
print('Train_data에서 가장 적은 feature를 가진 문서의 경우 feature',min(count_lst),' 개를 가짐')

train_max_feature=max(count_lst)

#Test_data에서 문서가 갖고 있는 선별한 feauture의 수 확인
count_lst=[]
for i in range(4000):
    
    total_feature_cnt=0
    for j in range(10000):
        if feature_lst10000[j] in test_lst[i]:
            total_feature_cnt+=1
    count_lst.append(total_feature_cnt)
    
print('Test_data에서 가장 많은 feature를 가진 문서의 경우 feature',max(count_lst),' 개를 가짐')
print('Test_data에서 가장 적은 feature를 가진 문서의 경우 feature',min(count_lst),' 개를 가짐')

test_max_feature=max(count_lst)

max_feature=max(train_max_feature,test_max_feature)

In [13]:
#1-Channel TextCuboid 생성
textcuboid=[]
    

for i in range(16000):
    frame1=np.zeros((max_feature,256)) #(max_feature, 256)
    idx_cnt=0
    for j in range(10000):
        if feature_lst10000[j] in doc_lst[i]:
            #문서에서 선별한 단어(feature)의 위치를 찾아 임베딩 벡터 추출
            frame1[idx_cnt]=np.load('./elmo_embedding/train(IMDB256)/doc%d.npy'%i)[doc_lst[i].index(feature_lst10000[j])]
            idx_cnt+=1
    textcuboid.append(frame1)

textcuboid=np.array(textcuboid)

 np.save('./1-Channel textcuboid_IMDB(elmo).npy',textcuboid)

In [14]:
#1-Channel TextCuboid 생성
textcuboid_test=[]

    
for i in range(4000):
    frame1=np.zeros((max_feature,256)) #(max_feature, 256)
    idx_cnt=0
    for j in range(10000):
        if feature_lst10000[j] in test_lst[i]:
            #문서에서 선별한 단어(feature)의 위치를 찾아 임베딩 벡터 추출
            frame1[idx_cnt]=np.load('./elmo_embedding/test(IMDB256)/test%d.npy'%i)[test_lst[i].index(feature_lst10000[j])]
            idx_cnt+=1
    textcuboid_test.append(frame1)

textcuboid_test=np.array(textcuboid_test)
np.save('./1-Channel textcuboid_test_IMDB(elmo).npy',textcuboid_test)