In [None]:
import json
import glob
import os
import pandas as pd
from pathlib import Path
import pickle
import shutil
import subprocess
from tempfile import TemporaryDirectory
import time
from tqdm import tqdm

In [None]:
# Absolute path to the oss-detect-backdoor executable (OSSGadget Debug/net6.0 build output).
# The relative path is resolved against the current working directory.
oss_detect_backdoor_path = os.path.abspath("../scanners/OSSGadget/src/oss-detect-backdoor/bin/Debug/net6.0/oss-detect-backdoor")

In [None]:
# Scanning packages, save the results to disk, and record running times
def scanning_packages(package_releases_path, results_dir):
    """Run oss-detect-backdoor on each package directory and time every run.

    For every immediate sub-directory of ``package_releases_path`` the ``.py``
    files found anywhere below it are copied into a temporary directory, and
    the scanner is invoked on that directory with SARIF v2 output requested at
    ``<results_dir>/<package_name>.sarif``.

    Args:
        package_releases_path: Directory whose sub-directories are the packages.
        results_dir: Directory for the SARIF reports; created if missing.

    Returns:
        dict mapping package name to the elapsed wall-clock seconds of the
        scanner subprocess call.
    """
    running_times = {}
    # The scanner is asked to write into results_dir, so make sure it exists.
    os.makedirs(results_dir, exist_ok=True)
    package_dirs = [f.path for f in os.scandir(package_releases_path) if f.is_dir()]
    for package_dir in tqdm(package_dirs):
        # Take the directory's own name rather than a fixed path-component
        # index, which only worked for one particular absolute-path depth.
        package_name = os.path.basename(os.path.normpath(package_dir))
        print(package_name)
        # As there are big packages containing non-Python files, we select only Python files to analyze
        with TemporaryDirectory() as temp_dir:
            # NOTE(review): files are copied flat, so same-named files from
            # different sub-directories (e.g. several __init__.py) overwrite
            # each other and only the last one copied is scanned.
            for dirpath, _, filenames in os.walk(package_dir):
                for f in filenames:
                    file_path = os.path.abspath(os.path.join(dirpath, f))
                    if Path(file_path).suffix == '.py':
                        shutil.copy2(file_path, temp_dir)
            start_time = time.time()
            # The exit status is not checked; a failed scan still gets a runtime entry.
            subprocess.run([oss_detect_backdoor_path, temp_dir, "-f", "sarifv2", "-o", f"{os.path.join(results_dir, package_name)}.sarif"])
            running_times[package_name] = time.time() - start_time
    return running_times

In [None]:
def collect_alerts(results_dir):
    """Load every ``.sarif`` report found under ``results_dir``.

    Args:
        results_dir: Directory that is walked recursively for ``*.sarif`` files.

    Returns:
        list of the parsed JSON documents, each with an extra
        ``"package_name"`` key holding the file name without its ``.sarif``
        extension.
    """
    suffix = ".sarif"
    alerts = []
    for root, dirs, files in os.walk(results_dir):
        for file in files:
            if not file.endswith(suffix):
                continue
            file_path = os.path.join(root, file)
            # Strip only the trailing extension; str.replace would also remove
            # ".sarif" occurring in the middle of a package name.
            package_name = file[:-len(suffix)]
            print(package_name)
            # Close the file deterministically. "utf-8-sig" decodes UTF-8
            # with or without a leading byte-order mark.
            with open(file_path, encoding="utf-8-sig") as sarif_file:
                alert_data = json.load(sarif_file)
            alert_data["package_name"] = package_name
            alerts.append(alert_data)
    return alerts

In [None]:
def process_alerts(alerts):
    """Flatten parsed SARIF reports into ``[package, target, message]`` rows.

    Only the first run of each report is read. A row is produced for every
    result location whose fully qualified name ends with ``.py``. Reports
    with an empty result list are announced on stdout and contribute no rows.

    Args:
        alerts: Iterable of report dicts carrying ``"runs"`` and ``"package_name"``.

    Returns:
        list of ``[package_name, target, alert_text]`` lists.
    """
    rows = []
    for report in alerts:
        findings = report["runs"][0]["results"]
        pkg = report["package_name"]
        if len(findings) == 0:
            print(f"{pkg} does not have alerts")
            continue
        for finding in findings:
            message = finding["message"]["text"]
            for location in finding["locations"]:
                name = location['physicalLocation']["address"]["fullyQualifiedName"]
                if not name.endswith(".py"):
                    continue
                rows.append([pkg, name, message])
    return rows

