In [None]:
import tensorflow
from tensorflow.keras.layers import Reshape
from keras.layers import Input, Embedding, Reshape, LSTM, Dense, Flatten, Bidirectional, Concatenate
from keras.models import Model
import os
import numpy as np
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import re
import pickle


In [None]:
# --- Data locations -------------------------------------------------------
directory_train = './training-data'
directory_test = './testing-data'
directory_all = './all-data'
model_path = 'bi_best_model.keras'        # location of the model
model_test_path = './deneme-veriler'      # location of the data used to try out the model

# --- Tensor dimensions ----------------------------------------------------
max_sequence_length = 384   # word slots every document is padded to
max_word_length = 20        # character ids kept per word
embedding_size = 128        # output_dim of the character embedding
num_classes = 14            # units of the final softmax layer

# --- Training -------------------------------------------------------------
batch_size = 16
lstm_units = 256            # NOTE: reassigned to 64 in the char-LSTM cell further down

In [None]:
# Collect every whitespace-separated token of the combined corpus (label tags
# stripped) and fit a case-sensitive character-level tokenizer on it.
train_test_texts = []
tag_patterns = (re.compile(r'<start_".*?">'), re.compile(r'<end_".*?">'))

for entry in os.listdir(directory_all):
    if not entry.endswith('.txt'):
        continue
    with open(os.path.join(directory_all, entry), 'r', encoding='utf-8') as corpus_file:
        raw_text = corpus_file.read()
    # Start tags are removed before end tags, one pattern at a time.
    for pattern in tag_patterns:
        raw_text = pattern.sub('', raw_text)
    train_test_texts += raw_text.split()

tokenizer = Tokenizer(char_level=True, lower=False)
tokenizer.fit_on_texts(train_test_texts)

# +1 because id 0 is used as the padding value by the padding steps below.
vocab_len = len(tokenizer.word_index) + 1

In [None]:
# SAVE TOKENIZER
# Written next to the notebook so model_run can reload the exact same
# character vocabulary.
serialized_tokenizer = pickle.dumps(tokenizer, protocol=pickle.HIGHEST_PROTOCOL)
with open('tokenizer.pickle', 'wb') as pickle_file:
    pickle_file.write(serialized_tokenizer)

print(tokenizer.word_index)

In [None]:
def process_text_files(directory, max_word_length=20):
    """Convert every ``.txt`` file in ``directory`` into padded character ids.

    Each file is read as UTF-8, its ``<start_"...">`` / ``<end_"...">`` label
    tags are removed, and the remaining text is split on whitespace.  Every
    word is mapped to character ids with the module-level ``tokenizer`` and
    padded/truncated at the end to ``max_word_length`` ids.

    Files are visited in ``os.listdir`` order.  The label arrays are built by
    a separate ``os.listdir`` call, so rows only line up with the labels while
    both listings return the same order.

    Args:
        directory: Folder containing the ``.txt`` documents.
        max_word_length: Number of character ids kept per word (default 20).

    Returns:
        A tuple ``(all_padded_sequences, word_index)`` where
        ``all_padded_sequences`` holds one ``(n_words, max_word_length)``
        array per file and ``word_index`` is ``tokenizer.word_index``.
    """
    documents = []
    for filename in os.listdir(directory):
        if not filename.endswith('.txt'):
            continue
        file_path = os.path.join(directory, filename)
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        content = re.sub(r'<start_".*?">', '', content)
        content = re.sub(r'<end_".*?">', '', content)
        documents.append(content.split())

    # Pad/truncate each word to a fixed number of character ids (not to the
    # longest word) so every document becomes a rectangular array.
    all_padded_sequences = [
        pad_sequences(
            tokenizer.texts_to_sequences(words),
            maxlen=max_word_length,
            padding='post',
            truncating='post',
        )
        for words in documents
    ]

    return all_padded_sequences, tokenizer.word_index


def _pad_documents(documents, target_length):
    """Bring every (n_words, word_len) array to exactly ``target_length`` rows.

    Shorter documents get all-zero rows appended; longer ones are cut off at
    ``target_length`` words (np.pad rejects the negative pad width they would
    otherwise produce).
    """
    fixed = []
    for arr in documents:
        arr = arr[:target_length]
        pad_width = ((0, target_length - len(arr)), (0, 0))
        fixed.append(np.pad(arr, pad_width, mode='constant', constant_values=0))
    return fixed


#   TRAIN
directory_path = directory_train
padded_sequences, word_index = process_text_files(directory_path)
padded_arrays = _pad_documents(padded_sequences, max_sequence_length)

