## Pre-processing

In [53]:
def read_data(path):
    """Return the lines of the text file at ``path`` without line endings.

    The whole file is read at once and split with ``str.splitlines``.
    """
    with open(path, "r") as handle:
        contents = handle.read()
    return contents.splitlines()

def fixed_length_chunks(string, length):
    """Split ``string`` into consecutive chunks of ``length`` items.

    When ``len(string)`` is not a multiple of ``length``, the last chunk is
    replaced by the final ``length`` items of ``string`` so that it has the
    same size as the others (it then overlaps the penultimate chunk). A
    non-empty input shorter than ``length`` cannot be extended and is returned
    as a single, shorter chunk.

    Args:
        string: the sequence to split (any sliceable sequence works).
        length: chunk size; must be a positive integer.

    Returns:
        The list of chunks in order; an empty list for an empty input.

    Raises:
        ValueError: if ``length`` is not positive.
    """
    if length <= 0:
        raise ValueError(f"length must be positive, got {length}")
    # An empty input has no last chunk to fix up; return early instead of
    # indexing into an empty list.
    if len(string) == 0:
        return []
    chunks = [string[start:start + length] for start in range(0, len(string), length)]
    # Every chunk but the last is full, so the last `length` items of the
    # input equal "tail of the penultimate chunk + short last chunk".
    if len(chunks) > 1 and len(chunks[-1]) != length:
        chunks[-1] = string[-length:]
    return chunks

In [54]:
# Methods for training data

def remove_duplicates(chunks):
    """Return the chunks with repeats dropped, keeping first-occurrence order.

    Use only for training data.
    """
    # dict keys are unique and keep insertion order, so the first occurrence
    # of every chunk survives in its original position.
    return list(dict.fromkeys(chunks))

def process_train_sequences(sequences, chunk_length):
    """Chunk every training sequence and return the distinct chunks.

    Each sequence is split with ``fixed_length_chunks``; the chunks of all
    sequences are pooled in order and passed through ``remove_duplicates``.
    """
    pooled = []
    for sequence in sequences:
        pooled.extend(fixed_length_chunks(sequence, chunk_length))
    return remove_duplicates(pooled)

def write_train_data(chunks, out_dir):
    """Write ``chunks`` to the file at ``out_dir``, one chunk per line.

    Despite its name, ``out_dir`` is the path of the output file itself.
    Chunks are separated by newlines with none after the last chunk; the file
    is opened in write mode, so existing content is replaced.
    """
    with open(out_dir, "w") as sink:
        text = "\n".join(chunks)
        sink.write(text)

In [55]:
# Methods for test data

def read_test_and_labels(path_no_ext):
    """Read ``<path_no_ext>.test`` and ``<path_no_ext>.labels``.

    Returns:
        A ``(test_lines, label_lines)`` tuple, each a list of lines as
        returned by ``read_data``.
    """
    test_lines = read_data(f"{path_no_ext}.test")
    label_lines = read_data(f"{path_no_ext}.labels")
    return test_lines, label_lines

def process_test_sequences(labeled_sequences, chunk_length):
    """Chunk labeled test sequences, giving every chunk its sequence's label.

    Args:
        labeled_sequences: iterable of ``(sequence, label)`` pairs.
        chunk_length: chunk size passed to ``fixed_length_chunks``.

    Returns:
        A flat list of ``(chunk, label)`` tuples in input order.
    """
    return [
        (piece, label)
        for seq, label in labeled_sequences
        for piece in fixed_length_chunks(seq, chunk_length)
    ]

def write_test_data(labeled_chunks, out_dir_no_ext):
    """Write labeled chunks to ``<out_dir_no_ext>.test`` and ``.labels``.

    Line ``k`` of the ``.labels`` file is the label of line ``k`` of the
    ``.test`` file. Lines are newline-separated with no newline after the last
    one, and existing files are overwritten.

    Args:
        labeled_chunks: iterable of ``(chunk, label)`` string pairs. It is
            consumed in a single pass, so one-shot iterables (generators,
            ``zip`` objects) are handled correctly.
        out_dir_no_ext: output path without extension (despite the name, this
            is a file path prefix, not a directory).
    """
    chunks = []
    lbls = []
    # Single pass: iterating twice would leave the labels empty whenever the
    # input is an iterator that is exhausted by the first pass.
    for chunk, label in labeled_chunks:
        chunks.append(chunk)
        lbls.append(label)
    test_path = out_dir_no_ext + ".test"
    labels_path = out_dir_no_ext + ".labels"
    with open(test_path, "w") as test, open(labels_path, "w") as labels:
        test.write('\n'.join(chunks))
        labels.write('\n'.join(lbls))

In [56]:
# Constants
# Path prefixes (with trailing slash) of the snd-cert and snd-unm input data.
CERT = "negative-selection/syscalls/snd-cert/"
UNM = "negative-selection/syscalls/snd-unm/"
# Path prefix under which the chunked train/test/label files are written.
OUT = "syscalls-classification/"
# Chunk size passed to the preprocess_* functions.
CHUNK_LENGTH = 10

In [57]:
def preprocess_train_data(chunk_length):
    """Chunk the snd-cert and snd-unm training sets and write them under OUT.

    For each dataset, ``<name>.train`` is read from its input directory, split
    into de-duplicated chunks of ``chunk_length`` and written to
    ``OUT/<name>/<name>.train``.
    """
    for source_dir, name in ((CERT, "snd-cert"), (UNM, "snd-unm")):
        sequences = read_data(f"{source_dir}{name}.train")
        unique_chunks = process_train_sequences(sequences, chunk_length)
        write_train_data(unique_chunks, f"{OUT}{name}/{name}.train")
    
def preprocess_test_data(chunk_length):
    """Chunk test sets 1-3 of snd-cert and snd-unm and write them under OUT.

    For every test set index and dataset, the ``.test``/``.labels`` pair is
    read, each sequence is split into chunks of ``chunk_length`` that carry
    the sequence's label, and the result is written to
    ``OUT/<name>/<name>.<i>.test`` and ``.labels``.
    """
    for i in range(1, 4):
        for source_dir, name in ((CERT, "snd-cert"), (UNM, "snd-unm")):
            sequences, labels = read_test_and_labels(f"{source_dir}{name}.{i}")
            labeled = process_test_sequences(zip(sequences, labels), chunk_length)
            write_test_data(labeled, f"{OUT}{name}/{name}.{i}")

In [58]:
# Write the de-duplicated training chunks of both datasets under OUT.
preprocess_train_data(CHUNK_LENGTH)

In [59]:
# Write the chunked test sets 1-3 and their labels for both datasets under OUT.
preprocess_test_data(CHUNK_LENGTH)

## Classification

In [63]:
# basic example
cert_data_path = OUT + "snd-cert/"
jar = f"-jar negative-selection/negsel2.jar"
alpha = f"-alphabet file://{CERT}snd-cert.alpha"
self = f"-self {cert_data_path}snd-cert.train"
params = "-n 10 -r 4 -c -l"
test = f"{cert_data_path}snd-cert.1.test"

output = !java {jar} {alpha} {self} {params} < {test}

In [79]:
# Some output lines are 'nan'; parse every line as a float and average with
# nanmean so those entries are ignored.
counts = list(map(float, output))
np.nanmean(counts)

2.0716324717054313