In [None]:
def get_tp_fp(num_alerts_list, threshold):
    """Split alert counts by a threshold.

    Args:
        num_alerts_list: Sequence of per-package alert counts.
        threshold: Counts strictly greater than this value are tallied as ``tp``.

    Returns:
        ``(tp, fp)`` where ``tp`` is the number of entries above the threshold
        and ``fp`` is the number of remaining entries.
    """
    tp = sum(1 for n in num_alerts_list if n > threshold)
    return (tp, len(num_alerts_list) - tp)

## Malicious packages

In [None]:
# Input directory with the malicious packages and output directory for their scan reports
# (both resolved to absolute paths against the current working directory)
malicious_packages_path = os.path.abspath("../dataset/malicious-packages/")
malicious_results_dir = os.path.abspath("../results/oss-detect-backdoor/malicious-packages/")

In [None]:
# Scan every malicious package; the result maps package name -> elapsed scan time
malicious_packages_running_times = scanning_packages(malicious_packages_path, malicious_results_dir)

In [None]:
# Runtime statistics: one row per package, then summary statistics of the scan durations
malicious_packages_running_times_df = pd.DataFrame(list(malicious_packages_running_times.items()), columns=['package', 'running_time'])
malicious_packages_running_times_df["running_time"].describe()

In [None]:
# Saving runtime results as CSV (relative path, resolved against the current working directory)
malicious_packages_running_times_df.to_csv("../results/running_times/oss-detect-backdoor/malicious-packages.csv")

In [None]:
# Analyzing malicious packages results: load every .sarif report from the results directory
malicious_results_path = os.path.abspath("../results/oss-detect-backdoor/malicious-packages/")
malicious_results = collect_alerts(malicious_results_path)

In [None]:
# Processing malicious alerts: flatten the SARIF reports into [package, target, rule] rows
malicous_packages_scanning_results = process_alerts(malicious_results)

### Triggered rules in all Python files in malicious packages

In [None]:
# Loading the existing result in case we do not want to rescan the packages.
# NOTE: this replaces any value already bound to malicous_packages_scanning_results.
# pickle.load can execute arbitrary code, so only load files you produced yourself.
with open(os.path.abspath(os.path.join("..", "results", "oss-detect-backdoor", "malicious-packages.pkl")), 'rb') as fp:
    malicous_packages_scanning_results = pickle.load(fp)

In [None]:
# Transform the results to a DataFrame for analysis (one row per alert)
malicious_results_df = pd.DataFrame(malicous_packages_scanning_results, columns=["package", "target", "rule"])

In [None]:
# Number of triggered rules per package: total across packages, then the per-package distribution
malicious_packages_rules_groupby = malicious_results_df.groupby('package')['rule']
print(f"Total number of rules: {malicious_packages_rules_groupby.count().sum()}")
malicious_packages_rules_groupby.count().describe()

### Triggered rules in all setup.py files in malicious packages

In [None]:
# Select only alerts whose target contains the literal text "setup.py".
# regex=False is required: by default the pattern is a regular expression,
# where "." matches any character (e.g. "setup_py" would also be selected).
malicious_packages_rules_setup_df = malicious_results_df[malicious_results_df['target'].str.contains('setup.py', regex=False)]

In [None]:
# Number of triggered rules per package, restricted to the setup.py alerts selected above
malicious_packages_rules_setup_groupby = malicious_packages_rules_setup_df.groupby('package')['rule']
print(f"Total number of rules: {malicious_packages_rules_setup_groupby.count().sum()}")
malicious_packages_rules_setup_groupby.count().describe()

In [None]:
# Saving the processed [package, target, rule] rows to the file that the
# "Loading the existing result" cell reads. The raw SARIF dicts in
# `malicious_results` do not fit the three-column DataFrame built from this
# pickle, so they are not what should be stored here.
with open(os.path.abspath(os.path.join("..", "results", "oss-detect-backdoor", "malicious-packages.pkl")), 'wb') as fp:
    pickle.dump(malicous_packages_scanning_results, fp, protocol=pickle.HIGHEST_PROTOCOL)

## Popular packages

In [None]:
# Input directory with the popular packages and output directory for their scan reports
# (both resolved to absolute paths against the current working directory)
popular_packages_path = os.path.abspath("../dataset/popular-packages/")
popular_results_dir = os.path.abspath("../results/oss-detect-backdoor/popular-packages/")