#  TEST
directory_path_test = directory_test
padded_sequences_test, word_index_test = process_text_files(directory_path_test)
padded_arrays_test = _pad_documents(padded_sequences_test, max_sequence_length)


np.set_printoptions(threshold=np.inf)

# Stack the per-document arrays into single numpy arrays
x_train = np.array(padded_arrays)
x_test = np.array(padded_arrays_test)

print(x_train.shape)
print(x_test.shape)

np.set_printoptions(threshold=1000)

In [None]:
# Scratch state shared by the labelling helpers below: the words of the
# document being processed and the class index assigned to each of them.
wordsList, wordsIndextList = [], []
max_sequence_length = 384

# Inline markers found in the annotated text, ordered start/end per entity.
# indexAssignment relies on this order (list position + 2 = class index).
label_list = [
    f'<{edge}_"{entity}">'
    for entity in ("company_name", "date", "time", "receipt_number", "tax", "amount")
    for edge in ("start", "end")
]

# Class names in class-index order; only the number of distinct names is used
# by the one-hot encoder.  The continuation prefix here is a lowercase "l".
label_list2 = ["Pad", "Others"] + [
    f"{prefix}_{name}"
    for name in ("Comp", "Date", "Time", "Receipt", "Tax", "Amount")
    for prefix in ("B", "l")
]



def one_hot_encode_index_list(index_list=None, num_labels=None, sequence_length=None):
    """One-hot encode per-word class indices and pad to a fixed length.

    Called with no arguments it reads the notebook globals, exactly as
    before: ``wordsIndextList`` for the indices, the number of distinct names
    in ``label_list2`` for the vector width and ``max_sequence_length`` for
    the number of rows.

    Args:
        index_list: Class index of every word (default ``wordsIndextList``).
        num_labels: Width of each one-hot vector
            (default ``len(set(label_list2))``).
        sequence_length: Number of rows to return
            (default ``max_sequence_length``).

    Returns:
        A list of exactly ``sequence_length`` one-hot lists.  Missing rows are
        filled with the class-0 ("Pad") vector; indices beyond
        ``sequence_length`` are dropped so the result always matches the
        fixed model input length.
    """
    if index_list is None:
        index_list = wordsIndextList
    if num_labels is None:
        num_labels = len(set(label_list2))
    if sequence_length is None:
        sequence_length = max_sequence_length

    oneHotEncodedList = []
    for label_index in index_list[:sequence_length]:
        bitlist = [0] * num_labels
        bitlist[label_index] = 1
        oneHotEncodedList.append(bitlist)

    # Padding rows are class 0; sized from num_labels instead of a literal
    # 14-element list so the width cannot drift from the label table.
    pad_vector = [1] + [0] * (num_labels - 1)
    for _ in range(sequence_length - len(oneHotEncodedList)):
        oneHotEncodedList.append(list(pad_vector))
    return oneHotEncodedList


def splitfunction(text:str):
    """Reset the shared word state from ``text``.

    ``wordsList`` becomes the whitespace-separated words of ``text`` and
    ``wordsIndextList`` gets one entry per word, all initialised to class 1.
    """
    global wordsList,wordsIndextList
    wordsList = text.split()
    wordsIndextList = [1] * len(wordsList)



def indexAssignment(tag,last):
    """Map a tag from ``label_list`` to a class index.

    The index is the tag's position in ``label_list`` plus 2; when ``last``
    is ``"l"`` one more is added.  Returns ``None`` when ``tag`` is ``None``.
    """
    if tag is None:
        return None
    offset = 3 if last == "l" else 2
    return label_list.index(tag) + offset


def checkTag(word:str):
    """Return the first entry of ``label_list`` contained in ``word``, else ``None``."""
    return next((tag for tag in label_list if tag in word), None)



def tagingWords():
    """Fill ``wordsIndextList`` with a class index for every word in ``wordsList``.

    A word that contains a tag gets ``indexAssignment(tag, "B")`` for the tag
    reported by ``checkTag``.  A ``<start_`` marker opens a span and an
    ``<end_`` marker closes it; untagged words inside an open span get
    ``indexAssignment(open_tag, "l")``.  Untagged words outside any span keep
    the value they already had.
    """
    open_tag = None
    for position, word in enumerate(wordsList):
        tag = checkTag(word)

        if tag is None:
            # Plain word: only relabel it while a span is open.
            if open_tag is not None:
                wordsIndextList[position] = indexAssignment(open_tag, "l")
            continue

        if "<start_" in word:
            open_tag = tag
        wordsIndextList[position] = indexAssignment(tag, "B")
        if "<end_" in word:
            open_tag = None


