### Construct and prune the function call graph (FCG)

In [None]:
!pip install androguard
!pip install loguru

In [None]:
from loguru import logger

In [None]:
# Turn off loguru output for messages originating from the "androguard" package.
logger.disable("androguard")

In [None]:
from androguard.core.bytecode import FormatClassToJava
from androguard.misc import AnalyzeAPK
from androguard.core.analysis.analysis import MethodAnalysis, ExternalMethod
import networkx as nx

In [None]:
def find_ancestor(node: "MethodAnalysis", family: list):
    """Append every transitive caller of ``node`` to ``family``, in place.

    Callers are discovered through ``get_xref_from()`` and appended in
    depth-first pre-order. Anything already present in ``family`` is treated
    as visited, so cycles terminate and nothing is appended twice.

    The walk is iterative (explicit stack of iterators) so that long caller
    chains cannot exhaust Python's recursion limit.

    Args:
        node: Object exposing ``get_xref_from()``, which yields 3-tuples whose
            middle element is the calling method.
        family: List extended in place with newly found ancestors. Its
            elements must be hashable, because a set mirrors it for O(1)
            membership tests.

    Returns:
        None. ``family`` is mutated.
    """
    seen = set(family)
    # Each stack entry is the partially consumed caller iterator of one node.
    pending = [iter(node.get_xref_from())]
    while pending:
        for _, parent, _ in pending[-1]:
            if parent in seen:
                continue
            seen.add(parent)
            family.append(parent)
            # Descend into the new ancestor before finishing the current node.
            pending.append(iter(parent.get_xref_from()))
            break
        else:
            # Current node has no unvisited callers left.
            pending.pop()

In [None]:
def cg(apk):
    """Build an APK's call graph and prune it to permission-related methods.

    Steps:
      1. Analyse the APK and use every declared activity, provider, service
         and receiver class as an entry point for the call graph.
      2. Seed a node list with the methods reported by ``get_permissions``
         for the effective target SDK, adding all of their transitive callers
         via ``find_ancestor``.
      3. Add the direct callees of every node gathered in step 2.
      4. Remove every call-graph node that is not in that list.

    Args:
        apk: Path of the APK file, passed straight to ``AnalyzeAPK``.

    Returns:
        The pruned call graph returned by ``get_call_graph``.
    """
    apk_obj, _, analysis = AnalyzeAPK(apk)

    component_names = (
        apk_obj.get_activities()
        + apk_obj.get_providers()
        + apk_obj.get_services()
        + apk_obj.get_receivers()
    )
    entry_points = [FormatClassToJava(name) for name in component_names]

    callgraph = analysis.get_call_graph(no_isolated=True, entry_points=entry_points)

    target_sdk = apk_obj.get_effective_target_sdk_version()
    important_nodes = []
    for api_method, _ in analysis.get_permissions(target_sdk):
        if api_method in important_nodes:
            continue
        important_nodes.append(api_method)
        find_ancestor(api_method, important_nodes)

    # Iterate over a snapshot: only callees of the nodes collected so far are
    # added, not the callees of those callees.
    for known in list(important_nodes):
        for _, callee, _ in known.get_xref_to():
            if callee not in important_nodes:
                important_nodes.append(callee)

    # Graph nodes are the objects returned by get_method(), not the analysis wrappers.
    keep = {analysis_node.get_method() for analysis_node in important_nodes}
    callgraph.remove_nodes_from(set(callgraph.nodes) - keep)

    return callgraph

### Enhance the FCG: assign a code vector to each node

In [None]:
!git clone https://github.com/bdqnghi/infercode.git

In [None]:
%cd infercode
!pip install .
%cd ..

In [None]:
!unzip /root/.tree-sitter/Linux.zip -d /root/.tree-sitter/

In [None]:
import os

# "0" exposes the first CUDA device to this process; "-1" would hide all GPUs.
# TensorFlow itself is kept off the GPU further down.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Set before importing TensorFlow so its native logging is quiet from the start.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import warnings

warnings.filterwarnings("ignore")
import tensorflow as tf
import logging

logging.getLogger("tensorflow").disabled = True
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

try:
    # Hide every GPU from TensorFlow.
    tf.config.set_visible_devices([], "GPU")
    # Named _tf_device (not `device`) so the torch `device` global used by the
    # training cells is not overwritten if this cell is re-run.
    for _tf_device in tf.config.get_visible_devices():
        assert _tf_device.device_type != "GPU"
except (ValueError, RuntimeError):
    # ValueError: invalid device. RuntimeError: virtual devices cannot be
    # modified once the runtime is initialized. Either way, keep TensorFlow's
    # current device configuration.
    pass

from infercode.client.infercode_client import InferCodeClient