In [None]:
# Scan every popular package; the result maps package name -> elapsed scan time
popular_packages_running_times = scanning_packages(popular_packages_path, popular_results_dir)

In [None]:
# Runtimes of scanning popular packages: one row per package, then summary statistics
popular_packages_running_times_df = pd.DataFrame(list(popular_packages_running_times.items()), columns=['package', 'running_time'])
popular_packages_running_times_df["running_time"].describe()

In [None]:
# Saving runtime results as CSV (relative path, resolved against the current working directory)
popular_packages_running_times_df.to_csv("../results/running_times/oss-detect-backdoor/popular-packages.csv")

In [None]:
# Analyzing the results: load every .sarif report from the popular-packages results directory
popular_results_path = os.path.abspath("../results/oss-detect-backdoor/popular-packages/")
popular_results = collect_alerts(popular_results_path)

In [None]:
# Processing popular-package alerts: flatten the SARIF reports into [package, target, rule] rows
popular_packages_scanning_results = process_alerts(popular_results)

### Triggered rules in all Python files in popular packages

In [None]:
# Loading the existing result in case we do not want to rescan the packages.
# NOTE: this replaces any value already bound to popular_packages_scanning_results.
# pickle.load can execute arbitrary code, so only load files you produced yourself.
with open(os.path.abspath(os.path.join("..", "results", "oss-detect-backdoor", "popular-packages.pkl")), 'rb') as fp:
    popular_packages_scanning_results = pickle.load(fp)

In [None]:
# Transform the results to a DataFrame for analysis (one row per alert)
popular_results_df = pd.DataFrame(popular_packages_scanning_results, columns=["package", "target", "rule"])

In [None]:
# Number of triggered rules per package: total across packages, then the per-package distribution
popular_packages_rules_groupby = popular_results_df.groupby('package')['rule']
print(f"Total number of rules: {popular_packages_rules_groupby.count().sum()}")
popular_packages_rules_groupby.count().describe()

In [None]:
# Ratio of true positives to false positives for each alert-count threshold.
# get_tp_fp counts packages with more alerts than the threshold as tp and the
# remaining packages as fp (the previously called get_tpr_fpr does not exist).
thresholds = [1, 2, 3, 4, 5]
scores = popular_packages_rules_groupby.count().to_list()
thesholds_tpr_fpr_ratio = []
for t in thresholds:
    tp, fp = get_tp_fp(scores, t)
    # Guard the division: no package may remain at or below the threshold.
    if fp:
        ratio = round(tp / fp, 2)
    else:
        ratio = float("inf") if tp else float("nan")
    thesholds_tpr_fpr_ratio.append((t, ratio))
    print(t, ratio)

### Triggered rules in all setup.py files in popular packages

In [None]:
# Select only alerts whose target contains the literal text "setup.py".
# regex=False is required: by default the pattern is a regular expression,
# where "." matches any character (e.g. "setup_py" would also be selected).
popular_packages_rules_setup_df = popular_results_df[popular_results_df['target'].str.contains('setup.py', regex=False)]

In [None]:
# Number of triggered rules per package, restricted to the setup.py alerts selected above
popular_packages_rules_setup_groupby = popular_packages_rules_setup_df.groupby('package')['rule']
print(f"Total number of rules: {popular_packages_rules_setup_groupby.count().sum()}")
popular_packages_rules_setup_groupby.count().describe()

In [None]:
# For each threshold, print the number of packages with more setup.py alerts than
# the threshold (tp) and the number of remaining packages (fp). Counts are printed, not a ratio.
thresholds = [1, 2, 3, 4, 5]
scores = popular_packages_rules_setup_groupby.count().to_list()
# NOTE(review): this list is never filled or read in the cells shown.
thesholds_tpr_fpr_ratio = []
for t in thresholds:
    tp, fp = get_tp_fp(scores, t)
    print(t, tp, fp)

In [None]:
# Saving the processed [package, target, rule] rows. The cell that loads this
# pickle feeds it into a three-column DataFrame, which the raw SARIF dicts in
# `popular_results` do not fit.
with open(os.path.abspath("../results/oss-detect-backdoor/popular-packages.pkl"), 'wb') as fp:
    pickle.dump(popular_packages_scanning_results, fp, protocol=pickle.HIGHEST_PROTOCOL)

## Random packages

