In [2]:
import pandas as pd
import numpy as np
pd.set_option("display.max_rows", 300)
import random
import string
from collections import defaultdict
from tqdm import tqdm 
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Embedding, Flatten, Conv2D, MaxPooling2D, Input, concatenate
from tensorflow.keras.utils import to_categorical, plot_model

# Data

First, we build a contrived dataset for text classification. We create random strings of 30 to 50 characters drawn from all ASCII letters, digits and punctuation. As we want a binary classification problem, we need labels 0 and 1. For label 0, we insert the letters ["A", "B", "C"], in that order, at random positions in the generated text; for label 1, we insert the letters ["D", "E", "F"] in the same way. This pattern won't be perfectly distinguishable, because these letters can also appear in other parts of the random string, but we don't want it to be perfect either, as it is natural for a dataset to contain some noise.

In [3]:
def generate_text(label):
    """Generate one random sample text carrying the marker letters of ``label``.

    A random string of 30 to 50 characters (ASCII letters, digits and
    punctuation) is created, then three marker letters are inserted, in
    order, at three sorted random positions: "A", "B", "C" for label 0 and
    "D", "E", "F" for label 1. The result is therefore 33 to 53 characters long.

    Args:
        label: Class of the sample, 0 or 1.

    Returns:
        The generated string with the three marker letters inserted.

    Raises:
        ValueError: If ``label`` is neither 0 nor 1.
    """
    if label == 0:
        characters = ["A", "B", "C"]
    elif label == 1:
        characters = ["D", "E", "F"]
    else:
        # Previously an unknown label surfaced as an UnboundLocalError further down.
        raise ValueError(f"label must be 0 or 1, got {label!r}")

    alphabet = string.ascii_letters + string.digits + string.punctuation
    text = ''.join(random.choice(alphabet) for _ in range(random.randint(30, 50)))

    # Positions are drawn with replacement, so two markers may land on the same
    # index; sorting keeps the markers in their A-B-C / D-E-F order either way.
    first, second, third = sorted(random.choices(range(len(text)), k=3))
    return (text[:first] + characters[0] + text[first:second] + characters[1] +
            text[second:third] + characters[2] + text[third:])
    

# Seed Python's RNG so the generated dataset is identical on every
# Restart & Run All (generate_text draws from the `random` module).
SEED = 42
random.seed(SEED)

# Balanced dataset: the first half of the rows is label 0, the second half label 1.
df_len = 1400
half_len = df_len // 2
labels = [0] * half_len + [1] * half_len
df = pd.DataFrame({
    "text": [generate_text(label) for label in labels],
    "label": labels,
})

df.head()