# Shared InferCode encoder for Java source; used by enhance_fcg.
infercode = InferCodeClient(language="java")
infercode.init_from_config()

In [None]:
def enhance_fcg(fcg: nx.DiGraph):
    """Attach an InferCode embedding to every node and strip unused attributes.

    ``ExternalMethod`` nodes are embedded from ``get_name()``; all other nodes
    from ``get_source()``. The flattened embedding is stored as the
    ``code_vector`` node attribute. The ``external`` attribute is converted to
    ``int`` and the ``entrypoint``, ``methodname``, ``descriptor``,
    ``accessflags`` and ``classname`` attributes are deleted.

    Args:
        fcg: Call graph whose nodes carry the attributes listed above.

    Returns:
        The same graph object, modified in place.

    Raises:
        KeyError: If a node lacks one of the attributes that are read or deleted.
    """

    def _embed(method):
        if isinstance(method, ExternalMethod):
            text = method.get_name()
        else:
            text = method.get_source()
        return infercode.encode([text]).reshape(-1)

    vectors = {method: _embed(method) for method in fcg.nodes}
    nx.set_node_attributes(G=fcg, values=vectors, name="code_vector")

    discarded = ("entrypoint", "methodname", "descriptor", "accessflags", "classname")
    for method in fcg.nodes:
        attrs = fcg.nodes[method]
        attrs["external"] = int(attrs["external"])
        for key in discarded:
            del attrs[key]

    return fcg

### Networkx to PyTorch Geometric

In [None]:
!pip install torch-geometric

In [None]:
import torch
import torch_geometric

In [None]:
def nx_to_pyg(fcg: nx.DiGraph, label: int):
    """Convert an enhanced call graph into a labelled PyTorch Geometric graph.

    Nodes are relabelled to consecutive integers, and the ``code_vector`` and
    ``external`` node attributes are grouped into the node feature matrix by
    ``from_networkx``.

    Args:
        fcg: Graph whose nodes carry ``code_vector`` and ``external``.
        label: Integer class label stored as ``y``.

    Returns:
        The converted graph with ``y`` set to a one-element tensor.
    """
    relabelled = nx.convert_node_labels_to_integers(fcg)
    pyg_graph = torch_geometric.utils.from_networkx(
        G=relabelled,
        group_node_attrs=["code_vector", "external"],
    )
    pyg_graph.y = torch.tensor([label])
    return pyg_graph

### Create the dataset of graphs for the GIN model

In [None]:
import os
import pathlib
from pathlib import Path

In [None]:
# Directories whose entries are passed to create_dataset, one per class.
ben_dir = "/content/benign"
mal_dir = "/content/malware5"

In [None]:
print(ben_dir)

In [None]:
def create_dataset(data_dir: str, label: str):
    """Turn every APK in ``data_dir`` into a labelled PyTorch Geometric graph.

    Each directory entry goes through ``cg`` -> ``enhance_fcg`` ->
    ``nx_to_pyg``. An entry that fails at any stage is skipped with a logged
    warning, so one bad APK does not abort the run. Only ``Exception`` is
    caught, which leaves ``KeyboardInterrupt`` free to stop a long run.

    Entries are processed in sorted order so the resulting dataset order is
    reproducible between runs.

    Args:
        data_dir: Directory containing the APK files.
        label: ``"Benign"`` maps to class 0; any other value maps to class 1.

    Returns:
        List of graphs, one per successfully processed entry.
    """
    y = 0 if label == "Benign" else 1
    dataset = []
    for apk in sorted(Path(data_dir).iterdir()):
        try:
            graph = nx_to_pyg(enhance_fcg(cg(str(apk))), y)
        except Exception as exc:
            logger.warning("Skipping {}: {!r}", apk, exc)
            continue
        dataset.append(graph)

    return dataset

In [None]:
# Build one graph list per class; "Benign" becomes label 0 and "Malware" label 1.
ben_dataset = create_dataset(ben_dir, "Benign")
mal_dataset = create_dataset(mal_dir, "Malware")

In [None]:
len(ben_dataset)

In [None]:
import pickle

In [None]:
# Interleave the two classes (benign, malware, benign, malware, ...).
# zip() stops at the shorter list, so surplus samples of the larger class are
# not included in `dataset`.
dataset = [sample for sub in zip(ben_dataset, mal_dataset) for sample in sub]

In [None]:
len(dataset)

### Load Dataset

In [None]:
# Positional 80% / 10% / 10% split; the order of `dataset` is kept (no shuffling).
_n_samples = len(dataset)
_train_end = int(_n_samples * 0.8)
_val_end = int(_n_samples * 0.9)
train_dataset = dataset[:_train_end]
val_dataset = dataset[_train_end:_val_end]
test_dataset = dataset[_val_end:]

In [None]:
len(train_dataset), len(test_dataset), len(val_dataset)

