In [1]:
import os
import tarfile
import ast
import json
import pandas as pd
import math
from collections import Counter
import numpy as np
from scipy.stats import entropy
import logging
import sys
import xml.etree.ElementTree as ET

# Handler that writes log records to stdout.
logstd = logging.StreamHandler(sys.stdout)

# Configure the root logger: INFO threshold, timestamped record format, and the
# stdout handler above as its only handler.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s:%(lineno)d - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S %Z",
    level=logging.INFO,
    handlers=[logstd]
)

# getLogger() without a name returns the root logger.
log = logging.getLogger()

Count compressed package archives (tarballs) to determine the usable amount

In [None]:
# Root directories of the two datasets whose package archives are counted in this cell.
dataset_1_dir = "/mnt/volume_nyc1_01/Backstabbers-Knife-Collection/samples"
dataset_2_dir = "/mnt/volume_nyc1_01/pypi_malregistry"  

def count_package_files(directory):
    """Count tar archives below ``directory``, searching recursively.

    A file is counted when its name ends with ``.tar.gz``, ``.tar.bz2`` or
    ``.tar.xz``.

    Args:
        directory: Root directory handed to ``os.walk``.

    Returns:
        int: Number of matching files found anywhere below ``directory``.
    """
    archive_suffixes = (".tar.gz", ".tar.bz2", ".tar.xz")
    return sum(
        1
        for _folder, _subdirs, names in os.walk(directory)
        for name in names
        if name.endswith(archive_suffixes)
    )

# Count the archives in each dataset root and report the totals.
count_1 = count_package_files(dataset_1_dir)
count_2 = count_package_files(dataset_2_dir)

print(f"Number of packages in dataset 1: {count_1}")
print(f"Number of packages in dataset 2: {count_2}")

Count JSON files to determine the usable amount

In [None]:
# Dataset roots scanned for setup.json files in this cell.
# NOTE: dataset_2_dir is rebound here to the benignPyPI directory; the archive-counting
# cell above assigned the same name to the pypi_malregistry directory.
dataset_1_dir = "/mnt/volume_nyc1_01/Backstabbers-Knife-Collection/samples"
dataset_2_dir = "/mnt/volume_nyc1_01/benignPyPI"

def is_valid_json_file(file_path):
    """Report whether ``file_path`` can be opened and parsed as UTF-8 JSON.

    Args:
        file_path: Path of the file to check.

    Returns:
        bool: True when the file opens and ``json.load`` succeeds. False when the
        content is not valid UTF-8 / JSON, is nested too deeply to parse, or the
        file cannot be opened or read at all.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            json.load(file)
    except (OSError, ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError (both are
        # subclasses). OSError covers entries that cannot be opened (dangling
        # symlink, permissions, a directory), so one bad entry yields False
        # instead of aborting a whole directory scan.
        return False
    return True

def count_valid_json_files(directory):
    """Count the folders below ``directory`` that hold a parseable ``setup.json``.

    Args:
        directory: Root directory handed to ``os.walk``.

    Returns:
        int: Number of files named exactly ``setup.json`` for which
        ``is_valid_json_file`` returns True.
    """
    target = "setup.json"
    total = 0
    for folder, _, names in os.walk(directory):
        # A folder lists each file name once, so a membership test finds the
        # same (single) candidate that a per-file comparison would.
        if target in names and is_valid_json_file(os.path.join(folder, target)):
            total += 1
    return total

# Count the parseable setup.json files in each dataset root and report the totals.
# count_1 / count_2 are rebound here; they previously held the archive counts.
count_1 = count_valid_json_files(dataset_1_dir)
count_2 = count_valid_json_files(dataset_2_dir)

print(f"Number of valid setup.json files in dataset 1: {count_1}")
print(f"Number of valid setup.json files in dataset 2: {count_2}")

Calculate Shannon Entropy and append to JSON

In [None]:
def find_setup_json_files(directory):
    """Collect the path of every file named ``setup.json`` below ``directory``.

    Args:
        directory: Root directory handed to ``os.walk``.

    Returns:
        list[str]: Joined ``folder/setup.json`` paths, in ``os.walk`` order.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(directory)
        for name in names
        if name == 'setup.json'
    ]
    
# Dataset root for the entropy step. It is assigned here because the only other
# assignment of ``dataset_dir`` sits in a later cell, so a fresh top-to-bottom
# run would otherwise raise NameError on the call below.
dataset_dir = "/mnt/volume_nyc1_01/benignPyPI"

