In [1]:
%%time
import pandas as pd
import seaborn as sns
import os
import hashlib
import math
import csv
import itertools
import pandas_bokeh
from collections import Counter
import matplotlib.pyplot as plt

pandas_bokeh.output_notebook()
import jupyter_black

jupyter_black.load()

CPU times: user 5.71 s, sys: 1.89 s, total: 7.6 s
Wall time: 3.29 s


<h1>Ghidra 11.3 and PyGhidra</h1>

In [2]:
!wget https://github.com/therealsaumil/emux/raw/master/files/emux/TRI227WF/rootfs.tar.bz2
!wget https://github.com/therealsaumil/emux/raw/master/files/emux/AC15/squashfs-root.tar.bz2

--2025-07-07 11:26:40--  https://github.com/therealsaumil/emux/raw/master/files/emux/TRI227WF/rootfs.tar.bz2
Loaded CA certificate '/etc/ssl/certs/ca-certificates.crt'
Resolving github.com (github.com)... 20.26.156.215
Connecting to github.com (github.com)|20.26.156.215|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/therealsaumil/emux/master/files/emux/TRI227WF/rootfs.tar.bz2 [following]
--2025-07-07 11:26:40--  https://raw.githubusercontent.com/therealsaumil/emux/master/files/emux/TRI227WF/rootfs.tar.bz2
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6018759 (5.7M) [application/octet-stream]
Saving to: ‘rootfs.tar.bz2’