In [None]:
# Loading the existing result in case we do not want to rescan the packages.
# The random-packages pickle is bound to the random-packages variable; it was
# previously assigned to popular_packages_scanning_results, overwriting the
# popular-package results.
# pickle.load can execute arbitrary code, so only load files you produced yourself.
with open(os.path.abspath(os.path.join("..", "results", "oss-detect-backdoor", "random-packages.pkl")), 'rb') as fp:
    random_packages_scanning_results = pickle.load(fp)

In [None]:
# Input directory with the random packages and output directory for their scan reports
# (both resolved to absolute paths; abspath also collapses the trailing "//")
random_packages_path = os.path.abspath("../dataset/random-packages/")
random_results_dir = os.path.abspath("../results/oss-detect-backdoor/random-packages//")

In [None]:
# Scan every random package; the result maps package name -> elapsed scan time
random_packages_running_times = scanning_packages(random_packages_path, random_results_dir)

In [None]:
# Runtimes of scanning random packages: one row per package, then summary statistics
random_packages_running_times_df = pd.DataFrame(list(random_packages_running_times.items()), columns=['package', 'running_time'])
random_packages_running_times_df["running_time"].describe()

In [None]:
# Saving runtime results as CSV (relative path, resolved against the current working directory)
random_packages_running_times_df.to_csv("../results/running_times/oss-detect-backdoor/random-packages.csv")

In [None]:
# Analyzing the results: load every .sarif report from the random-packages results directory
random_results_path = os.path.abspath("../results/oss-detect-backdoor/random-packages/")
random_results = collect_alerts(random_results_path)

In [None]:
# Processing random-package alerts: flatten the SARIF reports into [package, target, rule] rows
random_packages_scanning_results = process_alerts(random_results)

### Triggered rules in all Python files in random packages

In [None]:
# Transform the results to a DataFrame for analysis (one row per alert)
random_results_df = pd.DataFrame(random_packages_scanning_results, columns=["package", "target", "rule"])

In [None]:
# Number of triggered rules per package: total across packages, then the per-package distribution
random_packages_rules_groupby = random_results_df.groupby('package')['rule']
print(f"Total number of rules: {random_packages_rules_groupby.count().sum()}")
random_packages_rules_groupby.count().describe()

In [None]:
# Ratio of true positives to false positives for each alert-count threshold.
# get_tp_fp counts packages with more alerts than the threshold as tp and the
# remaining packages as fp (the previously called get_tpr_fpr does not exist).
thresholds = [1, 5, 10, 15, 20, 25, 30]
scores = random_packages_rules_groupby.count().to_list()
thesholds_tpr_fpr_ratio = []
for t in thresholds:
    tp, fp = get_tp_fp(scores, t)
    # Guard the division: no package may remain at or below the threshold.
    if fp:
        ratio = round(tp / fp, 2)
    else:
        ratio = float("inf") if tp else float("nan")
    thesholds_tpr_fpr_ratio.append((t, ratio))
    print(t, ratio)

### Triggered rules in all setup.py files in random packages

In [None]:
# Select only alerts whose target contains the literal text "setup.py".
# regex=False is required: by default the pattern is a regular expression,
# where "." matches any character (e.g. "setup_py" would also be selected).
random_packages_rules_setup_df = random_results_df[random_results_df['target'].str.contains('setup.py', regex=False)]

In [None]:
# Number of triggered rules per package, restricted to the setup.py alerts selected above
random_packages_rules_setup_groupby = random_packages_rules_setup_df.groupby('package')['rule']
print(f"Total number of rules: {random_packages_rules_setup_groupby.count().sum()}")
random_packages_rules_setup_groupby.count().describe()

In [None]:
# For each threshold, print the number of packages with more setup.py alerts than
# the threshold (tp) and the number of remaining packages (fp). Counts are printed, not a ratio.
thresholds = [1, 2, 3, 4, 5]
scores = random_packages_rules_setup_groupby.count().to_list()
# NOTE(review): this list is never filled or read in the cells shown.
thesholds_tpr_fpr_ratio = []
for t in thresholds:
    tp, fp = get_tp_fp(scores, t)
    print(t, tp, fp)

In [None]:
# Saving the processed [package, target, rule] rows. The cell that loads
# random-packages.pkl expects previously processed scanning results, which the
# raw SARIF dicts in `random_results` are not.
with open(os.path.abspath("../results/oss-detect-backdoor/random-packages.pkl"), 'wb') as fp:
    pickle.dump(random_packages_scanning_results, fp, protocol=pickle.HIGHEST_PROTOCOL)