# List every setup.json that the entropy step will update.
setup_json_files = find_setup_json_files(dataset_dir)
if setup_json_files:
    print("Found setup.json files:")
    for file in setup_json_files:
        print(file)
else:
    print("No setup.json files found in the specified directory.")


def find_python_files(directory):
    """Recursively collect the paths of non-hidden ``.py`` files under ``directory``.

    Args:
        directory: Directory to scan.

    Returns:
        list[str]: ``os.scandir`` entry paths of files whose name ends with
        ``.py`` and does not start with a dot, gathered from ``directory`` and
        every real (non-symlinked) subdirectory below it.
    """
    python_files = []
    # The context manager closes the scandir iterator even if recursion raises.
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_file() and entry.name.endswith('.py') and not entry.name.startswith('.'):
                python_files.append(entry.path)
            # Do not descend through symlinked directories: a link that points
            # back at an ancestor would otherwise be re-entered over and over
            # (duplicating results) until the OS rejects the path.
            elif entry.is_dir(follow_symlinks=False):
                python_files.extend(find_python_files(entry.path))
    return python_files

def shannon_entropy(directory):
    """Average the per-file character entropy of each package and store it in its setup.json.

    Every folder below ``directory`` that contains a ``setup.json`` is treated as
    one package. Each ``.py`` file found by ``find_python_files`` in that folder
    is read as text, the base-2 Shannon entropy of its character frequencies is
    computed, and the per-file values are averaged. The average is written into
    the package's ``setup.json`` under the key ``"average_entropy"``.

    Args:
        directory: Root directory searched for ``setup.json`` files.

    Returns:
        dict: Maps the package folder name (basename of the folder holding
        ``setup.json``) to its average entropy as a float. Packages for which no
        ``.py`` file could be read are omitted. Packages that share a folder
        name overwrite each other in this mapping.

    Side effects:
        Rewrites each processed ``setup.json`` in place (indent=4) and prints a
        line per update or error.
    """
    package_entropies = {}
    setup_json_files = find_setup_json_files(directory)

    for setup_file_path in setup_json_files:
        package_path = os.path.dirname(setup_file_path)
        package_name = os.path.basename(package_path)

        package_entropy = 0
        total_files = 0

        python_files = find_python_files(package_path)
        for file_path in python_files:
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read()
                if text:
                    freqs = np.array(list(Counter(text).values()))
                    probs = freqs / len(text)
                    entropy_value = float(entropy(probs, base=2))
                else:
                    # Empty file (e.g. a bare __init__.py): there is no character
                    # distribution, so define the entropy as 0.0 explicitly rather
                    # than rely on how the library treats an empty probability array.
                    entropy_value = 0.0
                package_entropy += entropy_value
                total_files += 1
            except Exception as e:
                # Best effort: a single unreadable file must not abort the package.
                print(f"Error reading {file_path}: {e}")

        if total_files > 0:
            average_entropy = package_entropy / total_files
            package_entropies[package_name] = average_entropy

            try:
                # 'r+' keeps the file open for the read-modify-write cycle below.
                with open(setup_file_path, 'r+', encoding='utf-8', errors='ignore') as setup_file:
                    try:
                        setup_data = json.load(setup_file)
                        setup_data["average_entropy"] = average_entropy
                        # Rewind, overwrite, then cut off any leftover tail of the old content.
                        setup_file.seek(0)
                        json.dump(setup_data, setup_file, indent=4)
                        setup_file.truncate()
                        print(f"Updated {setup_file_path} with average entropy: {average_entropy}")
                    except json.JSONDecodeError as json_err:
                        print(f"JSON decode error in {setup_file_path}: {json_err}")
            except Exception as e:
                print(f"Error updating {setup_file_path}: {e}")

    return package_entropies

package_entropies = shannon_entropy(dataset_dir)
# The loop variable is deliberately not named ``entropy``: that would rebind the
# ``scipy.stats.entropy`` function imported at the top of the notebook, and every
# later call to shannon_entropy() in the same kernel would then fail per file.
for package, package_entropy in package_entropies.items():
    print(f"Shannon entropy of {package}: {package_entropy}")

Construct AST, store in XML, parse features, append features to JSON

In [None]:
def find_python_files(directory):
    """Recursively collect the paths of non-hidden ``.py`` files under ``directory``.

    Args:
        directory: Directory to scan.

    Returns:
        list[str]: ``os.scandir`` entry paths of files whose name ends with
        ``.py`` and does not start with a dot, gathered from ``directory`` and
        every real (non-symlinked) subdirectory below it.
    """
    python_files = []
    # The context manager closes the scandir iterator even if recursion raises.
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_file() and entry.name.endswith('.py') and not entry.name.startswith('.'):
                python_files.append(entry.path)
            # Do not descend through symlinked directories: a link that points
            # back at an ancestor would otherwise be re-entered over and over
            # (duplicating results) until the OS rejects the path.
            elif entry.is_dir(follow_symlinks=False):
                python_files.extend(find_python_files(entry.path))
    return python_files

