# Prototyping model2regex

In [1]:
import sys
# Install the notebook's requirements with the interpreter named by sys.executable,
# so the packages land in the environment of the Python process running this cell.
!{sys.executable} -m pip install -r requirements.txt



In [2]:
import torch
from torch import nn, optim
from torch.distributions import Categorical
from random import Random
from datetime import datetime
from torch.utils.data import Dataset, DataLoader
from itertools import chain
import torch.nn.functional as F
import string

We are going to try to learn a simple DGA (domain generation algorithm) called banjori.

First, we generate a dataset of domains.

In [3]:
from dga import banjori, generate_dataset, simple_dga

# Inputs handed to the dataset generator: the starting domain (presumably the DGA seed)
# and the number of domains to produce.
seed_domain = 'earnestnessbiophysicalohax.com'
n_domains = 1 << 16
domains: list[str] = generate_dataset(banjori, seed_domain, n_domains)

# Peek at the first few generated domains.
print(domains[0:5])

['kwtoestnessbiophysicalohax.com', 'rvcxestnessbiophysicalohax.com', 'hjbtestnessbiophysicalohax.com', 'txmoestnessbiophysicalohax.com', 'agekestnessbiophysicalohax.com']


We define a PyTorch [Dataset](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html) to handle batching and shuffling of the data.

In [4]:
class Domains(Dataset):
    """Character-level dataset over a list of domain names.

    Every domain is prefixed with '_' as a start marker. Characters are mapped to
    indices starting at 1; index 0 is reserved for padding, which doubles as the
    "end char".
    """

    def __init__(self, data: list[str]):
        # Prepend the start marker to every domain.
        self.data = ['_' + domain for domain in data]
        # Sorted vocabulary of every character seen (including the start marker).
        self.chars = sorted(set(chain.from_iterable(self.data)))
        # +1 leaves room for index 0 (padding / end char).
        self.vocabSize = len(self.chars) + 1
        # Length of the longest (prefixed) domain; all items are padded to it.
        self.max_size = max(len(domain) for domain in self.data)
        self.ind2char = dict(enumerate(self.chars, start=1))
        self.char2ind = {ch: index for index, ch in self.ind2char.items()}

    def __len__(self):
        """Number of domains in the dataset."""
        return len(self.data)

    def isEndChar(self, ind):
        """True when `ind` is the reserved padding / end index (0)."""
        return ind == 0

    def charTensor(self, _input):
        """Encode `_input` as a column tensor of character indices, one row per character."""
        indices = [self.char2ind[ch] for ch in _input]
        return torch.tensor([indices]).permute(1, 0)

    def __getitem__(self, idx: int):
        """Return `(inputs, targets)` where targets are the inputs shifted left by one."""
        encoded = torch.tensor([self.char2ind[ch] for ch in self.data[idx]])
        # All tensors must share one length, so shorter domains are right-padded with
        # 0, which also serves as the "end char".
        missing = self.max_size - len(encoded)
        inputs = F.pad(encoded, (0, missing), "constant", 0)
        # Next-character targets: drop the first token and pad one end char at the tail.
        targets = F.pad(inputs[1:], (0, 1), "constant", 0)
        return (inputs, targets)


In [5]:
# Wrap the generated domains and pull one shuffled batch to sanity-check the shapes.
dataset = Domains(domains)
batch_iter = iter(DataLoader(dataset, shuffle=True, batch_size=64))
x, y = next(batch_iter)
for label, tensor in (("x", x), ("y", y)):
    print(f"{label}:\n\t", tensor.shape)
# First stored domain, including the '_' start marker.
print(dataset.data[0])

x:
	 torch.Size([64, 31])
y:
	 torch.Size([64, 31])
_kwtoestnessbiophysicalohax.com


Next, we define a model that learns the structure of our DGA.

In [6]:
from torch import nn
class Model(nn.Module):
    """Embedding -> GRU -> linear decoder producing per-step logits over the vocabulary.

    Args:
        vocabSize: number of distinct token indices (embedding rows and output logits).
        emb: embedding dimension.
        size: GRU hidden size.
        nlayers: number of stacked GRU layers.
    """

    def __init__(self, vocabSize, emb, size, nlayers):
        super().__init__()
        self.embedding = nn.Embedding(vocabSize, emb)
        self.rnn = nn.GRU(input_size=emb, hidden_size=size, num_layers=nlayers)
        self.decoder = nn.Linear(in_features=size, out_features=vocabSize)

    def forward(self, input_seq, hidden_state):
        """Run one forward pass.

        Args:
            input_seq: tensor of token indices fed to the embedding layer.
            hidden_state: initial GRU hidden state, or None for the GRU's default.

        Returns:
            `(logits, hidden)`; the hidden state is detached from the autograd graph.
        """
        embedded = self.embedding(input_seq)
        rnn_out, next_hidden = self.rnn(embedded, hidden_state)
        logits = self.decoder(rnn_out)
        return logits, next_hidden.detach()

