In [1]:
import re
import os
from os import listdir
from os.path import isfile, join
from pathlib import Path
import pandas as pd
from typing import Dict, List, get_type_hints
from styleframe import StyleFrame, Styler
from IPython.display import display

### Load baseline (集合所有 Dofloo 的 object) & Report

In [2]:
# Load the baseline: the destination-node objects collected for the family.
raw_dst_nodes = pd.read_csv('all_dst_node.csv')

# Keep only File / Net nodes and drop the 'malware' and '2' entries.
keep_rows = (
    raw_dst_nodes['type'].isin(['File', 'Net'])
    & ~raw_dst_nodes['text'].isin(['malware', '2'])
)

# .assign() returns a new frame, so the lower-casing never writes into a
# filtered slice of the raw frame (which raises SettingWithCopyWarning).
base_data = raw_dst_nodes.loc[keep_rows].assign(
    type=lambda nodes: nodes['type'].str.lower()
)
base_data

Unnamed: 0,text,type
0,/bin/chmod,file
1,/bin/mv,file
2,/bin/rm,file
3,/bin/sed,file
4,/dev/urandom,file
...,...,...
127,10210,net
128,48080,net
129,50050,net
130,NIC,net


In [3]:
# Directory that holds the per-report sentence CSV files.
report_fir_path = '../../C parse report/sentence csvs/'

# Malware family under evaluation, sample name (left as '???' here) and the
# folder that output files are written to.
family = 'Dofloo'
samplename = '???'
outputfolder = '.'

def get_all_filenames(dir: str='./') -> list:
    '''Return the sorted names of the regular files located directly in `dir`.

    Sub-directories are neither returned nor descended into.
    '''
    return sorted(entry for entry in listdir(dir) if isfile(join(dir, entry)))

# Keep only the reports whose file name starts with the selected family.
dofloo_report_names = [name for name in get_all_filenames(report_fir_path) if name.startswith(family)]
# join() builds a valid path whether or not report_fir_path ends with a separator.
dofloo_report_dfs = [pd.read_csv(join(report_fir_path, name)) for name in dofloo_report_names]
dofloo_report_names

['Dofloo-BleepingComputer.csv',
 'Dofloo-MalwareMustDie.csv',
 'Dofloo-Securityaffairs.csv',
 'Dofloo-SyscallParty.csv',
 'Dofloo-Trendmicro.csv']

### 單篇文章的 Class 和 整個 Family 的 Class

In [15]:
class ReportEvalModel:
    '''Match every baseline object against the sentences of a single report.

    The matching runs once, at construction time. Afterwards:
      * `match_tbl` is a 0/1 table with one row per sentence and one column
        per baseline text;
      * `match_baseline` is the set of baseline texts found anywhere in the
        report.
    '''

    def __init__(self, baseline: Dict[str, list], sentences: pd.DataFrame, reportname:str=''):
        '''
        baseline:   mapping / DataFrame providing parallel 'text' and 'type' columns.
        sentences:  DataFrame with a 'Content' column, one sentence per row.
        reportname: label for this report.
        '''
        self.baseline = baseline
        self.sentences = sentences
        self.reportname = reportname
        cols = list(baseline['text'])
        self.match_tbl = pd.DataFrame([[0] * len(cols) for _ in range(len(sentences))], columns=cols)
        self.match_baseline = set()
        self.match()

    def match(self) -> None:
        '''Search every sentence for every baseline and record hits in
        `self.match_tbl` (cell set to 1) and `self.match_baseline`.'''
        targets = list(zip(self.baseline['text'], self.baseline['type']))
        for idx_sent, sent in enumerate(self.sentences['Content']):
            sentence_text = str(sent)  # non-string cells (e.g. NaN) are searched as their str() form
            for base, base_type in targets:
                if self.find_baseline(sentence_text, base, base_type) != -1:
                    self.match_tbl.loc[idx_sent, base] = 1  # mark as found
                    self.match_baseline.add(base)

    def find_baseline(self, sentence: str, baseline: str, baseline_type: str='file') -> int:
        '''Look for `baseline` inside `sentence`.

        Returns -1 when nothing is found. A hit returns 0 for the pattern-based
        checks (numeric ports, 'sed', 'sh') and the `str.find` position for
        plain substring matches, so callers should only compare against -1.

        * 'net' baseline that parses as an integer: treated as a port. It must
          be preceded by ':' or a space and must not be followed by another
          digit, so port 1020 is not reported for ':10210'.
        * other 'net' baselines (NIC, IP addresses, ip:port): plain substring.
        * 'sed' / 'sh': only the requested word is searched, and it must not be
          adjacent to a lowercase letter ('shell' or 'used' do not count, while
          '/bin/sh' or a trailing 'sh' do).
        * everything else: plain substring.
        '''
        if baseline_type == 'net':
            try:
                port = int(baseline)
            except (TypeError, ValueError):
                # NIC, IP addresses, ...: no special handling for now.
                return sentence.find(baseline)
            port_pattern = r'[: ]' + re.escape(str(port)) + r'(?!\d)'
            return 0 if re.search(port_pattern, sentence) else -1
        if baseline in ('sed', 'sh'):
            word_pattern = r'(?<![a-z])' + re.escape(baseline) + r'(?![a-z])'
            return 0 if re.search(word_pattern, sentence) else -1
        # Ordinary file paths are compared directly.
        return sentence.find(baseline)

    def get_match_sentences(self) -> pd.DataFrame:
        '''Return a copy of the match table with an extra 'match' column that
        holds the number of baselines found in each sentence.'''
        result = self.match_tbl.copy()
        result['match'] = result.sum(axis=1)
        return result

    def get_match_baselines(self) -> set:
        '''Return the set of baseline texts found in this report.'''
        return self.match_baseline

