In [8]:
import os       # os.path.exists
import math     # math.log, math.exp
import random   # random.seed, random.choices, random.gauss, random.shuffle
random.seed(42) # Let there be order among chaos

# Let there be an input dataset `docs`: list[str] of documents (e.g. a dataset of logs)
DATA_PATH = '/content/logs_5000.txt'

docs = [l.strip() for l in open(DATA_PATH).read().split('\n') if l.strip()]
random.shuffle(docs)
print(f"num docs: {len(docs)}")

num docs: 5000


In [9]:
# Let there be a Tokenizer to translate strings to discrete symbols and back
uchars = sorted(set(''.join(docs))) # unique characters in the dataset become token ids 0..n-1
BOS = len(uchars) # token id for the special Beginning of Sequence (BOS) token
vocab_size = len(uchars) + 1 # total number of unique tokens, +1 is for BOS
print(f"vocab size: {vocab_size}")


vocab size: 42


In [10]:

# Let there be Autograd, to recursively apply the chain rule through a computation graph
class Value:
    __slots__ = ('data', 'grad', '_children', '_local_grads') # Python optimization for memory usage

    def __init__(self, data, children=(), local_grads=()):
        self.data = data                # scalar value of this node calculated during forward pass
        self.grad = 0                   # derivative of the loss w.r.t. this node, calculated in backward pass
        self._children = children       # children of this node in the computation graph
        self._local_grads = local_grads # local derivative of this node w.r.t. its children

    def __add__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        return Value(self.data + other.data, (self, other), (1, 1))

    def __mul__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        return Value(self.data * other.data, (self, other), (other.data, self.data))

    def __pow__(self, other): return Value(self.data**other, (self,), (other * self.data**(other-1),))
    def log(self): return Value(math.log(self.data), (self,), (1/self.data,))
    def exp(self): return Value(math.exp(self.data), (self,), (math.exp(self.data),))
    def relu(self): return Value(max(0, self.data), (self,), (float(self.data > 0),))
    def __neg__(self): return self * -1
    def __radd__(self, other): return self + other
    def __sub__(self, other): return self + (-other)
    def __rsub__(self, other): return other + (-self)
    def __rmul__(self, other): return self * other
    def __truediv__(self, other): return self * other**-1
    def __rtruediv__(self, other): return other * self**-1

    def backward(self):
        topo = []
        visited = set()
        def build_topo(v):
            if v not in visited:
                visited.add(v)
                for child in v._children:
                    build_topo(child)
                topo.append(v)
        build_topo(self)
        self.grad = 1
        for v in reversed(topo):
            for child, local_grad in zip(v._children, v._local_grads):
                child.grad += local_grad * v.grad

In [11]:
# Initialize the parameters, to store the knowledge of the model.
n_embd = 24     # embedding dimension
n_head = 4      # number of attention heads
n_layer = 1     # number of layers
block_size = 32 # maximum sequence length
head_dim = n_embd // n_head # dimension of each head
matrix = lambda nout, nin, std=0.08: [[Value(random.gauss(0, std)) for _ in range(nin)] for _ in range(nout)]
state_dict = {'wte': matrix(vocab_size, n_embd), 'wpe': matrix(block_size, n_embd), 'lm_head': matrix(vocab_size, n_embd)}
for i in range(n_layer):
    state_dict[f'layer{i}.attn_wq'] = matrix(n_embd, n_embd)
    state_dict[f'layer{i}.attn_wk'] = matrix(n_embd, n_embd)
    state_dict[f'layer{i}.attn_wv'] = matrix(n_embd, n_embd)
    state_dict[f'layer{i}.attn_wo'] = matrix(n_embd, n_embd)
    state_dict[f'layer{i}.mlp_fc1'] = matrix(4 * n_embd, n_embd)
    state_dict[f'layer{i}.mlp_fc2'] = matrix(n_embd, 4 * n_embd)
params = [p for mat in state_dict.values() for row in mat for p in row] # flatten params into a single list[Value]
print(f"num params: {len(params)}")

num params: 9696


In [12]:
# Define the model architecture: a stateless function mapping token sequence and parameters to logits over what comes next.
# Follow GPT-2, blessed among the GPTs, with minor differences: layernorm -> rmsnorm, no biases, GeLU -> ReLU
def linear(x, w):
    return [sum(wi * xi for wi, xi in zip(wo, x)) for wo in w]

def softmax(logits):
    max_val = max(val.data for val in logits)
    exps = [(val - max_val).exp() for val in logits]
    total = sum(exps)
    return [e / total for e in exps]

def rmsnorm(x):
    ms = sum(xi * xi for xi in x) / len(x)
    scale = (ms + 1e-5) ** -0.5
    return [xi * scale for xi in x]