2025-07-07 11:26:40 (23.7 MB/s) - ‘rootfs.tar.bz2’ saved [6018759/60187

In [3]:
!bzip2 -d ./rootfs.tar.bz2
!bzip2 -d ./squashfs-root.tar.bz2
!tar -xvf ./rootfs.tar
!tar -xvf ./squashfs-root.tar

rootfs/
rootfs/lib/
rootfs/lib/libnsl.so.0
rootfs/lib/libthread_db-0.9.30.2.so
rootfs/lib/libupnp.so.2.0.3
rootfs/lib/libutil.so.0
rootfs/lib/ld-linux.so.3
rootfs/lib/libupnp.so
rootfs/lib/libixml.so
rootfs/lib/libpthread.so.0
rootfs/lib/librt-0.9.30.2.so
rootfs/lib/libthreadutil.so.2
rootfs/lib/ld-uClibc.so.0
rootfs/lib/libgcc_s.so
rootfs/lib/libm.so.0
rootfs/lib/libgcc_s.so.1
rootfs/lib/libcrypt.so.0
rootfs/lib/libpthread-0.9.30.2.so
rootfs/lib/libresolv.so.0
rootfs/lib/libcrypt-0.9.30.2.so
rootfs/lib/libdl-0.9.30.2.so
rootfs/lib/libiw.so.29
rootfs/lib/libixml.so.2
rootfs/lib/libdl.so.0
rootfs/lib/libupnp.so.2
rootfs/lib/libutil-0.9.30.2.so
rootfs/lib/ld-uClibc-0.9.30.2.so
rootfs/lib/librt.so.0
rootfs/lib/libc.so.0
rootfs/lib/libresolv-0.9.30.2.so
rootfs/lib/libthreadutil.so
rootfs/lib/libthread_db.so.1
rootfs/lib/libnsl-0.9.30.2.so
rootfs/lib/libm-0.9.30.2.so
rootfs/lib/libixml.so.2.0.3
rootfs/lib/libthreadutil.so.2.1.0
rootfs/lib/libuClibc-0.9.30.2.so
rootfs/lib/modules/
rootfs/lib

In [4]:
!mkdir ./AC15
!cp ./squashfs-root/bin/httpd ./AC15/AC15_httpd
!cp ./rootfs/usr/bin/webs ./AC15/TRI227WF_webs

In [2]:
import pyghidra
import os
import csv
from operator import itemgetter

pyghidra.start()

import ghidra
from ghidra.app.util.headless import HeadlessAnalyzer
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.base.project import GhidraProject
from java.lang import String
from ghidra.program.util import DefinedDataIterator, CyclomaticComplexity
from ghidra.program.model.listing import Function
from ghidra.program.model.symbol import SourceType
from ghidra.util.exception import CancelledException

# Define dangerous functions
dangerous_functions = ["system", "execve", "execle", "execvp", "execlp", "doSystemCmd"]


def format_high_complexity_funcs(funcs):
    """Format the top 10 high complexity functions as a string."""
    return "; ".join([f"{name}({cc})" for name, cc in funcs])


def analyze_binary(binary_path):
    try:
        with pyghidra.open_program(binary_path) as flat_api:
            # Get program and listing
            current_program = flat_api.getCurrentProgram()
            listing = current_program.getListing()

            # Get basic program info
            files = current_program.getName()
            arches = current_program.getLanguage().toString()
            sha256 = current_program.getExecutableSHA256()
            md5 = current_program.getExecutableMD5()
            total_insn = listing.getNumInstructions()

            # Get functions and calculate metrics
            all_funcs = list(listing.getFunctions(True))
            total_cc = 0
            system_xrefs_details = []
            monitor = flat_api.getMonitor()

            # Analyze dangerous functions and their xrefs
            ref_manager = current_program.getReferenceManager()
            for func in all_funcs:
                if func.getName() in dangerous_functions:
                    entry_point = func.getEntryPoint()
                    references = ref_manager.getReferencesTo(entry_point)
                    for xref in references:
                        ref_func = listing.getFunctionContaining(xref.getFromAddress())
                        if ref_func:
                            detail = f"{xref.getFromAddress()} ({ref_func.getName()})"
                            system_xrefs_details.append(detail)

            num_calls_in_system_xrefs = len(system_xrefs_details)

            # Calculate cyclomatic complexity metrics
            cc_calculator = CyclomaticComplexity()
            complexity_funcs = []
            for func in all_funcs:
                try:
                    cc = cc_calculator.calculateCyclomaticComplexity(func, monitor)
                    total_cc += cc
                    # Store all functions with their complexity
                    complexity_funcs.append((func.getName(), cc))
                except CancelledException:
                    print(
                        f"Warning: Complexity calculation cancelled for function {func.getName()}"
                    )

            num_funcs = len(all_funcs)
            average_cc = total_cc / num_funcs if num_funcs > 0 else 0

            # Sort functions by complexity and get top 10
            top_complex_funcs = sorted(
                complexity_funcs, key=itemgetter(1), reverse=True
            )[:10]

            # Save results to CSV
            csv_file_path = "./ghidratest.csv"
            with open(csv_file_path, mode="a", newline="") as csv_file:
                fieldnames = [
                    "File",
                    "Architecture",
                    "SHA256",
                    "MD5",
                    "Total_Instructions",
                    "Total_Functions",
                    "System_Xrefs",
                    "Total_System_Xrefs",
                    "Average_Cyclomatic_Complexity",
                    "Top_10_Complex_Functions",  # New field
                ]
                writer = csv.DictWriter(csv_file, fieldnames=fieldnames)

                if csv_file.tell() == 0:
                    writer.writeheader()

                writer.writerow(
                    {
                        "File": files,
                        "Architecture": arches,
                        "SHA256": sha256,
                        "MD5": md5,
                        "Total_Instructions": total_insn,
                        "Total_Functions": num_funcs,
                        "System_Xrefs": "; ".join(system_xrefs_details),
                        "Total_System_Xrefs": num_calls_in_system_xrefs,
                        "Average_Cyclomatic_Complexity": round(average_cc, 2),
                        "Top_10_Complex_Functions": format_high_complexity_funcs(
                            top_complex_funcs
                        ),
                    }
                )

    except Exception as e:
        print(f"Error loading binary {binary_path}: {str(e)}. Skipping file.")
    except Exception as e:
        print(f"Error analyzing binary: {str(e)}")
        raise


def scan_directory(directory_path):
    # Scan the directory for binaries and analyze each one
    for root, _, files in os.walk(directory_path):
        for file in files:
            binary_path = os.path.join(root, file)
            if os.path.isfile(binary_path):  # Make sure it's a file
                print(f"Analyzing binary: {binary_path}")
                analyze_binary(binary_path)


if __name__ == "__main__":
    # Change this path to the directory you want to scan
    directory_path = "./AC15/"
    scan_directory(directory_path)




Analyzing binary: ./AC15/AC15_httpd




Analyzing binary: ./AC15/TRI227WF_webs


In [3]:
%%time
df = pd.read_csv("./ghidratest.csv", header=None)

CPU times: user 13.7 ms, sys: 1.07 ms, total: 14.7 ms
Wall time: 15.2 ms


<h2>Naming Pandas Columns</h2>

In [4]:
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,File,Architecture,SHA256,MD5,Total_Instructions,Total_Functions,System_Xrefs,Total_System_Xrefs,Average_Cyclomatic_Complexity,Top_10_Complex_Functions
1,AC15_httpd,ARM/little/32/v8,e2880dc6a19a9ac5122d8686047db12a223d062324de7c...,35a3115eb9749638fd5e19c26c7fa752,198729,1643,0003db54 (FUN_0003da1c); 0004fc48 (TendaTelnet...,185,5.95,aspTendaGetStatus(316); FUN_00012980(285); FUN...
2,TRI227WF_webs,ARM/little/32/v8,800b296c5baa8d5d2dc5fba26ac2fc6c0e2f92b13e9f4b...,8c149607136281c97339f27d0280d1b9,62196,897,00009934 (FUN_0000987c); 0003d8f0 (FUN_0003d87...,4,5.49,FUN_0004b820(94); FUN_00049ef0(85); FUN_0000d4...


In [5]:
%%time
df.columns = [
    "File",
    "Architecture",
    "SHA256",
    "MD5",
    "Strings",
    "Functions",
    "System_Xrefs",
    "Total_System_Xrefs",
    "Average_Cyclomatic_Complexity",
    "Top_10_Complex_Functions",
]

CPU times: user 356 μs, sys: 0 ns, total: 356 μs
Wall time: 361 μs


In [6]:
df = pd.read_csv(
    "ghidratest.csv",
    dtype={
        "Total_Instructions": int,
        "Total_Functions": int,
        "Total_System_Xrefs": int,
        "Average_Cyclomatic_Complexity": float,
    },
)

In [7]:
df.fillna("None", inplace=True)

<h2>Verify the Pandas Output</h2>

In [8]:
%%time
df

CPU times: user 4 μs, sys: 0 ns, total: 4 μs
Wall time: 9.06 μs


Unnamed: 0,File,Architecture,SHA256,MD5,Total_Instructions,Total_Functions,System_Xrefs,Total_System_Xrefs,Average_Cyclomatic_Complexity,Top_10_Complex_Functions
0,AC15_httpd,ARM/little/32/v8,e2880dc6a19a9ac5122d8686047db12a223d062324de7c...,35a3115eb9749638fd5e19c26c7fa752,198729,1643,0003db54 (FUN_0003da1c); 0004fc48 (TendaTelnet...,185,5.95,aspTendaGetStatus(316); FUN_00012980(285); FUN...
1,TRI227WF_webs,ARM/little/32/v8,800b296c5baa8d5d2dc5fba26ac2fc6c0e2f92b13e9f4b...,8c149607136281c97339f27d0280d1b9,62196,897,00009934 (FUN_0000987c); 0003d8f0 (FUN_0003d87...,4,5.49,FUN_0004b820(94); FUN_00049ef0(85); FUN_0000d4...


<h2>Checking Pandas Datatypes</h2>

In [9]:
%%time
df.dtypes

CPU times: user 111 μs, sys: 12 μs, total: 123 μs
Wall time: 128 μs


File                              object
Architecture                      object
SHA256                            object
MD5                               object
Total_Instructions                 int64
Total_Functions                    int64
System_Xrefs                      object
Total_System_Xrefs                 int64
Average_Cyclomatic_Complexity    float64
Top_10_Complex_Functions          object
dtype: object

<h2>Changing Datatypes to String</h2>

In [10]:
%%time
df["Total_Instructions"] = df["Total_Instructions"].astype(str)
df["Total_Functions"] = df["Total_Functions"].astype(str)
df["Architecture"] = df["Architecture"].astype(str)
df["File"] = df["File"].astype(str)
df["SHA256"] = df["SHA256"].astype(str)
df["MD5"] = df["MD5"].astype(str)
df["Architecture"] = df["Architecture"].astype(str)
df["System_Xrefs"] = df["System_Xrefs"].astype(str)

CPU times: user 3.86 ms, sys: 0 ns, total: 3.86 ms
Wall time: 3.62 ms


<h2>Searching for Features</h2>

In [11]:
df[df["Architecture"].str.contains("little", na=False)]

Unnamed: 0,File,Architecture,SHA256,MD5,Total_Instructions,Total_Functions,System_Xrefs,Total_System_Xrefs,Average_Cyclomatic_Complexity,Top_10_Complex_Functions
0,AC15_httpd,ARM/little/32/v8,e2880dc6a19a9ac5122d8686047db12a223d062324de7c...,35a3115eb9749638fd5e19c26c7fa752,198729,1643,0003db54 (FUN_0003da1c); 0004fc48 (TendaTelnet...,185,5.95,aspTendaGetStatus(316); FUN_00012980(285); FUN...
1,TRI227WF_webs,ARM/little/32/v8,800b296c5baa8d5d2dc5fba26ac2fc6c0e2f92b13e9f4b...,8c149607136281c97339f27d0280d1b9,62196,897,00009934 (FUN_0000987c); 0003d8f0 (FUN_0003d87...,4,5.49,FUN_0004b820(94); FUN_00049ef0(85); FUN_0000d4...


<h2>Using Query to Search for Features</h2>

In [12]:
df.query("Average_Cyclomatic_Complexity > 3")

Unnamed: 0,File,Architecture,SHA256,MD5,Total_Instructions,Total_Functions,System_Xrefs,Total_System_Xrefs,Average_Cyclomatic_Complexity,Top_10_Complex_Functions
0,AC15_httpd,ARM/little/32/v8,e2880dc6a19a9ac5122d8686047db12a223d062324de7c...,35a3115eb9749638fd5e19c26c7fa752,198729,1643,0003db54 (FUN_0003da1c); 0004fc48 (TendaTelnet...,185,5.95,aspTendaGetStatus(316); FUN_00012980(285); FUN...
1,TRI227WF_webs,ARM/little/32/v8,800b296c5baa8d5d2dc5fba26ac2fc6c0e2f92b13e9f4b...,8c149607136281c97339f27d0280d1b9,62196,897,00009934 (FUN_0000987c); 0003d8f0 (FUN_0003d87...,4,5.49,FUN_0004b820(94); FUN_00049ef0(85); FUN_0000d4...


In [13]:
df_sorted = df.sort_values(by="Total_System_Xrefs", ascending=False)

In [14]:
%%time
df_sorted.plot_bokeh.bar(
    x="File",
    y="Total_System_Xrefs",
    figsize=(900, 700),
    title="Potentially Dangerous Calls To System",
    xlabel="Binary",
    ylabel="Total",
    color="#2196f3",
    vertical_xlabel=True,
)

CPU times: user 29.5 ms, sys: 0 ns, total: 29.5 ms
Wall time: 28.9 ms


<h2>Creating Charts Using Pandas Bokeh</h2>

In [15]:
df.plot_bokeh.bar(
    x="File",
    y=["Average_Cyclomatic_Complexity"],
    figsize=(900, 700),
    title="Average Cyclomatic Complexity",
    xlabel="File",
    ylabel="Average Cyclomatic Complexity",
    color="#2196f3",
    vertical_xlabel=True,
)

In [16]:
# Process the Top_10_Complex_Functions column
def extract_func_data(func_str):
    # Split the string into individual function entries
    funcs = func_str.split("; ")

    # Extract function names and complexity scores
    names = []
    scores = []
    for func in funcs:
        if func:  # Check if the function entry is not empty
            name, score = func.strip("()").split("(")
            names.append(name)
            scores.append(float(score))

    return pd.DataFrame({"Function_Name": names, "Complexity": scores})


# Create a visualization for each binary
for idx, row in df.iterrows():
    binary_name = row["File"]
    func_data = extract_func_data(row["Top_10_Complex_Functions"])

    # Create bar plot
    plot = func_data.plot_bokeh(
        kind="bar",
        x="Function_Name",
        y="Complexity",
        title=f"Top 10 High Complexity Functions in {binary_name}",
        xlabel="Function Name",
        ylabel="Cyclomatic Complexity",
        figsize=(1000, 600),
        color="#2196f3",
        vertical_xlabel=True,
        show_figure=True,
    )

<h2>Reference Material</h2>

- 10 Minutes to Pandas: https://pandas.pydata.org/docs/user_guide/10min.html
- Pandas Cookbook: https://pandas.pydata.org/docs/user_guide/cookbook.html#cookbook
- Ghidra API: https://ghidra.re/ghidra_docs/api/index.html
- PyGhidra: https://github.com/NationalSecurityAgency/ghidra/tree/master/Ghidra/Features/PyGhidra
- EMUX: https://github.com/therealsaumil/emux
- Ghidra Snippets: https://github.com/HackOvert/GhidraSnippets
- Auditing system calls for command injection vulnerabilities using Ghidra's PCode: https://youtu.be/UVNeg7Vqytc
- cetfor/SystemCallAuditorGhidra.py: https://github.com/HackOvert/PotentiallyVulnerable/blob/main/CWE-78/SystemCallAuditorGhidra.py