In [7]:
# Check what the embedding layer produces for a permuted (sequence-first) batch.
dataset = Domains(domains)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
x, y = next(iter(dataloader))
x_seq_first = x.permute(1, 0)  # swap the two axes of the batch
print(x_seq_first)
embedding = nn.Embedding(dataset.vocabSize, 64)
emb = embedding(x_seq_first)
emb.shape

tensor([[ 2,  2,  2,  ...,  2,  2,  2],
        [21,  4, 28,  ..., 23,  8, 10],
        [ 9, 26, 20,  ..., 13,  8, 10],
        ...,
        [ 5,  5,  5,  ...,  5,  5,  5],
        [17, 17, 17,  ..., 17, 17, 17],
        [15, 15, 15,  ..., 15, 15, 15]])


torch.Size([31, 64, 64])

In [22]:
from typing import List, Tuple, NamedTuple
class TokenProbs(NamedTuple):
    """A candidate next character paired with the probability the model assigns to it."""
    char: str  # decoded character, or "<END>" for an index with no character mapping
    prob: float  # softmax probability of `char` being the next token
    
class Trainer:
    """Builds, trains and samples from a character-level GRU `Model` for a `Domains` dataset.

    Keyword Args:
        hidden_size: GRU hidden size (default 128).
        num_layers: number of stacked GRU layers (default 1).
        embed_dim: embedding dimension (default 64).
        device: torch device string. Defaults to 'cuda:0' when CUDA is available,
            otherwise 'cpu', so the notebook also runs on CPU-only machines.
    """

    def __init__(self, dataset: Domains, **kwargs):
        self.dataset = dataset
        self.hidden_size = kwargs.get('hidden_size', 128)
        self.num_layers = kwargs.get('num_layers', 1)
        self.embed_dim = kwargs.get('embed_dim', 64)
        default_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        self.device = kwargs.get('device', default_device)
        self.model = Model(dataset.vocabSize, self.embed_dim, self.hidden_size, self.num_layers)

    def _next_token_distribution(self, starter: str) -> torch.Tensor:
        """Return the softmax distribution over the vocabulary for the token after `starter`.

        Runs in eval mode without autograd and restores the model's previous
        train/eval flag afterwards, so it is safe to call in the middle of training.
        """
        chart = self.dataset.charTensor(starter)
        was_training = self.model.training
        # Make sure model and input live on the same device even before train() ran.
        self.model.to(self.device).eval()
        try:
            with torch.no_grad():
                output, _ = self.model(chart.to(self.device), None)
        finally:
            self.model.train(was_training)
        # output[-1] holds the logits for the position following the last input char.
        return F.softmax(torch.squeeze(output[-1, :]), dim=0)

    def predict_next_token(self, starter: str):
        """Sample the index of the next token given the prefix `starter`."""
        dist = Categorical(self._next_token_distribution(starter))
        return dist.sample().item()

    def predict(self, starter: str, max_len: int = 100):
        """Extend `starter` by sampling until the end token or `max_len` new tokens.

        Returns the extended string; "<END>" is appended when the end token was sampled.
        """
        for _ in range(max_len):
            ind = self.predict_next_token(starter)
            # Use this trainer's own dataset, not whatever `dataset` global exists.
            if self.dataset.isEndChar(ind):
                starter += "<END>"
                break
            starter += self.dataset.ind2char[ind]
        return starter

    def token_probs(self, starter: str) -> List[TokenProbs]:
        """Return every vocabulary entry with its probability of following `starter`.

        Probabilities are plain Python floats, matching `TokenProbs.prob`.
        """
        probs = self._next_token_distribution(starter).tolist()
        return [TokenProbs(self.dataset.ind2char.get(idx, "<END>"), prob) for idx, prob in enumerate(probs, 0)]

    def train(self, epochs: int = 100, batch_size: int = 16, lr: float = 0.001):
        """Train the model with Adam and cross-entropy, printing a sample after each epoch.

        Args:
            epochs: number of passes over the dataset.
            batch_size: number of domains per batch.
            lr: Adam learning rate.
        """
        criterion = nn.CrossEntropyLoss(reduction="mean")
        device = self.device
        dataloader = DataLoader(self.dataset, batch_size=batch_size, shuffle=True)
        model = self.model.to(device)
        optimizer = optim.Adam(model.parameters(), lr=lr)
        for epoch in range(epochs):
            model.train()
            for batch, (x, y) in enumerate(dataloader):
                optimizer.zero_grad()
                # Every domain is an independent sequence and batches are shuffled, so
                # each batch starts from a fresh hidden state. Carrying the previous
                # batch's state over leaks unrelated context and fails outright when
                # the last batch is smaller than `batch_size`.
                output, _ = model(x.permute(1, 0).to(device), None)
                # CrossEntropyLoss expects (batch, classes, seq) against (batch, seq).
                loss = criterion(output.permute(1, 2, 0), y.to(device))
                loss.backward()
                optimizer.step()
            print()
            print("----------------------------------------------")
            print({'epoch': epoch, 'batch': batch, 'loss': loss.item()})
            print(self.predict("_a"))
            print("----------------------------------------------")