def construct_ast(file_path):
    """Parse a Python source file into an ``ast`` tree.

    Args:
        file_path: Path of the source file. It is read as UTF-8 with
            undecodable bytes dropped.

    Returns:
        The tree returned by ``ast.parse``, or None when the source cannot be
        parsed (a message is printed in that case).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        content = f.read()
    try:
        return ast.parse(content)
    except SyntaxError as e:
        print(f"SyntaxError in {file_path}: {e}")
    except (ValueError, RecursionError) as e:
        # ast.parse can also fail with ValueError (e.g. NUL bytes in the source on
        # some Python versions) or RecursionError (extremely deep nesting). Treat
        # those as unparseable too, so one odd file does not stop a dataset loop.
        print(f"Could not parse {file_path}: {type(e).__name__}: {e}")
    return None

def ast_to_xml(node):
    """Mirror an ``ast`` node tree as an ``xml.etree.ElementTree`` element tree.

    The returned root element is named after ``node``'s class. Beneath it, every
    AST node becomes an element named after its class, with one child element per
    field (named after the field). List values contribute one ``item`` child per
    entry, and any other value is stored as the ``str()`` text of its holder.

    Args:
        node: The AST node to convert (e.g. the result of ``ast.parse``).

    Returns:
        xml.etree.ElementTree.Element: Root element wrapping the converted tree.
    """
    def _attach(value, holder):
        # Lists: one <item> wrapper per entry, converted recursively.
        if isinstance(value, list):
            for entry in value:
                _attach(entry, ET.SubElement(holder, 'item'))
            return
        # Leaves (str, int, None, ...): stored as text on the holder element.
        if not isinstance(value, ast.AST):
            holder.text = str(value)
            return
        # AST nodes: an element per node with a child element per field.
        branch = ET.SubElement(holder, type(value).__name__)
        for field_name, field_value in ast.iter_fields(value):
            _attach(field_value, ET.SubElement(branch, field_name))

    top = ET.Element(type(node).__name__)
    _attach(node, top)
    return top

# Dataset root whose Python files are parsed in this cell.
dataset_dir = "/mnt/volume_nyc1_01/benignPyPI"

# Every non-hidden .py file below the dataset root.
python_files = find_python_files(dataset_dir)

# For each file that parses, print its AST dump followed by the XML rendering.
# Files for which construct_ast() returns None are skipped.
for file in python_files:
    tree = construct_ast(file)
    if tree is not None:
        print(f"Abstract syntax tree of {file}:")
        print(ast.dump(tree, indent=2))
        print()
        xml = ast_to_xml(tree)
        # encoding='unicode' makes tostring() return a str rather than bytes.
        xml_str = ET.tostring(xml, encoding='unicode', method='xml')
        print(f"XML representation of {file}:")
        print(xml_str)

Convert to dataframe with option to save as CSV

In [None]:
def read_json_files(directory):
    """Load every parseable ``setup.json`` below ``directory`` into one DataFrame.

    Args:
        directory: Root directory handed to ``os.walk``.

    Returns:
        pandas.DataFrame: One row per successfully loaded ``setup.json``, built
        with ``pd.DataFrame`` from the list of loaded objects. Files that cannot
        be opened or decoded are skipped with a printed message, so a single
        malformed file does not abort the whole load.
    """
    json_data_list = []

    for root, dirs, files in os.walk(directory):
        for file in files:
            if file != 'setup.json':
                continue
            file_path = os.path.join(root, file)
            try:
                # Explicit UTF-8 so the result does not depend on the locale default.
                with open(file_path, 'r', encoding='utf-8') as f:
                    json_data_list.append(json.load(f))
            except (OSError, ValueError) as err:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                print(f"Skipping {file_path}: {err}")

    return pd.DataFrame(json_data_list)

# Specify the directory containing the packages
directory = '/mnt/volume_nyc1_01/benignPyPI'

# Call the function and get the DataFrame
df = read_json_files(directory)

#save to CSV
df.to_csv('benignPyPI.csv', index=False)

#load df from CSV
df = pd.read_csv('benignPyPI.csv')

# Display the DataFrame. This must be the last expression in the cell: by
# default the notebook only renders a bare expression when it ends the cell.
df.head()