class FamilySet:
    '''A collection of evaluated reports that belong to one malware family.'''

    def __init__(self, familyname:str, baseline:Dict[str, list]):
        '''
        familyname: name of the malware family.
        baseline:   mapping / DataFrame with a 'text' column, a 'type' column
                    (used when reports are added) and, for weighted scoring,
                    a 'weight' column.
        '''
        self.familyname = familyname
        self.baseline = baseline
        self.rem_lst: List[ReportEvalModel] = [] # list of ReportEvalModel under this family
        self.result_tbl = None # filled by show_result()

    def add_rem(self, sentences: pd.DataFrame, reportname: str=''):
        '''Add a report to this FamilySet. Takes the report sentences and the
        report name; the FamilySet baseline is reused for the matching.'''
        rem = ReportEvalModel(self.baseline, sentences, reportname)
        self.rem_lst.append(rem)

    def calc_report_coverage_score(self, match_baselines: list[str], apply_weight=True) -> float:
        '''Compute the coverage score of one report.

        match_baselines: baseline texts found in the report (any iterable).
        apply_weight:    False -> matched count / baseline count;
                         True  -> matched weight / total weight, which needs a
                                  'weight' entry in the baseline (KeyError otherwise).

        Returns 0.0 when the denominator is zero (empty baseline or all-zero
        weights) instead of raising ZeroDivisionError.
        '''
        matched = set(match_baselines)
        if not apply_weight:
            total = len(self.baseline['text'])
            return len(matched) / total if total else 0.0 # unweighted score
        weights = list(self.baseline['weight'])
        denominator = sum(weights)
        if not denominator:
            return 0.0
        numerator = sum(w for b, w in zip(self.baseline['text'], weights) if b in matched)
        return numerator / denominator

    def show_result(self, apply_weight: bool=True):
        '''Print, display and return the result table: one row per report, one
        0/1 column per baseline, plus 'report_name', 'ttl_match' and
        'coverage_score' (formatted as a 4-significant-digit string).
        The table is also stored in `self.result_tbl`.'''
        baseline_texts = list(self.baseline['text'])
        column_names = ['report_name', 'ttl_match', 'coverage_score'] + baseline_texts
        rows = []
        for rem in self.rem_lst:
            match_baselines = rem.get_match_baselines() # set of baselines found in this report
            c_score = self.calc_report_coverage_score(match_baselines, apply_weight=apply_weight)
            flags = [1 if b in match_baselines else 0 for b in baseline_texts]
            rows.append([rem.reportname, len(match_baselines), f'{c_score:.4}'] + flags)
            print(rem.reportname, match_baselines)
        # Build the frame in one go so each column gets its proper dtype;
        # writing strings cell-by-cell into an all-integer frame is a
        # deprecated upcast in recent pandas.
        result_tbl = pd.DataFrame(rows, columns=column_names)
        self.result_tbl = result_tbl
        display(result_tbl)
        return result_tbl

