In [1]:
import os
import pickle
import random
import subprocess

import numpy as np

import asm2vec.asm
import asm2vec.model
import asm2vec.parse

In [2]:
# --- Data locations -------------------------------------------------------
# Directory paths end with '/' because some cells build file paths by plain
# string concatenation.
TRAINDATA_DIR = '/bin/'                          # binaries used for training
TESTDATA_TEST_DIR = 'testcase/coreutils_Ori/'    # binaries under test
TESTDATA_CONTROL_DIR = 'testcase/coreutils_O2/'  # binaries to compare against

# --- Evaluation settings --------------------------------------------------
TARGET_FUNCTION = '<main>'  # substring looked for in objdump function headers
RANK_TOP = 100

# x86 jump mnemonics (unconditional and conditional). Operands of these
# instructions are candidates for being rewritten into symbolic labels.
jmp_op = [
    'jmp',
    'ja', 'jae', 'jb', 'jbe', 'jc',
    'jcxz', 'jecxz', 'jrcxz',
    'je', 'jg', 'jge', 'jl', 'jle',
    'jna', 'jnae', 'jnb', 'jnbe', 'jnc', 'jne',
    'jng', 'jnge', 'jnl', 'jnle',
    'jno', 'jnp', 'jns', 'jnz',
    'jo', 'jp', 'jpe', 'jpo', 'js', 'jz',
]

In [3]:
def cosine_similarity(v1, v2):
    """Return the cosine of the angle between vectors `v1` and `v2`.

    Computed as dot(v1, v2) / (|v1| * |v2|).

    If either vector has zero norm the cosine is undefined; 0.0 is returned
    in that case instead of dividing by zero (which would yield `nan` and a
    RuntimeWarning, and `nan` values do not sort meaningfully in a ranking).
    """
    denominator = np.linalg.norm(v1) * np.linalg.norm(v2)
    if denominator == 0:
        return 0.0
    return np.dot(v1, v2) / denominator

In [4]:
##############################################
# check_filetype(str)
# parameter: a binary file with relative or absolute path
# return: boolean
##############################################
def check_filetype(f):
    """Return True if the `file` utility describes `f` as one of the target types.

    Parameters:
        f: relative or absolute path of the file to inspect.

    Returns:
        True when the description printed by `file` contains one of the
        strings in `targets` (currently only 'ELF'), otherwise False.

    Raises:
        FileNotFoundError: if the `file` utility is not installed.
    """
    targets = ['ELF']
    # The path is passed as an argument vector, not through a shell, so spaces
    # and shell metacharacters in file names are harmless.
    # `-b` (brief) keeps the file name out of the output; otherwise any path
    # that merely contains "ELF" would be reported as a match.
    result = subprocess.run(['file', '-b', '--', f], stdout=subprocess.PIPE)
    description = result.stdout.decode(errors='replace')
    return any(target in description for target in targets)

In [5]:
##############################################
# objdump(str)
# parameter: a binary file with relative or absolute path
# return: assembly disassembled by objdump
##############################################
def objdump(f):
    """Disassemble the .text section of `f` with objdump (Intel syntax).

    Parameters:
        f: relative or absolute path of the binary.

    Returns:
        Whatever objdump printed on stdout, as a string. objdump's exit status
        is not checked, so a file it cannot read yields an empty (or partial)
        string rather than an exception.

    Raises:
        FileNotFoundError: if the `objdump` executable is not installed.
    """
    # Run objdump directly (no shell, no scratch file): the path cannot be
    # misparsed or injected into a command line, and no file called `tmp` in
    # the working directory is overwritten and deleted as a side effect.
    result = subprocess.run(
        ['objdump', '-M', 'intel', '-d', '-j', '.text', '--no-show-raw-insn', '--', f],
        stdout=subprocess.PIPE,
    )
    # Symbol names are not guaranteed to be valid text in the default
    # encoding; replace undecodable bytes instead of failing.
    return result.stdout.decode(errors='replace')

In [6]:
def _render_function(instructions):
    """Turn one function's tokenised objdump lines into label-based assembly text.

    `instructions` is a list of token lists as produced by `line.split()`,
    e.g. ['401007:', 'je', '40100e', '<main+0xe>']. The list is modified in
    place: for every jump (mnemonic in `jmp_op`) whose operand is a hex
    address of an instruction inside this function, a `.LABEL<n>:` line is
    inserted in front of the target (reusing an existing label if the target
    already has one) and the jump operand is replaced by that label name.

    Returns the function body as a string: instruction lines are
    tab-separated without their address column, label lines stand alone.
    """
    # NOTE: `instructions` grows while it is being iterated. A jump that gets
    # shifted by an insertion is visited again; by then its operand is a label
    # name, which is not hexadecimal, so it is skipped below.
    for ins in instructions:
        if len(ins) > 2 and ins[1] in jmp_op:
            address = ins[2]
            try:
                int(address, 16)
            except ValueError:
                # Register/memory operand or an already rewritten label.
                continue
            i = 0
            while i < len(instructions):
                # First token is '<address>:'; strip the colon to compare.
                if address == instructions[i][0][:-1]:
                    if '.LABEL' not in instructions[i - 1][0]:
                        label = ['.LABEL{}:'.format(random.randint(10000000, 100000000))]
                        instructions.insert(i, label)
                        i += 1  # the target instruction moved one slot down
                    # The label now sits directly before the target.
                    ins[2] = instructions[i - 1][0][:-1]
                    # Blank any trailing tokens such as '<main+0xe>'.
                    for j in range(3, len(ins)):
                        ins[j] = ''
                i += 1

    rendered = []
    for ins in instructions:
        if '.LABEL' not in ins[0]:
            rendered.append('\t' + '\t'.join(ins[1:]) + '\n')
        else:
            rendered.append(ins[0] + '\n')
    return ''.join(rendered)