def oneHotEncodedFunction(texts):
    """Return the one-hot label matrix for one annotated document.

    Splits ``texts`` into the shared word state, assigns a class index to
    every word, then converts the padded one-hot rows to a numpy array.
    """
    splitfunction(texts)
    tagingWords()
    one_hot_rows = one_hot_encode_index_list()
    return np.array(one_hot_rows)



def _encode_label_folder(folder_path, file_names):
    """Return the stacked one-hot label matrices of the ``.txt`` files listed.

    Files are processed in the order given by ``file_names``.  The matching
    inputs are built by process_text_files with its own ``os.listdir`` call,
    so rows only pair up with the inputs while both listings agree on order.
    The matrices are collected in a list and converted once, instead of
    re-copying the whole array with np.concatenate for every file.  An empty
    folder yields an empty array rather than a plain list.
    """
    encoded = []
    for file_name in file_names:
        if not file_name.endswith(".txt"):
            continue
        file_path = os.path.join(folder_path, file_name)
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        encoded.append(oneHotEncodedFunction(text))
    return np.array(encoded)


# TRAIN
folder_path_train = directory_train
file_list_train = os.listdir(folder_path_train)
y_train = _encode_label_folder(folder_path_train, file_list_train)


# TEST
folder_path_test = directory_test
file_list_test = os.listdir(folder_path_test)
y_test = _encode_label_folder(folder_path_test, file_list_test)


print(y_train.shape)
print(y_test.shape)

In [None]:
# LSTM model

# One sample is a whole document: max_sequence_length word slots, each
# holding max_word_length character ids.
input_shape = (max_sequence_length, max_word_length)
input_layer = Input(shape=input_shape)
print(input_layer.shape)


In [None]:
# Replace every character id with an embedding_size-dimensional vector.
char_embedding_layer = Embedding(input_dim=vocab_len, output_dim=embedding_size)
char_embedding = char_embedding_layer(input_layer)
print(char_embedding.shape)

In [None]:
# Fold the document axis into the batch axis so each word becomes its own
# sequence of max_word_length character vectors for the char-level LSTM.
char_batch_shape = (-1, max_word_length, embedding_size)
reshaped_embedding = tensorflow.reshape(char_embedding, char_batch_shape)
print(reshaped_embedding.shape)

In [None]:
# Character-level encoder: two stacked bidirectional LSTMs run over the
# characters of each word (input is reshaped_embedding, one row per word).
# NOTE: this overrides the lstm_units = 256 set in the configuration cell.
lstm_units = 64
# First layer: returns the per-character outputs plus the final hidden/cell
# states of the forward and backward directions.
char_lstm, forward_h, forward_c, backward_h, backward_c = Bidirectional(LSTM(units=lstm_units, return_sequences=True, return_state=True))(reshaped_embedding)
print(char_lstm)

# Second layer: consumes the first layer's outputs and starts from the first
# layer's final states.
char_lstm2, forward_h2, forward_c2, backward_h2, backward_c2 = Bidirectional(LSTM(units=lstm_units, return_sequences=True, return_state=True))(
    char_lstm, initial_state=[forward_h, forward_c, backward_h, backward_c])

# Join the forward and backward final states of the second layer into one
# vector per word (hidden states in state_h, cell states in state_c).
state_h = Concatenate()([forward_h2, backward_h2])
state_c = Concatenate()([forward_c2, backward_c2])

print(char_lstm2)

In [None]:
# Regroup the per-word vectors back into documents for the word-level LSTM.
# state_h (final forward+backward hidden state of the char encoder) holds ONE
# vector of size lstm_units * 2 per word, so this reshape restores the
# original batch size.  Reshaping char_lstm2 instead would keep all
# max_word_length per-character outputs, multiply the batch size by
# max_word_length and no longer line up with the (batch, 384, 14) labels.
# Run this cell right after the char-LSTM cell: a later cell rebinds state_h.
reshaped = tensorflow.reshape(state_h, (-1, max_sequence_length, lstm_units * 2))
print(reshaped.shape)

In [None]:
# Word-level tagger: two stacked bidirectional LSTMs over the word vectors of
# each document.  The state variables (forward_h, ..., backward_c2) are
# rebound here, replacing the char-level values from the earlier cell.
word_lstm1, forward_h, forward_c, backward_h, backward_c = Bidirectional(LSTM(units=lstm_units, return_sequences=True, return_state=True))(reshaped)
print(word_lstm1)

# Second layer starts from the first layer's final states and keeps one
# output per word (return_sequences=True) for per-word classification.
word_lstm2, forward_h2, forward_c2, backward_h2, backward_c2 = Bidirectional(LSTM(units=lstm_units, return_sequences=True, return_state=True))(word_lstm1, initial_state=[forward_h, forward_c, backward_h, backward_c])
print(word_lstm2)