### Run Script: 存檔每篇報告的 matched sentence

In [16]:
# Build the family-level set and register every loaded report under its file name.
fset = FamilySet(family, base_data)
for report_name, report_df in zip(dofloo_report_names, dofloo_report_dfs):
    fset.add_rem(report_df, report_name)

# Unweighted coverage: matched baselines / all baselines.
result_tbl = fset.show_result(apply_weight=False)
# Uncomment to save the family table as CSV:
# result_tbl.to_csv(f'{outputfolder}/{family}_{samplename[:3]}_FamilySet.csv', index=False)

Dofloo-BleepingComputer.csv set()
Dofloo-MalwareMustDie.csv {'/etc', '/etc/.', 'nul'}
Dofloo-Securityaffairs.csv {'/etc', '/etc/rc.d/rc.local', '48080', '/etc/rc.local'}
Dofloo-SyscallParty.csv {'/etc', 'sysinfo', 'uname', '/etc/rc.local', '/etc/init.d/boot.local', '/proc/net/dev', '/proc/self/exe', '/proc/cpuinfo'}
Dofloo-Trendmicro.csv {'/etc', '/etc/resolv.conf', '/etc/rc.local', '/etc/init.d/boot.local', '/etc/rc.d/rc.local', '/etc/nsswitch.conf', '/etc/host.conf', '/proc/net/dev', '/proc/self/exe', '/proc/stat', '/proc/cpuinfo'}


Unnamed: 0,report_name,ttl_match,coverage_score,/bin/chmod,/bin/mv,/bin/rm,/bin/sed,/dev/urandom,/etc,/etc/.,...,42.194.9.81:48080,49.235.91.21:50050,8.8.8.8:53,1020,53,10210,48080,50050,NIC,NO_SOCKET
0,Dofloo-BleepingComputer.csv,0,0.0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,Dofloo-MalwareMustDie.csv,3,0.02326,0,0,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
2,Dofloo-Securityaffairs.csv,4,0.03101,0,0,0,0,0,1,0,...,0,0,0,0,0,0,1,0,0,0
3,Dofloo-SyscallParty.csv,8,0.06202,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
4,Dofloo-Trendmicro.csv,11,0.08527,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0


In [17]:
def save_match_sent_to_excel(fset: "FamilySet", output_path: str = None):
    '''Write every (report, baseline, sentence) match of `fset` to a CSV file.

    One output row is produced per baseline found in a sentence, so a sentence
    containing several baselines appears on several rows. Columns are
    'report', 'baseline', 'sentence number' (1-based) and 'sentence'.

    fset:        family set whose reports have already been matched.
    output_path: destination file; defaults to
                 '{outputfolder}/{family}_report_match_sent.csv', built from
                 the notebook-level settings.

    Nothing is written when no report contains any baseline. Returns None.
    '''
    columns = ['report', 'baseline', 'sentence number', 'sentence']
    if output_path is None:
        output_path = f'{outputfolder}/{family}_report_match_sent.csv'

    if not any(rem.get_match_baselines() for rem in fset.rem_lst):
        print('no matches in CTI reports') # nothing to record, so the file is not created at all
        return

    records = []
    # One rem (ReportEvalModel) stands for one report.
    for rem in fset.rem_lst:
        if not rem.get_match_baselines(): # report without any baseline: skip it
            continue
        report_label = rem.reportname.split('.')[0]
        match_tbl = rem.get_match_sentences()
        contents = rem.sentences['Content'] # every sentence of the report

        # Rows of match_tbl are in sentence order, so the sentence text is
        # fetched by position; this stays correct even when the sentences
        # frame does not carry a default 0..n-1 index.
        for pos, (_, row) in enumerate(match_tbl.iterrows()):
            if row['match'] == 0:
                continue
            hits = row.drop('match')
            hits = hits[hits > 0]
            sentence_text = str(contents.iloc[pos]).strip()
            for baseline_text in hits.index:
                records.append({
                    'report': report_label,
                    'baseline': baseline_text,
                    'sentence number': pos + 1,
                    'sentence': sentence_text,
                })

    pd.DataFrame(records, columns=columns).to_csv(output_path, index=False)
    return

# Running this line performs the export and overwrites the existing output file
# (the function above writes it with DataFrame.to_csv). The row spacing/format
# has to be adjusted by hand afterwards, so use with care.
save_match_sent_to_excel(fset)