In [23]:
# Hyperparameters for the character-level GRU.
HIDDEN_SIZE, NLAYERS, EMBEDDING_DIM = 128, 1, 64

trainer_options = {
    "hidden_size": HIDDEN_SIZE,
    "num_layers": NLAYERS,
    "embed_dim": EMBEDDING_DIM,
}
trainer = Trainer(dataset, **trainer_options)
trainer.train()


----------------------------------------------
{'epoch': 0, 'batch': 4095, 'loss': 0.3924955129623413}
_aamestnessbiophysicalohax.com<END>
----------------------------------------------

----------------------------------------------
{'epoch': 1, 'batch': 4095, 'loss': 0.39380553364753723}
_aqcestnessbiophysicalohax.com<END>
----------------------------------------------

----------------------------------------------
{'epoch': 2, 'batch': 4095, 'loss': 0.39644837379455566}
_acdestnessbiophysicalohax.com<END>
----------------------------------------------

----------------------------------------------
{'epoch': 3, 'batch': 4095, 'loss': 0.39595112204551697}
_aesgestnessbiophysicalohax.com<END>
----------------------------------------------

----------------------------------------------
{'epoch': 4, 'batch': 4095, 'loss': 0.3978240191936493}
_akpcestnessbiophysicalohax.com<END>
----------------------------------------------