In [None]:
from torch_geometric.loader import DataLoader

In [None]:
# One loader per split, all with batch size 64 and a fixed (unshuffled) order.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=64, shuffle=False)
    for split in (train_dataset, val_dataset, test_dataset)
)

### GIN model

In [None]:
from torch.nn import Linear, Sequential, BatchNorm1d, ReLU, Dropout
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GINConv
from torch_geometric.nn import global_mean_pool, global_add_pool

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Width of the node feature matrix, read from the first graph; the GIN class
# uses it as the input size of its first layer.
num_node_features = dataset[0].x.shape[1]
# Size of the classifier output; create_dataset produces labels 0 and 1.
num_classes = 2

In [None]:
num_node_features

In [None]:
class GIN(torch.nn.Module):
    """Five-layer GIN graph classifier.

    Each ``GINConv`` wraps an MLP (Linear -> BatchNorm1d -> ReLU -> Linear ->
    ReLU). The node embeddings of every layer are sum-pooled per graph and
    concatenated into one graph embedding of width ``dim_h * 5``, which a
    two-layer classifier maps to ``num_classes`` log-probabilities.

    Reads the module-level ``num_node_features`` and ``num_classes``.

    Args:
        dim_h: Hidden width of every GIN layer.
    """

    def __init__(self, dim_h):
        super().__init__()
        self.conv1 = GINConv(self._mlp(num_node_features, dim_h))
        self.conv2 = GINConv(self._mlp(dim_h, dim_h))
        self.conv3 = GINConv(self._mlp(dim_h, dim_h))
        self.conv4 = GINConv(self._mlp(dim_h, dim_h))
        self.conv5 = GINConv(self._mlp(dim_h, dim_h))
        self.lin1 = Linear(dim_h*5, dim_h*3)
        self.lin2 = Linear(dim_h*3, num_classes)

    @staticmethod
    def _mlp(dim_in, dim_out):
        """Return the MLP wrapped by one GINConv layer."""
        return Sequential(Linear(dim_in, dim_out),
                          BatchNorm1d(dim_out), ReLU(),
                          Linear(dim_out, dim_out), ReLU())

    def forward(self, x, edge_index, batch):
        """Return ``(graph_embedding, log_probabilities)`` for a batch of graphs.

        Args:
            x: Node feature matrix.
            edge_index: Edge list passed to every GINConv layer.
            batch: Node-to-graph assignment passed to ``global_add_pool``.
        """
        # Node embeddings: every layer feeds the next one. conv4 and conv5
        # are applied here; re-applying conv3 would leave them unused.
        h1 = self.conv1(x, edge_index)
        h2 = self.conv2(h1, edge_index)
        h3 = self.conv3(h2, edge_index)
        h4 = self.conv4(h3, edge_index)
        h5 = self.conv5(h4, edge_index)

        # Graph-level readout
        h1 = global_add_pool(h1, batch)
        h2 = global_add_pool(h2, batch)
        h3 = global_add_pool(h3, batch)
        h4 = global_add_pool(h4, batch)
        h5 = global_add_pool(h5, batch)

        # Concatenate graph embeddings
        hG = torch.cat((h1, h2, h3, h4, h5), dim=1)

        # Classifier
        h = self.lin1(hG)
        h = h.relu()
        h = F.dropout(h, p=0.5, training=self.training)
        h = self.lin2(h)

        return hG, F.log_softmax(h, dim=1)

In [None]:
gin = GIN(dim_h=32)

In [None]:
gin.to(device)

In [None]:
def train(model, loader):
    """Train ``model`` for 51 epochs, then report test metrics.

    Uses Adam (lr 0.01, weight decay 0.01) and cross-entropy loss. After each
    epoch the model is evaluated on the module-level ``val_loader``; metrics
    are printed every 10 epochs. After the last epoch it is evaluated on the
    module-level ``test_loader``.

    Args:
        model: Module called as ``model(x, edge_index, batch)`` and returning
            ``(embedding, output)``.
        loader: Iterable of training batches exposing ``x``, ``edge_index``,
            ``batch``, ``y`` and ``to(device)``.

    Returns:
        The trained model. It is left in eval mode by the final ``test`` call.
    """
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(),
                                 lr=0.01,
                                 weight_decay=0.01)
    epochs = 50

    for epoch in range(epochs+1):
        # test() switches the model to eval mode, so training mode has to be
        # re-enabled at the start of every epoch.
        model.train()
        total_loss = 0
        acc = 0

        # Train on batches
        for data in loader:
            optimizer.zero_grad()
            data = data.to(device)
            _, out = model(data.x, data.edge_index, data.batch)
            loss = criterion(out, data.y)
            loss.backward()
            optimizer.step()
            # .item() keeps a plain float instead of retaining the autograd graph.
            total_loss += loss.item() / len(loader)
            acc += accuracy(out.argmax(dim=1), data.y) / len(loader)

        # Validation, once per epoch and only after all batches were trained on.
        val_loss, val_acc = test(model, val_loader)

        # Print metrics every 10 epochs
        if epoch % 10 == 0:
            print(f'Epoch {epoch:>3} | Train Loss: {total_loss:.2f} '
                  f'| Train Acc: {acc*100:>5.2f}% '
                  f'| Val Loss: {val_loss:.2f} '
                  f'| Val Acc: {val_acc*100:.2f}%')

    test_loss, test_acc = test(model, test_loader)
    print(f'Test Loss: {test_loss:.2f} | Test Acc: {test_acc*100:.2f}%')

    return model

