<a href="https://colab.research.google.com/github/mmostafahareb/credential_finder/blob/main/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Imports:

In [4]:
import os
import zipfile
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import GlobalAveragePooling1D


Data Extracting and Labeling

In [5]:
zip_path = "projects.zip"
extract_dir = "Projects"
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(".")


clean_dir = os.path.join(extract_dir, 'Clean')
dirty_dir = os.path.join(extract_dir, 'Dirty')

file_paths = []
labels = []

for filename in os.listdir(clean_dir):
    file_paths.append(os.path.join(clean_dir, filename))
    labels.append('clean')

for filename in os.listdir(dirty_dir):
    file_paths.append(os.path.join(dirty_dir, filename))
    labels.append('dirty')

print(file_paths)
print(labels)


['Projects/Clean/main2.js', 'Projects/Clean/detect_make.py', 'Projects/Clean/Styled.tsx', 'Projects/Clean/index-1 (3).js', 'Projects/Clean/views.py', 'Projects/Clean/insertion_sort.py', 'Projects/Clean/acronym_generator.py', 'Projects/Clean/ClientHandler.java', 'Projects/Clean/Text_layout.js', 'Projects/Clean/Templatable.js', 'Projects/Clean/parse_pom.js', 'Projects/Clean/TaskRunner.js', 'Projects/Clean/indexing.py', 'Projects/Clean/euclid_gcd.py', 'Projects/Clean/SimilarityVSM.py', 'Projects/Clean/audio.py', 'Projects/Clean/Tablet.js', 'Projects/Clean/ToolbarDroppable.js', 'Projects/Clean/gui (2).py', 'Projects/Clean/InvertBinaryTree.java', 'Projects/Clean/stream.py', 'Projects/Clean/processing.py', 'Projects/Clean/admin.js', 'Projects/Clean/BinarySearch.java', 'Projects/Clean/TextArea_layout.js', 'Projects/Clean/test.py', 'Projects/Clean/products.js', 'Projects/Clean/mergeIntervals.js', 'Projects/Clean/AStarAlgorithm.java', 'Projects/Clean/HighestPowerOf2.java', 'Projects/Clean/array

Data Preprocessing:

In [6]:
file_contents = []
for file_path in file_paths:
    with open(file_path, 'r',errors='ignore') as file:
        file_contents.append(file.read())

X_train, X_test, y_train, y_test = train_test_split(file_contents, labels, test_size=0.4, random_state=42)
vectorizer = TfidfVectorizer()
X_train_vectorized = vectorizer.fit_transform(X_train)
X_test_vectorized = vectorizer.transform(X_test)
print(X_train_vectorized.shape)
print(X_test_vectorized.shape)
encoder = LabelEncoder()
y_train_encoded = encoder.fit_transform(y_train)
y_test_encoded = encoder.transform(y_test)


(273, 13081)
(183, 13081)


Transformer Model Building:



In [7]:
class MultiHeadSelfAttention(layers.Layer):
    def __init__(self, embed_dim, num_heads=8):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)
        attention, weights = self.attention(query, key, value)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))
        output = self.combine_heads(concat_attention)
        return output

class TransformerClassifier(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerClassifier, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.global_avg_pool = GlobalAveragePooling1D()   

    def call(self, inputs, training):
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        output = self.layernorm2(out1 + ffn_output)
        output = self.global_avg_pool(output) 
        return output
embed_dim = 16
num_heads = 2
ff_dim = 16
maxlen = 13801 

inputs = layers.Input(shape=(maxlen,))
embedding_layer = layers.Embedding(input_dim=10000, output_dim=embed_dim)
transformer_block = TransformerClassifier(embed_dim, num_heads, ff_dim)
x = inputs
x = embedding_layer(x)
x = transformer_block(x)
x = layers.Dense(20, activation="relu")(x)
x = layers.Dropout(0.1)(x)
outputs = layers.Dense(2, activation="softmax")(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        probabilities = tf.nn.sigmoid(logits)
        y_reshaped = tf.reshape(y, probabilities.shape)  # Reshape y to match the shape of probabilities
        loss_value = loss_fn(y_reshaped, probabilities)
    grads = tape.gradient(loss_value, model.trainable_weights)
    return grads, loss_value

batch_size = 2  
accumulation_steps = 1 
# Initialize a list to accumulate gradients
accumulated_grads = [tf.zeros_like(w) for w in model.trainable_weights]

for epoch in range(30):
    print(f"Start of epoch {epoch + 1}")
    for step, (x_batch_train, y_batch_train) in enumerate(zip(X_train_vectorized.toarray(), y_train_encoded)):
        grads, loss_value = train_step(x_batch_train, y_batch_train)
        accumulated_grads = [acc_grad + grad for acc_grad, grad in zip(accumulated_grads, grads)]
        if (step + 1) % accumulation_steps == 0:
            optimizer.apply_gradients(zip(accumulated_grads, model.trainable_weights))
            accumulated_grads = [tf.zeros_like(w) for w in model.trainable_weights]

        print(f"Training loss (for one batch) at step {step}: {float(loss_value)}")


Start of epoch 1


InternalError: ignored

Model Evaluation and Saving:


In [None]:
loss, accuracy = model.evaluate(X_test_vectorized, y_test_encoded)

print("Loss: ", loss)
print("Accuracy: ", accuracy)
model.save("model.h5")

print("Saved model to disk")