def gpt(token_id, pos_id, keys, values):
    tok_emb = state_dict['wte'][token_id] # token embedding
    pos_emb = state_dict['wpe'][pos_id] # position embedding
    x = [t + p for t, p in zip(tok_emb, pos_emb)] # joint token and position embedding
    x = rmsnorm(x)

    for li in range(n_layer):
        # 1) Multi-head attention block
        x_residual = x
        x = rmsnorm(x)
        q = linear(x, state_dict[f'layer{li}.attn_wq'])
        k = linear(x, state_dict[f'layer{li}.attn_wk'])
        v = linear(x, state_dict[f'layer{li}.attn_wv'])
        keys[li].append(k)
        values[li].append(v)
        x_attn = []
        for h in range(n_head):
            hs = h * head_dim
            q_h = q[hs:hs+head_dim]
            k_h = [ki[hs:hs+head_dim] for ki in keys[li]]
            v_h = [vi[hs:hs+head_dim] for vi in values[li]]
            attn_logits = [sum(q_h[j] * k_h[t][j] for j in range(head_dim)) / head_dim**0.5 for t in range(len(k_h))]
            attn_weights = softmax(attn_logits)
            head_out = [sum(attn_weights[t] * v_h[t][j] for t in range(len(v_h))) for j in range(head_dim)]
            x_attn.extend(head_out)
        x = linear(x_attn, state_dict[f'layer{li}.attn_wo'])
        x = [a + b for a, b in zip(x, x_residual)]
        # 2) MLP block
        x_residual = x
        x = rmsnorm(x)
        x = linear(x, state_dict[f'layer{li}.mlp_fc1'])
        x = [xi.relu() for xi in x]
        x = linear(x, state_dict[f'layer{li}.mlp_fc2'])
        x = [a + b for a, b in zip(x, x_residual)]

    logits = linear(x, state_dict['lm_head'])
    return logits

In [13]:
# Let there be Adam, the blessed optimizer and its buffers
learning_rate, beta1, beta2, eps_adam = 0.01, 0.85, 0.99, 1e-8
m = [0.0] * len(params) # first moment buffer
v = [0.0] * len(params) # second moment buffer


In [15]:
# Repeat in sequence
num_steps = 1000 # number of training steps
for step in range(num_steps):

    # Take single document, tokenize it, surround it with BOS special token on both sides
    doc = docs[step % len(docs)]
    tokens = [BOS] + [uchars.index(ch) for ch in doc] + [BOS]
    n = min(block_size, len(tokens) - 1)

    # Forward the token sequence through the model, building up the computation graph all the way to the loss.
    keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
    losses = []
    for pos_id in range(n):
        token_id, target_id = tokens[pos_id], tokens[pos_id + 1]
        logits = gpt(token_id, pos_id, keys, values)
        probs = softmax(logits)
        loss_t = -probs[target_id].log()
        losses.append(loss_t)
    loss = (1 / n) * sum(losses) # final average loss over the document sequence. May yours be low.

    # Backward the loss, calculating the gradients with respect to all model parameters.
    loss.backward()

    # Adam optimizer update: update the model parameters based on the corresponding gradients.
    lr_t = learning_rate * (1 - step / num_steps) # linear learning rate decay
    for i, p in enumerate(params):
        m[i] = beta1 * m[i] + (1 - beta1) * p.grad
        v[i] = beta2 * v[i] + (1 - beta2) * p.grad ** 2
        m_hat = m[i] / (1 - beta1 ** (step + 1))
        v_hat = v[i] / (1 - beta2 ** (step + 1))
        p.data -= lr_t * m_hat / (v_hat ** 0.5 + eps_adam)
        p.grad = 0

    print(f"step {step+1:4d} / {num_steps:4d} | loss {loss.data:.4f}")


step    1 / 1000 | loss 2.2478
step    2 / 1000 | loss 2.2138
step    3 / 1000 | loss 2.6572
step    4 / 1000 | loss 2.9876
step    5 / 1000 | loss 2.6668
step    6 / 1000 | loss 2.6081
step    7 / 1000 | loss 3.0399
step    8 / 1000 | loss 2.6982
step    9 / 1000 | loss 3.4015
step   10 / 1000 | loss 2.9190
step   11 / 1000 | loss 2.4156
step   12 / 1000 | loss 2.9301
step   13 / 1000 | loss 3.1057
step   14 / 1000 | loss 2.4994
step   15 / 1000 | loss 3.3957
step   16 / 1000 | loss 2.2030
step   17 / 1000 | loss 2.7041
step   18 / 1000 | loss 2.6138
step   19 / 1000 | loss 3.1638
step   20 / 1000 | loss 2.7406
step   21 / 1000 | loss 2.0568
step   22 / 1000 | loss 3.1725
step   23 / 1000 | loss 2.8709
step   24 / 1000 | loss 3.0415
step   25 / 1000 | loss 3.1318
step   26 / 1000 | loss 2.9678
step   27 / 1000 | loss 3.0054
step   28 / 1000 | loss 3.1860
step   29 / 1000 | loss 2.7888
step   30 / 1000 | loss 2.6011
step   31 / 1000 | loss 2.5457
step   32 / 1000 | loss 2.5364
step   3