@torch.no_grad()
def test(model, loader):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Puts the model in eval mode (and leaves it there). Loss and accuracy are
    the unweighted means of the per-batch values.

    Args:
        model: Module called as ``model(x, edge_index, batch)`` and returning
            ``(embedding, output)``.
        loader: Iterable of batches that also supports ``len()``.

    Returns:
        Tuple ``(loss, acc)``; both stay ``0`` when the loader is empty.
    """
    loss_fn = torch.nn.CrossEntropyLoss()
    model.eval()

    n_batches = len(loader)
    mean_loss = 0
    mean_acc = 0
    for batch in loader:
        batch = batch.to(device)
        _, output = model(batch.x, batch.edge_index, batch.batch)
        mean_loss = mean_loss + loss_fn(output, batch.y) / n_batches
        mean_acc = mean_acc + accuracy(output.argmax(dim=1), batch.y) / n_batches

    return mean_loss, mean_acc

def accuracy(pred_y, y):
    """Return the fraction of entries in ``pred_y`` equal to ``y``, as a Python float."""
    n_correct = (pred_y == y).sum()
    fraction = n_correct / len(y)
    return fraction.item()



In [None]:
gin = train(gin, train_loader)

### Graph embedding vector to CSV for RF

In [None]:
import pandas as pd
import numpy as np

In [None]:
def create_df(dataset):
    """Export GIN graph embeddings of correctly classified graphs as a DataFrame.

    Every graph is run through the module-level ``gin`` model in eval mode and
    without gradient tracking. Inputs are moved to the model's device and
    results are brought back to the CPU before conversion to NumPy, so this
    works whether the model lives on CPU or GPU.

    Only graphs whose predicted class equals their label are kept. Each row is
    the flattened graph embedding followed by the label as the last column.

    NOTE(review): misclassified graphs are dropped here, so anything trained
    on this frame never sees the samples the GIN gets wrong. Confirm this is
    intended.

    Args:
        dataset: Iterable of single graphs exposing ``x``, ``edge_index``,
            ``batch`` and ``y``.

    Returns:
        ``pandas.DataFrame`` with one row per retained graph.
    """
    model_device = next(gin.parameters()).device
    gin.eval()

    rows = []
    with torch.no_grad():
        for data in dataset:
            batch = data.batch
            if batch is not None:
                batch = batch.to(model_device)
            gvector, out = gin(
                data.x.to(model_device), data.edge_index.to(model_device), batch
            )
            label = data.y.to(model_device)
            if out.argmax(dim=1) == label:
                rows.append(
                    np.append(gvector.cpu().numpy().reshape(-1), label.cpu().numpy())
                )

    return pd.DataFrame(np.array(rows))

In [None]:
df = create_df(dataset)

In [None]:
# Keep only columns that take more than one distinct value. Column labels are
# not renumbered, and the label column is removed too if every row has the
# same class.
df = df.loc[:, (df.nunique() > 1)]

In [None]:
df.to_csv("datatest02.csv", index=False)

In [None]:
df

### Random Forest

In [None]:
import pandas as pd
import numpy as np

# Modelling
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    precision_score,
    recall_score,
    ConfusionMatrixDisplay,
)
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from scipy.stats import randint

In [None]:
df = pd.read_csv("datatest02.csv")

In [None]:
df

In [None]:
# create_df writes the class label as the last column. Select it by position:
# its name depends on the embedding width (dim_h * 5), so a hard-coded name
# such as "96" silently picks a feature column once the model size changes.
label_col = df.columns[-1]
X = df.drop(columns=label_col)
y = df[label_col]

In [None]:
# Stratified 80/20 split; no random_state is given, so the split differs between runs.
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.2)

In [None]:
rf = RandomForestClassifier()
rf.fit(X_train, y_train)

In [None]:
y_pred = rf.predict(X_test)

In [None]:
# Stored under its own name: binding the score to `accuracy` would overwrite
# the accuracy() helper that train() and test() call.
rf_accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", rf_accuracy)