----------------------------------------------
{'epoch': 5,

In [24]:
# Inspect the model's next-character distribution for a fixed prefix and sample from it.
state = None
# Follow the trainer's configured device instead of hard-coding one, so the input
# always lands on the device the trainer was built for.
device = trainer.device
start = '_a'
chart = trainer.dataset.charTensor(start)
# Pure inference: no autograd graph is needed.
with torch.no_grad():
    output, state = trainer.model(chart.to(device), state)
# The last time step holds the logits for the character following `start`.
output = F.softmax(torch.squeeze(output[-1, :]), dim=0)
print("showing probabilities for start string: ", start)
for idx, prob in enumerate(output, 0):
    print(f"{trainer.dataset.ind2char.get(idx, '<END>')}: {prob:.2%}")
dist = Categorical(output)
index = dist.sample()
ind = index.item()
print("Sampled next letter: ", trainer.dataset.ind2char.get(ind, '<END>'))

showing probabilities for start string:  _a
<END>: 0.00%
.: 0.00%
_: 0.00%
a: 0.38%
b: 0.00%
c: 1.53%
d: 0.00%
e: 62.90%
f: 0.00%
g: 1.61%
h: 0.00%
i: 0.14%
j: 0.00%
k: 0.52%
l: 0.00%
m: 0.33%
n: 0.00%
o: 27.99%
p: 0.00%
q: 0.59%
r: 0.00%
s: 1.81%
t: 0.00%
u: 0.84%
v: 0.00%
w: 0.98%
x: 0.00%
y: 0.36%
z: 0.00%
Sampled next letter:  e


In [14]:
from functools import singledispatchmethod
from typing import Set, Sequence, Self
from collections import deque, defaultdict
from itertools import product
from string import ascii_letters
class RegexAtom:
    """One position of the generated regex: the set of characters allowed there.

    The special token "<END>" is never stored as a character; it only sets the
    end flag, which renders as a trailing ``$``.

    Args:
        chars: allowed characters for this position; may contain "<END>".
        match_all: when True the atom renders as ``.`` regardless of `chars`.
    """

    # Token used for "end of sequence"; tracked as a flag, never as a literal.
    END_TOKEN = "<END>"
    # Characters with special meaning inside a ``[...]`` character class.
    _CLASS_SPECIALS = frozenset("\\]^-[")

    def __init__(self, chars: Sequence[str], match_all=False):
        self._end = self.END_TOKEN in chars
        self._characters: Set[str] = {c for c in chars if c != self.END_TOKEN}
        self.match_all = match_all

    @classmethod
    def all_match(cls, vocab: Sequence[str]):
        """Build an atom over `vocab` that renders as the wildcard ``.``."""
        return cls(vocab, True)

    @property
    def chars(self):
        """Allowed characters as a sorted list (sorted so the order is reproducible)."""
        return sorted(self._characters)

    def has_end(self):
        """True when end-of-sequence is allowed at this position."""
        return self._end

    def __add__(self, other: object):
        """Merge another atom or a single token into this atom, in place.

        Returns:
            `self`, so that ``atom + other`` evaluates to the merged atom.

        Raises:
            NotImplementedError: if `other` is neither a RegexAtom nor a str.
        """
        if isinstance(other, RegexAtom):
            # Union of both character sets; the flags are OR-ed together.
            self._characters.update(other._characters)
            self._end = self._end or other._end
            self.match_all = self.match_all or other.match_all
        elif isinstance(other, str):
            # Keep the constructor's invariant: the end token is a flag, not a char.
            if other == self.END_TOKEN:
                self._end = True
            else:
                self._characters.add(other)
        else:
            raise NotImplementedError("you can only add other Regex-Atoms or strings.")
        return self

    @classmethod
    def _escape(cls, token: str) -> str:
        """Backslash-escape the characters of `token` that are special inside ``[...]``."""
        return ''.join('\\' + ch if ch in cls._CLASS_SPECIALS else ch for ch in token)

    def __str__(self):
        if self.match_all:
            return '.'
        if self._end and not self._characters:
            return "$"
        # Sort for a reproducible pattern; escape so ']' or '-' cannot break the class.
        body = ''.join(self._escape(c) for c in sorted(self._characters))
        return f"[{body}]{'$' if self._end else ''}"

    def __repr__(self):
        return f"<RegexAtom: match_all={self.match_all}, token={str(self)}>"

class RegexBuilder:
    """Derives a regex from a trained `Trainer` by walking one sampled sequence.

    At every position the tokens whose probability exceeds `threshold` form a
    character-class atom; if no token qualifies, the position becomes a wildcard.

    Args:
        trainer: trained trainer providing `token_probs` and `predict_next_token`.
        start: prefix fed to the model for the first position.
        threshold: minimum probability for a token to be listed in an atom.
    """

    def __init__(self, trainer: Trainer, start="_", threshold=0.5):
        self.start = start
        self.trainer = trainer
        self.threshold = threshold
        self.atoms: list[RegexAtom] = []
        self.current_position = 0

    def build_next_position(self, starter):
        """Append the atom for the position that follows the prefix `starter`."""
        probs = self.trainer.token_probs(starter)
        good_tokens = [token.char for token in probs if token.prob > self.threshold]
        if good_tokens:
            self.atoms.append(RegexAtom(good_tokens))
        else:
            # No token is confident enough: accept any character at this position.
            self.atoms.append(RegexAtom.all_match(self.trainer.dataset.chars))

    def build(self, max_length: int = 100):
        """Build the regex string, one atom per sampled position.

        Args:
            max_length: upper bound on the number of positions, so the loop ends
                even if the model never samples the end token.

        Returns:
            The concatenated string form of all atoms.
        """
        # Start from a clean slate so calling build() twice does not duplicate atoms.
        self.atoms = []
        starter = self.start
        dataset = self.trainer.dataset
        for _ in range(max_length):
            self.build_next_position(starter)
            next_token = self.trainer.predict_next_token(starter)
            if dataset.isEndChar(next_token):
                break
            # Use this builder's trainer, not a `trainer` global from the notebook.
            starter += dataset.ind2char.get(next_token, '<END>')
        return ''.join(str(a) for a in self.atoms)


In [21]:
# Build a regex from the trained model using the builder's default start and threshold.
builder = RegexBuilder(trainer)
# NOTE: despite its name, `atoms` holds the joined regex *string* returned by build().
atoms = builder.build()
atoms

'.[s][e][n][e][s][s][b][i][o][p][h][y][s][i][c][a][l][o][h][a][x][.][c][o][m]$'

In [13]:
import re
# Compile the generated pattern and count how many of the domains it matches
# (re.match anchors at the start of each domain).
detect = re.compile(atoms)
hits = [bool(detect.match(candidate)) for candidate in domains]
correct = sum(hits)
total = len(hits)
print(f"detected {correct} out of {total} DGAs")

detected 0 out of 65536 DGAs
