In [12]:
import os
import json
import pandas as pd
from packaging import version

In [13]:
# Function to parse a single JSON file
def parse_json_file(file_path, project):
    """Flatten one JSON scan report into one record per reported behavior.

    Parameters
    ----------
    file_path : str or os.PathLike
        Path of the JSON report to read.
    project : str
        Label copied verbatim into the "project" field of every record.

    Returns
    -------
    list[dict]
        One dict per entry of each file's "Behaviors" list. The file-level
        fields (path, hash, size, risk score, syscalls, pledges, meta) are
        repeated on every record belonging to that file. Files without
        behaviors contribute no records.

    Raises
    ------
    IndexError
        If a file that has behaviors carries a "Path" with fewer than four
        '/'-separated components, or whose fourth component contains no '-'.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(file_path, 'r', encoding='utf-8') as handle:
        report = json.load(handle)

    parsed_data = []

    # `.get(key, default)` only covers a missing key; `or` also covers an
    # explicit JSON null for "Files" / "Behaviors".
    for file_info in (report.get("Files") or {}).values():
        behaviors = file_info.get("Behaviors") or []
        if not behaviors:
            continue

        path = file_info.get("Path", "")
        # The apk file name is the fourth '/'-separated component of the path,
        # cut at the first space; the version is the second-to-last
        # '-'-separated token of that name.
        # NOTE(review): presumably names look like "<name>-<version>-<release>.apk" - confirm.
        apk = path.split('/')[3].split(' ')[0]
        apk_version = apk.split('-')[-2]

        # File-level fields shared by every behavior of this file
        sha256 = file_info.get("SHA256", "")
        size = file_info.get("Size", 0)
        risk_score = file_info.get("RiskScore", 0)
        syscalls = file_info.get("Syscalls", [])
        pledges = file_info.get("Pledge", [])
        meta = file_info.get("Meta", {})

        for behavior in behaviors:
            parsed_data.append({
                "FilePath": path,
                "project": project,
                "apk": apk,
                "version": apk_version,
                "SHA256": sha256,
                "Size": size,
                "RiskScore": risk_score,
                "Syscalls": syscalls,
                "Pledges": pledges,
                "Meta": meta,
                "BehaviorDescription": behavior.get("Description", ""),
                "MatchStrings": behavior.get("MatchStrings", []),
                "BehaviorRiskScore": behavior.get("RiskScore", 0),
                "RiskLevel": behavior.get("RiskLevel", ""),
                "RuleURL": behavior.get("RuleURL", ""),
                "ID": behavior.get("ID", ""),
                "RuleName": behavior.get("RuleName", ""),
                "ReferenceURL": behavior.get("ReferenceURL", "")
            })

    return parsed_data

In [14]:
# Base folder: two directory levels above the current working directory
# (not the working directory itself).
# NOTE(review): this presumably expects the kernel to be started from the
# notebook's own folder - confirm.
base_folder = os.path.dirname(os.path.dirname(os.getcwd()))

projects = [
    "0_controller-gen",
    "1_gobump",
    "2_logstash-exporter",
    "3_prometheus-beat-exporter",
    "4_cosign",
    "5_step",
    "6_go-swagger",
    "7_grafana-agent-operator",
    "8_terragrunt",
    "9_litestream",
]

# Accumulate the behavior records of every JSON report of every project
all_parsed_data = []
for project in projects:
    # Each project keeps its reports in a 'malcontent-scan' sub-folder
    folder_path = os.path.join(base_folder, f'datasets/dataset6_over_time/go/{project}/malcontent-scan')
    # os.listdir returns entries in arbitrary order; sorting makes the row
    # order of the resulting DataFrame identical on every run.
    for file_name in sorted(os.listdir(folder_path)):
        if file_name.endswith(".json"):
            file_path = os.path.join(folder_path, file_name)
            parsed_data = parse_json_file(file_path, project)
            all_parsed_data.extend(parsed_data)

# Convert the parsed data into a pandas DataFrame (one row per behavior record)
df = pd.DataFrame(all_parsed_data)

In [15]:
# Order the rows by project first and by version second. The project column is
# compared as-is; every other sort column is compared through version.parse,
# so versions are ordered as versions rather than as plain strings.
def _project_then_version_key(column):
    """Return the sort key for one of the columns passed to sort_values."""
    if column.name == 'project':
        return column
    return column.map(version.parse)


df_sorted = df.sort_values(by=['project', 'version'], key=_project_then_version_key)

In [16]:
def get_new_alerts(prior_apk, current_apk, match_cols, data=None):
    """Return the alerts of `current_apk` that have no counterpart in `prior_apk`.

    Two alerts count as the same when they agree on every column listed in
    `match_cols`.

    Parameters
    ----------
    prior_apk : str
        Value of the 'apk' column identifying the earlier package.
    current_apk : str
        Value of the 'apk' column identifying the later package.
    match_cols : list[str]
        Columns used as the comparison key.
    data : pandas.DataFrame, optional
        Frame holding the alerts; it needs the 'apk' and 'MatchStrings'
        columns plus every column in `match_cols`. Defaults to the
        notebook-level `df`.

    Returns
    -------
    pandas.DataFrame
        Columns are `match_cols`, 'MatchStrings', 'prior_apk', 'current_apk'
        and 'new_alerts' (the number of new alerts for this pair, repeated on
        every row). When nothing is new, a single placeholder row is returned
        whose alert columns are None and whose 'new_alerts' is 0.
    """
    if data is None:
        data = df  # notebook-level frame with all parsed alerts

    prior = data[data['apk'] == prior_apk].reset_index(drop=True)
    current = data[data['apk'] == current_apk].reset_index(drop=True)

    # Only the distinct key combinations of the prior package matter. Merging
    # against duplicates would multiply every matching current row by the
    # number of duplicates before those rows are thrown away again.
    prior_keys = prior[match_cols].drop_duplicates()

    # Left anti-join: keep the current rows whose key is absent from the prior package
    merged = current.merge(prior_keys, how='left', on=match_cols, indicator=True)
    new_alerts = merged[merged['_merge'] == 'left_only'].drop(columns='_merge')

    report_cols = match_cols + ["MatchStrings"]
    pair_cols = ['prior_apk', 'current_apk', 'new_alerts']

    if len(new_alerts) > 0:
        temp_alerts = new_alerts[report_cols].copy()
        temp_alerts['prior_apk'] = prior_apk
        temp_alerts['current_apk'] = current_apk
        temp_alerts['new_alerts'] = len(new_alerts)
    else:
        # Keep the pair visible in the output even though it has no new alerts
        placeholder = [None] * len(report_cols) + [prior_apk, current_apk, 0]
        temp_alerts = pd.DataFrame([placeholder], columns=report_cols + pair_cols)

    return temp_alerts


In [None]:
# Group by FilePath to calculate statistics for each file
grouped_df = df.groupby('FilePath')

# Number of alerts per risk level for every (apk, project, version)
# combination: one row per combination, one column per RiskLevel value
alert_counts = (
    df.groupby(['apk', 'project', 'version'])['RiskLevel']
    .value_counts()
    .unstack(fill_value=0)
    .reset_index(drop=False)
)

# Order by project, then by version parsed as a proper version
alert_counts_sort = alert_counts.sort_values(
    by=['project', 'version'],
    key=lambda col: col if col.name == 'project' else col.map(version.parse),
).reset_index(drop=True)

# Drop the entries whose apk name contains '.spdx.json'.
# - regex=False: the pattern is a literal, '.' must not match any character.
# - .copy(): columns are added below, which must not write into a slice.
# - reset_index AFTER filtering: the "previous row" logic below needs
#   consecutive positions; gaps left by dropped rows would pair a row with a
#   missing predecessor.
alert_counts_filter = (
    alert_counts_sort[~alert_counts_sort['apk'].str.contains('.spdx.json', regex=False)]
    .copy()
    .reset_index(drop=True)
)

# True where the directly preceding row belongs to the same project
# (always False on the first row, whose shifted project is missing)
has_prior = alert_counts_filter['project'].eq(alert_counts_filter['project'].shift())

# set the prior apk for later use (None when there is no prior row in the project)
alert_counts_filter['prior_apk'] = [
    apk if same_project else None
    for apk, same_project in zip(alert_counts_filter['apk'].shift(), has_prior)
]

# Calculate deltas only for rows where the project is the same as the prior
# row; all other rows get a missing value
for level in ('LOW', 'MEDIUM'):
    alert_counts_filter[f'{level}_DELTA'] = alert_counts_filter[level].diff().where(has_prior)

# matching columns for alerts
columns = ['RiskScore', 'BehaviorDescription', 'BehaviorRiskScore', 'RiskLevel', 'RuleURL', 'ID', 'RuleName']

# Collect the per-pair results and concatenate once instead of growing a
# DataFrame inside the loop
alert_frames = []
for index, row in alert_counts_filter.iterrows():
    if pd.notna(row['prior_apk']):
        alert_frames.append(
            get_new_alerts(prior_apk=row['prior_apk'],
                           current_apk=row['apk'],
                           match_cols=columns)
        )

# pd.concat raises on an empty list, so fall back to an empty frame
new_alerts = pd.concat(alert_frames) if alert_frames else pd.DataFrame()