In [92]:
def classify_log_with_gpt(log_text):
    # Maximum length for the longest label we expect to generate (e.g., "WARNING" is 7 chars).
    max_label_length = 7
    # The suffix we append to the log text
    prompt_suffix = " -> "

    # Calculate the maximum allowed length for the log_text part of the prompt
    # This ensures that (truncated_log_text + prompt_suffix + generated_label) fits within block_size.
    max_effective_prompt_length = block_size - max_label_length
    max_log_text_length = max_effective_prompt_length - len(prompt_suffix)

    # Ensure max_log_text_length is not negative in case block_size is very small
    if max_log_text_length < 0:
        max_log_text_length = 0

    # Truncate the log_text if it's too long to leave space for generation
    processed_log_text = log_text.strip()
    if len(processed_log_text) > max_log_text_length:
        processed_log_text = processed_log_text[:max_log_text_length]

    prompt = processed_log_text + prompt_suffix

    # Reset KV cache
    keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]

    # Prime the model with the prepared prompt
    priming_token_ids = []
    for ch in prompt:
        if ch in uchars:
            priming_token_ids.append(uchars.index(ch))
        # Unknown characters are skipped. Consider adding a special UNK token if needed.

    # Initialize the token_id for the next gpt call (for generation)
    # and the current position in the sequence
    current_token_id_for_generation = BOS # Default to BOS if no priming tokens
    current_pos_for_generation = 0

    if priming_token_ids:
        for pos_id_prime, token_id_in_prompt in enumerate(priming_token_ids):
            # Feed each token from the prompt to the GPT model
            gpt(token_id_in_prompt, pos_id_prime, keys, values)
            current_pos_for_generation = pos_id_prime + 1
            current_token_id_for_generation = token_id_in_prompt # Last token primed becomes input for first generation step

    # Generate label characters
    generated = ""
    # Generation starts from the position immediately after the prompt
    # and continues up to the block_size limit.
    for pos_id_gen in range(current_pos_for_generation, block_size):
        # Get logits for the next token using the last generated/primed token and current position
        logits = gpt(current_token_id_for_generation, pos_id_gen, keys, values)
        probs = softmax(logits)

        # Select the token with the highest probability (argmax) for classification
        next_token_id = max(range(vocab_size), key=lambda i: probs[i].data)

        # If the model predicts the Beginning of Sequence (BOS) token, it signals end of generation.
        if next_token_id == BOS:
            break

        # Convert token ID back to character and append to generated string
        next_char = uchars[next_token_id]
        generated += next_char
        current_token_id_for_generation = next_token_id # Update for the next generation step


    if "WARNING" in generated:
        return "WARNING"

    if "ERROR" in generated:
        return "ERROR"
    if "INFO" in generated:
        return "INFO"

    # If no keyword is found after generating up to block_size, return "UNCERTAIN"
    return "UNCERTAIN"

In [93]:
print(classify_log_with_gpt("audit-trace cache hit scheduled"))


INFO


In [94]:
print(classify_log_with_gpt("Syntax Error"))


ERROR


In [95]:
print(classify_log_with_gpt("Warning Limit Hit "))




**Gradio UI**

In [96]:
import gradio as gr

def classify_log_interface(log_input):
    return classify_log_with_gpt(log_input)

custom_css = """
body { font-family: 'Segoe UI', 'Roboto', 'Helvetica Neue', Arial, sans-serif; background-color: #f0f2f5; }
.gradio-container { max-width: 900px; margin: 20px auto; padding: 20px; border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.1); background-color: white; }
.gr-button { background-color: #1a73e8; color: white; border-radius: 8px; padding: 10px 20px; font-weight: bold; }
.gr-button:hover { background-color: #155cb8; }
.gr-textbox, .gr-textarea { border-radius: 8px; border: 1px solid #ccc; padding: 10px; box-shadow: inset 0 1px 3px rgba(0,0,0,0.05); }
.gr-label { font-weight: 600; color: #333; margin-bottom: 5px; }
h1, h2, h3, h4, h5, h6 { color: #202124; }
"""

iface = gr.Interface(
    fn=classify_log_interface,
    inputs=gr.Textbox(lines=2, placeholder="Enter log message here..."),
    outputs=gr.Textbox(label="Predicted Label"),
    title="Log Classifier Micro GPT",
    description="Enter a log message to classify it as ERROR, WARNING, INFO, or UNCERTAIN.",
    live=False, # Changed to False for manual submission
    allow_flagging="never",
    theme="soft", # Using a modern Gradio theme
    css=custom_css
)

# Set the footer after the interface is created
iface.footer = "<p>Created by - Shubham Murtadak (AI/ML Engineer)</p>"

iface.launch(debug=True)



It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://fb493364b2672bc528.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://fb493364b2672bc528.gradio.live