Unnamed: 0,text,label
0,nHx^P(~i_=SlAhB@(lWn2y3}KehuVqPF~C\,0
1,"AtOB-l8sb;2V'-xa<16\|U)*~b8#1[4pcy}(4ZC\.cCN""1mn",0
2,Wyo]4rqH[HSj)Y|!FbA*syMcwNy=B:lU.C40,0
3,A}J=Z<zmryB`Cm.#y/PI5J=f^N1lgZHt=wj:syD*L,0
4,yr7k}>}]AK!j$AiPgj:B.=!<CLP09t;RE80Q{,0


In [None]:
# Rows are ordered by label (first half 0, second half 1), so every split
# takes matching slices from the head and the tail of the frame to stay balanced.
train = df.iloc[200:-200].copy()

val_parts = [df.iloc[:100], df.iloc[-100:]]
val = pd.concat(val_parts, axis=0).copy()

test_parts = [df.iloc[100:200], df.iloc[-200:-100]]
test = pd.concat(test_parts, axis=0).copy()

print(len(train), len(val), len(test))

In [None]:
# Character vocabulary, built from the training split only.
vocabulary = defaultdict(int)

# Pass 1: count how often each character occurs in the training texts.
train_texts = train["text"]
for row_pos in tqdm(range(len(train))):
    for symbol in train_texts.iloc[row_pos]:
        vocabulary[symbol] += 1

# Pass 2: replace each count by [count, id]; ids start at 1 in first-seen
# order, which leaves 0 free for the padding value used by `pad`.
for token_id, symbol in enumerate(list(vocabulary), start=1):
    vocabulary[symbol] = [vocabulary[symbol], token_id]

print(len(vocabulary))

# Prepare Data

In [None]:
def func(row, vocab=None):
    """Translate a text into the list of integer ids of its characters.

    Args:
        row: Iterable of characters, e.g. one text string.
        vocab: Optional mapping ``character -> [count, id]``. When omitted,
            the module-level ``vocabulary`` is used.

    Returns:
        The ids of the characters of ``row``, in order. Characters that are
        missing from the mapping (or mapped to a falsy value) are skipped.
    """
    table = vocabulary if vocab is None else vocab
    output = []
    for ch in row:
        # Use .get() rather than indexing: `vocabulary` is a defaultdict, and
        # `vocabulary[ch]` would silently insert every unseen character,
        # growing len(vocabulary), which sizes the one-hot and embedding inputs.
        entry = table.get(ch)
        if entry:
            output.append(entry[1])
    return output

# Integer-encode every training text and record its raw character count.
train["text_int_encoded"] = train["text"].apply(func)
train["text_len"] = train["text"].apply(len)

# The longest training text defines the sequence length used for padding.
max_seq_len = train["text_len"].max()
print("max seq len:", max_seq_len)

In [None]:
# Peek at the first training row to check the newly added encoded columns.
train.head(1)

In [None]:
# Apply the same encoding (vocabulary fitted on train) to the held-out splits.
for split in (val, test):
    split["text_int_encoded"] = split["text"].apply(func)
    split["text_len"] = split["text"].apply(len)

# Model based on integer encoding

In [None]:
def pad(row, max_seq_len):
    """Return ``row`` adjusted to exactly ``max_seq_len`` items.

    Shorter rows are right-padded with zeros; rows that are already long
    enough are cut down to their first ``max_seq_len`` items.
    """
    missing = max_seq_len - len(row)
    if missing <= 0:
        # Long enough already: truncate (a no-op when the length matches).
        return row[:max_seq_len]
    return row + [0] * missing

# Bring every encoded sequence, in all three splits, to the common length max_seq_len.
for split in (train, val, test):
    split["text_int_encoded_padded"] = split["text_int_encoded"].apply(
        lambda seq: pad(seq, max_seq_len)
    )

In [None]:
# Stack the padded id sequences into one 2-D array per split (one row per text).
# np.stack needs a real sequence: passing a bare generator is deprecated and
# raises a TypeError on current NumPy, hence the list comprehensions.
x_train_int = np.stack([np.array(seq) for seq in train["text_int_encoded_padded"]])
y_train = train["label"]

x_val_int = np.stack([np.array(seq) for seq in val["text_int_encoded_padded"]])
y_val = val["label"]

x_test_int = np.stack([np.array(seq) for seq in test["text_int_encoded_padded"]])
y_test = test["label"]

In [None]:
def nn_model_int_encoded(seq_len: int):
    """Build and compile the dense baseline that consumes raw integer ids.

    The network is three ReLU Dense layers (200, 100, 50 units), each followed
    by Dropout(0.4), and a single sigmoid output unit. It is compiled with
    binary cross-entropy, the Adam optimizer and the accuracy metric.

    Args:
        seq_len: Number of input features, i.e. the padded sequence length.

    Returns:
        The compiled Sequential model.
    """
    initializer = 'normal'
    net = Sequential(name='sequential')

    for position, units in enumerate((200, 100, 50)):
        # Only the first Dense layer declares the input size.
        first_layer_args = {'input_dim': seq_len} if position == 0 else {}
        net.add(Dense(units, kernel_initializer=initializer, activation='relu',
                      name=f'dense_{position + 1}', **first_layer_args))
        net.add(Dropout(0.4, name=f'dropout_{position}'))

    net.add(Dense(1, kernel_initializer=initializer, activation='sigmoid', name='dense_4'))

    net.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return net

In [None]:
# Train the integer-encoded baseline for a fixed 200 epochs (no early stopping here).
model_int_encoded = nn_model_int_encoded(seq_len=max_seq_len)
history = model_int_encoded.fit(
    x_train_int, y_train,
    validation_data=(x_val_int, y_val),
    batch_size=16,
    epochs=200,
)

# Training vs. validation accuracy per epoch.
for metric_key, curve_label in (("accuracy", "train_accuracy"),
                                ("val_accuracy", "val_accuracy")):
    plt.plot(history.history[metric_key], label=curve_label)
plt.title('nn_model_int_encoded')
plt.legend()
plt.show()
plt.close()

In [None]:
# Threshold the sigmoid outputs at 0.5 to obtain hard 0/1 predictions,
# then score them against the held-out test labels.
y_int_predicted = (model_int_encoded.predict(x_test_int) > 0.5).astype("int32").ravel()
test_acc = accuracy_score(y_test, y_int_predicted)
print("test accuracy: ", test_acc)

# Model based on one-hot encoding

In [None]:
# Show the first training rows, including the padded integer sequences that get one-hot encoded below.
train.head()

In [None]:
# Preview: one-hot encode the first padded training sequence (one class per
# vocabulary id plus the padding id 0) and flatten it into a single vector.
to_categorical(train["text_int_encoded_padded"].iloc[0], num_classes=(len(vocabulary)+1)).ravel()

In [None]:
# One-hot encode every padded sequence and flatten it to a single vector of
# length max_seq_len * (len(vocabulary) + 1); the +1 accounts for the padding id 0.
# np.stack needs a real sequence: passing a bare generator is deprecated and
# raises a TypeError on current NumPy, hence the list comprehensions.
x_train_onehot = np.stack([
    to_categorical(seq, num_classes=(len(vocabulary)+1)).ravel()
    for seq in train["text_int_encoded_padded"]
])
x_val_onehot = np.stack([
    to_categorical(seq, num_classes=(len(vocabulary)+1)).ravel()
    for seq in val["text_int_encoded_padded"]
])
x_test_onehot = np.stack([
    to_categorical(seq, num_classes=(len(vocabulary)+1)).ravel()
    for seq in test["text_int_encoded_padded"]
])

In [None]:
# Longest encoded (unpadded) sequence in the test split, for comparison with max_seq_len.
test["text_int_encoded"].apply(len).max()

In [None]:
def nn_model_onehot_encoded(seq_len: int):
    """Build and compile the dense model that consumes flattened one-hot vectors.

    The network is three ReLU Dense layers (5000, 2500, 1250 units), each
    followed by Dropout(0.5), and a single sigmoid output unit. It is compiled
    with binary cross-entropy, the Adam optimizer and the accuracy metric.

    Args:
        seq_len: Padded sequence length; the input width is
            ``seq_len * (len(vocabulary) + 1)``, read from the module-level
            ``vocabulary``.

    Returns:
        The compiled Sequential model.
    """
    initializer = 'normal'
    flat_input_width = seq_len*(len(vocabulary)+1)
    net = Sequential(name='sequential')

    for position, units in enumerate((5000, 2500, 1250)):
        # Only the first Dense layer declares the input size.
        first_layer_args = {'input_dim': flat_input_width} if position == 0 else {}
        net.add(Dense(units, kernel_initializer=initializer, activation='relu',
                      name=f'dense_{position + 1}', **first_layer_args))
        net.add(Dropout(0.5, name=f'dropout_{position}'))

    net.add(Dense(1, kernel_initializer=initializer, activation='sigmoid', name='dense_4'))

    net.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return net

model_onehot_encoded = nn_model_onehot_encoded(seq_len=max_seq_len)

# NOTE(review): patience (10) equals the epoch budget (10), so this callback
# can never stop the run early. Whether the best weights are still restored
# after a full-length run depends on the Keras version - TODO confirm.
es = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=10,
    restore_best_weights=True,
)
history = model_onehot_encoded.fit(
    x_train_onehot, y_train,
    validation_data=(x_val_onehot, y_val),
    batch_size=16,
    epochs=10,
    callbacks=[es],
)

# Training vs. validation accuracy per epoch.
for metric_key, curve_label in (("accuracy", "train_accuracy"),
                                ("val_accuracy", "val_accuracy")):
    plt.plot(history.history[metric_key], label=curve_label)
plt.title('nn_model_onehot_encoded')
plt.legend()
plt.show()
plt.close()

In [None]:
# Threshold the sigmoid outputs at 0.5 to obtain hard 0/1 predictions,
# then score them against the held-out test labels.
y_onehot_predicted = (model_onehot_encoded.predict(x_test_onehot) > 0.5).astype("int32").ravel()
test_acc = accuracy_score(y_test, y_onehot_predicted)
print("test accuracy: ", test_acc)

# Model based on embeddings

In [None]:
# Reminder of the row layout; the embedding model consumes the padded integer ids.
train.head(1)

In [None]:
def nn_model_embedding(seq_len: int):
    """Build and compile the embedding-based classifier.

    Integer ids are mapped to 15-dimensional embeddings, flattened, and passed
    through one ReLU Dense layer (500 units) with Dropout(0.5) before the
    single sigmoid output unit. The model is compiled with binary
    cross-entropy, Adam (learning rate 0.0005) and the accuracy metric, and
    its diagram is written to ``nn_embedding.png``.

    Args:
        seq_len: Padded sequence length expected at the input.

    Returns:
        The compiled Sequential model.
    """
    initializer = 'normal'

    # The output layer is named 'dense_4' although only one hidden Dense layer
    # is active: two further Dense+Dropout stages (500 and 250 units, named
    # dense_2/dense_3) were tried and are currently disabled.
    layer_stack = [
        Embedding(input_dim=(len(vocabulary)+1), output_dim=15, input_length=seq_len, name='embedding'),
        Flatten(name="flatten"),
        Dense(500, kernel_initializer=initializer, activation='relu', name='dense_1'),
        Dropout(0.5, name='dropout_0'),
        Dense(1, kernel_initializer=initializer, activation='sigmoid', name='dense_4'),
    ]

    model = Sequential(name='sequential')
    for layer in layer_stack:
        model.add(layer)

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    # Side effect: saves the architecture diagram next to the notebook.
    plot_model(model, show_shapes= True, to_file= 'nn_embedding.png')

    return model


model_embedding = nn_model_embedding(seq_len=max_seq_len)
num_epochs = 25

# NOTE(review): patience equals the epoch budget, so this callback can never
# stop the run early. Whether the best weights are still restored after a
# full-length run depends on the Keras version - TODO confirm.
es = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=num_epochs,
    restore_best_weights=True,
)
history = model_embedding.fit(
    x_train_int, y_train,
    validation_data=(x_val_int, y_val),
    batch_size=16,
    epochs=num_epochs,
    callbacks=[es],
)

# Training vs. validation accuracy per epoch.
for metric_key, curve_label in (("accuracy", "train_accuracy"),
                                ("val_accuracy", "val_accuracy")):
    plt.plot(history.history[metric_key], label=curve_label)
plt.title('nn_model_embedding')
plt.legend()
plt.show()
plt.close()

In [None]:
# Threshold the sigmoid outputs at 0.5 to obtain hard 0/1 predictions,
# then score them against the held-out test labels.
y_embedding_predicted = (model_embedding.predict(x_test_int) > 0.5).astype("int32").ravel()
test_acc = accuracy_score(y_test, y_embedding_predicted)
print("test accuracy: ", test_acc)

# MC-CNN

In [None]:
# Kernel heights (number of consecutive characters) of the parallel CNN branches.
CHANNELS = [2, 3, 4, 5]

def mccnn_model(timesteps: int,
                vocab_size: int) -> Model:
    """Build and compile the multi-channel CNN text classifier.

    One branch is created per entry of ``CHANNELS``. Each branch has its own
    input and its own 15-dimensional Embedding, followed by a Conv2D with 256
    filters and kernel size ``(ks, embedding_size)``, Dropout(0.5), a max
    pooling over the time axis and a Flatten. The branch outputs are
    concatenated and fed through Dense(512) and Dense(256), each with
    Dropout(0.5), into a single sigmoid unit. The model is compiled with
    binary cross-entropy, Adam and the accuracy metric, and its diagram is
    written to ``multichannel_cnn.png``.

    Args:
        timesteps: Padded sequence length of every input.
        vocab_size: Number of distinct ids the Embedding layers must cover
            (padding id included).

    Returns:
        The compiled functional model with ``len(CHANNELS)`` inputs.
    """
    embedding_size = 15

    # the multichannel part
    inputs = []
    flats = []

    for i, ks in enumerate(CHANNELS):
        input_ = Input(shape=(timesteps, 1), name=f'input_{i}')
        embedding = Embedding(vocab_size, embedding_size, name=f'embedding_{i}')(input_)

        conv_one = Conv2D(filters=256, kernel_size=(ks, embedding_size),
                          padding='same', activation='relu', name=f'conv_one_{i}')(embedding)
        drop_one = Dropout(0.5, name=f'dropout_one_{i}')(conv_one)
        # With padding='same' (and the default stride of 1) the convolution
        # output keeps all `timesteps` rows, so the pooling window must span
        # `timesteps` to be a true max-over-time. The former window of
        # `timesteps-ks+1` matches 'valid' padding and silently ignored the
        # last ks-1 positions.
        pool_one = MaxPooling2D(pool_size=(timesteps, 1), name=f'pooling_one_{i}')(drop_one)

        flat = Flatten(name=f'flatten_{i}')(pool_one)

        inputs.append(input_)
        flats.append(flat)

    merged = concatenate(flats, name='merged')

    # The dense layers part
    dense1 = Dense(512, activation='relu', name='dense_1')(merged)
    drop1 = Dropout(0.5, name='dropout_1')(dense1)
    dense2 = Dense(256, activation='relu', name='dense_2')(drop1)
    drop2 = Dropout(0.5, name='dropout_2')(dense2)
    dense3 = Dense(1, activation='sigmoid', name='dense_3')(drop2)

    # build the model
    model = Model(inputs=inputs, outputs=dense3)

    # compile
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # Side effect: saves the architecture diagram next to the notebook.
    plot_model(model, show_shapes=True, to_file='multichannel_cnn.png')
    return model
################################################
model_mccnn = mccnn_model(timesteps=max_seq_len, vocab_size=(len(vocabulary)+1))
num_epochs = 25

# NOTE(review): patience equals the epoch budget, so this callback can never
# stop the run early. Whether the best weights are still restored after a
# full-length run depends on the Keras version - TODO confirm.
es = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=num_epochs, restore_best_weights=True)

# Every branch of the multi-channel model receives the same integer-encoded batch.
history = model_mccnn.fit([x_train_int]*len(CHANNELS), y_train,
                   validation_data=([x_val_int]*len(CHANNELS), y_val),
                   batch_size=16,
                   epochs=num_epochs,
                   callbacks=[es])

# Training vs. validation accuracy per epoch.
plt.plot(history.history['accuracy'], label="train_accuracy")
plt.plot(history.history['val_accuracy'], label="val_accuracy")
# The title used to be the copy-pasted 'nn_model_embedding', mislabelling this figure.
plt.title('mccnn_model')
plt.legend()
plt.show()
plt.close()

In [None]:
# Feed the same test batch to every branch, threshold the sigmoid outputs at
# 0.5 to obtain hard 0/1 predictions, and score them against the test labels.
mccnn_test_inputs = [x_test_int]*len(CHANNELS)
y_mccnn_predicted = (model_mccnn.predict(mccnn_test_inputs) > 0.5).astype("int32").ravel()
test_acc = accuracy_score(y_test, y_mccnn_predicted)
print("test accuracy: ", test_acc)