##############################################
# parse_objdump(str, str)
# parameter: binary disassembled by objdump, the function name containing the specific string
# return: a list of functions' instructions
##############################################
def parse_objdump(objdump_asm, target=''):
    """Split objdump output into per-function assembly strings.

    Parameters:
        objdump_asm: text printed by objdump.
        target: if non-empty, only functions whose header line contains this
            substring are kept; if empty, every function is kept.

    Returns:
        A list with one string per kept function (see `_render_function` for
        the format). Functions with 5 or fewer lines are discarded.
    """
    funcs = []
    instructions = []
    start = False

    for asm in objdump_asm.split('\n'):
        # Function headers look like '0000000000401000 <main>:'; they are
        # recognised by a leading '0'.
        if len(asm) and asm[0] == '0':
            start = not target or target in asm
            # A new header terminates the previous function.
            if len(instructions) > 5:
                funcs.append(_render_function(instructions))
            instructions = []
        elif start and asm.strip():
            # Whitespace-only lines are skipped: they would produce an empty
            # token list that cannot be rendered.
            instructions.append(asm.split())

    # The last function has no following header to terminate it, so flush it
    # here; otherwise the final function of every dump would be lost.
    if len(instructions) > 5:
        funcs.append(_render_function(instructions))
    return funcs