# Concatenated final states of the word-level stack; these overwrite the
# char-level state_h/state_c and are not read again in the visible notebook.
state_h = Concatenate()([forward_h2, backward_h2])
state_c = Concatenate()([forward_c2, backward_c2])

In [None]:
# Softmax over the num_classes labels for every word position.
classification_output = Dense(units=num_classes, activation='softmax')(word_lstm2)
# Show the tensor just created (the cell previously re-printed its input).
print(classification_output)

In [None]:
# Wire the graph end to end: padded character ids in, per-word class
# probabilities out.
model = Model(inputs=input_layer, outputs=classification_output)

In [None]:
#model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

#model.summary()

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint
import tensorflow.keras as keras

# Save the model whenever validation accuracy improves.  The target is
# model_path (the file model_run loads) rather than a second hard-coded copy
# of the same filename, so the two cannot drift apart.
checkpoint = ModelCheckpoint(model_path, monitor='val_accuracy', save_best_only=True)

opt = keras.optimizers.Adam(learning_rate=0.0001)

# Labels are one-hot vectors per word, hence categorical cross-entropy.
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

model.summary()



In [None]:

# Train with the batch size from the configuration cell (it was hard-coded
# here); the test split doubles as validation data for the checkpoint.
model.fit(x_train, y_train, epochs=10, batch_size=batch_size, validation_data=(x_test, y_test), callbacks=[ checkpoint])

In [None]:
# Save the model as it stands after training; this is a different file from
# the checkpoint written by the ModelCheckpoint callback.
model.save("bi_model.keras")

In [None]:
# RUN THE MODEL
def model_run(directory):
    """Predict a label for every word of the receipts in ``directory``.

    Every ``.txt`` file in ``directory`` is split on whitespace, encoded with
    the tokenizer stored in ``tokenizer.pickle``, padded to
    ``(max_sequence_length, max_word_length)`` and fed to the model stored at
    ``model_path``.  The word/label pairs of the first file (in
    ``os.listdir`` order) are printed.

    Compared with the earlier version:
      * the printed words come from the same file the prediction belongs to,
        not from a separately hard-coded path;
      * word/label pairs are kept in a list, so a word that occurs several
        times is reported every time instead of once;
      * documents longer than ``max_sequence_length`` words are truncated
        instead of making ``np.pad`` fail;
      * an empty directory is reported instead of reaching ``predict``.

    Args:
        directory: Folder containing the ``.txt`` receipts to label.

    Returns:
        None. Results are printed.
    """
    all_texts = []
    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r', encoding='utf-8') as file:
                all_texts.append(file.read().split())

    if not all_texts:
        print(f'No .txt files found in {directory}')
        return

    # LOAD TOKENIZER
    # NOTE(security): pickle.load can execute arbitrary code; only load a
    # tokenizer.pickle that this notebook produced.
    with open('tokenizer.pickle', 'rb') as handle:
        tokenizer_model = pickle.load(handle)
    print(tokenizer_model.word_index)

    trunc_type, padding_type = 'post', 'post'

    padded_arrays = []
    for words in all_texts:
        sequences = tokenizer_model.texts_to_sequences(words)
        arr = pad_sequences(sequences, maxlen=max_word_length, padding=padding_type, truncating=trunc_type)
        # Keep at most max_sequence_length words, then zero-pad the rest.
        arr = arr[:max_sequence_length]
        pad_width = ((0, max_sequence_length - len(arr)), (0, 0))
        padded_arrays.append(np.pad(arr, pad_width, mode='constant', constant_values=0))

    model_input = np.array(padded_arrays)
    model = tensorflow.keras.models.load_model(model_path)

    prediction = model.predict(model_input)
    print(prediction.shape)
    #------------------------------------------------------------
    label_list = ["Pad", "Others", "B_Comp", "I_Comp", "B_Date", "I_Date", "B_Time", "I_Time", "B_Receipt", "I_Receipt", "B_Tax", "I_Tax", "B_Amount", "I_Amount"]

    predicted_labels = np.argmax(prediction, axis=-1)
    print(predicted_labels)

    # Report the first receipt in the folder, using its own words.
    words = all_texts[0]
    first_receipt_labels = predicted_labels[0]

    # zip stops at the shorter sequence, dropping predictions for pad slots.
    labelled_words = []
    for word, value in zip(words, first_receipt_labels):
        index = min(value, len(label_list) - 1)
        labelled_words.append((word, label_list[index]))

    print(labelled_words)
    for word, label in labelled_words:
        print(f'{word}: {label}')
        

In [None]:

# The data to be tested must be present as .txt files inside model_test_path.
model_run(model_test_path)