In [7]:
##############################################
# prepend_function_name(list)
# parameter: a list of instructions in functions
# return: a string of functions with function name, the number of functions
##############################################
def prepend_function_name(funcs):
    """Concatenate function bodies, giving each a synthetic `funcN:` header.

    Parameters:
        funcs: list of function bodies (strings). Exact duplicates are
            dropped, keeping the first occurrence.

    Returns:
        (text, count): the concatenated text in which body i is preceded by
        the line 'func{i}:', and the number of distinct bodies.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # funcN numbering is the same on every run. (A set would order the bodies
    # by string hash, which changes between interpreter sessions.)
    unique_funcs = list(dict.fromkeys(funcs))
    str_funcs = ''.join(
        'func{}:\n'.format(i) + body for i, body in enumerate(unique_funcs)
    )
    return str_funcs, len(unique_funcs)

In [8]:
##############################################
# parse_asm2vec(str, int)
# parameter: path of a file holding functions labelled func0 .. func{N-1},
#            the number N of such functions
# return: the result of asm2vec.parse.parse for that file
##############################################
def parse_asm2vec(f, function_length):
    """Parse file `f` with asm2vec, naming func0 .. func{function_length-1} as the functions."""
    return asm2vec.parse.parse(
        f,
        func_names=['func{}'.format(index) for index in range(function_length)],
    )

In [9]:
# Scratch run of the disassembly pipeline on a single binary: disassemble it,
# keep the functions whose header contains '<main>', and write the labelled
# text to the file `str_funcs`.
asm = objdump('training/echo')
funcs = parse_objdump(asm, '<main>')
str_funcs, l = prepend_function_name(funcs)
# Use a context manager so the file is flushed and closed before anything
# else reads it.
with open('str_funcs', 'w') as fh:
    chars_written = fh.write(str_funcs)
#data = parse_asm2vec('str_funcs', l)
#model = asm2vec.model.Asm2Vec(d=200)
#training_repo = model.make_function_repo(data)
#model.train(training_repo)
chars_written  # displayed as the cell output: number of characters written

8811

In [9]:
##############################################
# one_file(str, str)
# parameter: a binary, target function
# return: asm2vec parsed data
##############################################
def one_file(f, target=''):
    """Disassemble one binary and parse the selected functions with asm2vec.

    Parameters:
        f: path of the binary.
        target: substring selecting functions by their objdump header; an
            empty string keeps every function.

    Returns:
        The value returned by `parse_asm2vec`, or None when `check_filetype`
        rejects `f`.

    Side effects:
        Overwrites the file `str_funcs` in the current working directory.
    """
    if not check_filetype(f):
        return None
    funcs = parse_objdump(objdump(f), target)
    str_funcs, funcs_length = prepend_function_name(funcs)
    # Close the file before asm2vec reads it back, so the content is
    # guaranteed to be flushed to disk.
    with open('str_funcs', 'w') as fh:
        fh.write(str_funcs)
    return parse_asm2vec('str_funcs', funcs_length)

In [10]:
##############################################
# multiple_file(str)
# parameter: directory name, target function
# return: asm2vec parsed data
##############################################
def multiple_file(d, target=''):
    """Disassemble every accepted binary in directory `d` and parse the result with asm2vec.

    Parameters:
        d: directory to scan (not recursive). A trailing '/' is optional.
        target: substring selecting functions by their objdump header; an
            empty string keeps every function.

    Returns:
        The value returned by `parse_asm2vec` for the functions collected
        from all files that pass `check_filetype`.

    Side effects:
        Overwrites the file `str_funcs` in the current working directory.
    """
    funcs = []
    # os.listdir() returns entries in arbitrary order; sort them so the
    # functions are collected in the same order on every run.
    for name in sorted(os.listdir(d)):
        # os.path.join copes with `d` given with or without a trailing slash.
        path = os.path.join(d, name)
        if not check_filetype(path):
            continue
        funcs += parse_objdump(objdump(path), target)
    str_funcs, funcs_length = prepend_function_name(funcs)
    # Close the file before asm2vec reads it back, so the content is
    # guaranteed to be flushed to disk.
    with open('str_funcs', 'w') as fh:
        fh.write(str_funcs)
    return parse_asm2vec('str_funcs', funcs_length)

In [None]:
##############################################
# Training
##############################################
# Collect functions from the binaries in TRAINDATA_DIR. No target is passed,
# so functions are not filtered by name.
train_data = multiple_file(TRAINDATA_DIR)
# NOTE(review): `d=200` is presumably the embedding dimension - confirm
# against the asm2vec documentation.
model = asm2vec.model.Asm2Vec(d=200)
training_repo = model.make_function_repo(train_data)
# NOTE(review): no random seed is set anywhere in this notebook, so results
# are presumably not reproducible between runs - confirm and seed if needed.
model.train(training_repo)

In [None]:
##############################################
# Testing
##############################################
def _embed_target_functions(directory, expect_single=False):
    """Map each binary's file name in `directory` to the vector of its target function.

    For every directory entry, `one_file` extracts the functions matching
    TARGET_FUNCTION; entries for which it returns nothing (non-ELF files, or
    binaries without a matching function) are skipped. The first parsed
    function is converted with `model.to_vec`.

    With `expect_single=True`, an AssertionError is raised when a binary
    yields more than one matching function.
    """
    vectors = {}
    for name in os.listdir(directory):
        # one_file() already calls check_filetype() and returns None for
        # rejected files, so no separate check is needed here.
        data = one_file(os.path.join(directory, name), TARGET_FUNCTION)
        if not data:
            continue
        if expect_single:
            assert len(data) == 1
        vectors[name] = model.to_vec(data[0])
    return vectors


# Binaries under test.
testfile2target = _embed_target_functions(TESTDATA_TEST_DIR)

# Control binaries the test set is compared against. The name is kept because
# later cells refer to it, although this is not training data.
trainfile2target = _embed_target_functions(TESTDATA_CONTROL_DIR, expect_single=True)

In [None]:
RANK_TOP = len(testfile2target)

# rank[test_name][control_name] -> cosine similarity between the two target
# function vectors, for every (test, control) pair.
rank = {
    test_name: {
        control_name: cosine_similarity(test_vec, control_vec)
        for control_name, control_vec in trainfile2target.items()
    }
    for test_name, test_vec in testfile2target.items()
}

In [None]:
# For each test binary, print its name, the 0-based position of the control
# binary with the same file name among all control binaries sorted by
# descending similarity, and that similarity. Nothing is printed for a test
# binary that has no same-named control binary.
for test_name, scores in rank.items():
    by_similarity = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
    for position, (control_name, similarity) in enumerate(by_similarity):
        if control_name == test_name:
            print(test_name, position, similarity)
            break

In [35]:
# NOTE(review): `check` is assigned but not used in any visible cell -
# looks like leftover scratch state.
check = '' 
# Spot check: similarity between the 'head' entry of the control set and the
# 'ls' entry of the test set (two different programs).
cosine_similarity(trainfile2target['head'], testfile2target['ls'])

0.9975475345141777

In [33]:
# Persist the similarity table for later inspection.
# NOTE(review): the tag passed to format() is empty, so the file is written
# as 'log/.pkl' - presumably a run identifier was meant to go here.
os.makedirs('log', exist_ok=True)  # do not fail on a fresh checkout
with open('log/{}.pkl'.format(''), 'wb') as fh:
    pickle.dump(rank, fh)

In [36]:
# Persist the trained model; the file name carries a hard-coded run tag.
os.makedirs('log', exist_ok=True)  # do not fail on a fresh checkout
with open('log/model_{}.pkl'.format('202012170335'), 'wb') as fh:
    pickle.dump(model, fh)