In [1]:
import csv
from collections import defaultdict
import os
import math
from utils.extractor import walk_and_extract_cwe

lang_to_ext = {
    'c': 'c',
    'cpp': 'cpp',
    'python': 'py',
    'java': 'java',
    'javascript': 'js',
    'php': 'php',
    "csharp": "cs"
}

In [2]:
def count_conversations(file_path):
    # Structure: {language: {conversation_hash: count, 'total_unique': count, 'total_occurrences': count}}
    language_data = defaultdict(lambda: {'unique_conversations': set(), 'total_occurrences': 0})

    with open(file_path, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)

        for row in csv_reader:
            conversation_hash = row['conversation_hash']
            language = row['language']

            # Track data per language
            lang_data = language_data[language]

            # Add to unique conversations if not already present
            if conversation_hash not in lang_data['unique_conversations']:
                lang_data['unique_conversations'].add(conversation_hash)

            # Increment total occurrences
            lang_data['total_occurrences'] += 1

    # Convert sets to counts and prepare final output
    result = {}
    for language, data in language_data.items():
        result[language] = {
            'unique_conversations': len(data['unique_conversations']),
            'total_occurrences': data['total_occurrences']
        }

    return result

In [3]:
file_path = 'opengrep_results_deserialization.csv'
language_conversations = count_conversations(file_path)
print(language_conversations)
unique_convo_hash_good = 0
# Print results
print("Unique conversations and total occurrences per language:")
for language, data in language_conversations.items():
    print(f"Language: {language}")
    unique_convo_hash_good += data['unique_conversations']
    print(f"  Unique conversations: {data['unique_conversations']}")
    print(f"  Total occurrences: {data['total_occurrences']}")

{'csharp': {'unique_conversations': 56, 'total_occurrences': 82}, 'java': {'unique_conversations': 273, 'total_occurrences': 634}, 'javascript': {'unique_conversations': 463, 'total_occurrences': 805}, 'python': {'unique_conversations': 2258, 'total_occurrences': 4833}}
Unique conversations and total occurrences per language:
Language: csharp
  Unique conversations: 56
  Total occurrences: 82
Language: java
  Unique conversations: 273
  Total occurrences: 634
Language: javascript
  Unique conversations: 463
  Total occurrences: 805
Language: python
  Unique conversations: 2258
  Total occurrences: 4833


In [10]:
allowed_rules = {
    "java": [
        "insecure-jms-deserialization",
        "object-deserialization",
        "jackson-unsafe-deserialization",
        "insecure-resteasy-deserialization",
        "use-snakeyaml-constructor",
        "xmlinputfactory-external-entities-enabled",
        "xmlinputfactory-possible-xxe",
        "documentbuilderfactory-disallow-doctype-decl-false",
        "documentbuilderfactory-disallow-doctype-decl-missing",
        "documentbuilderfactory-external-general-entities-true",
        "documentbuilderfactory-external-parameter-entities-true",
        "saxparserfactory-disallow-doctype-decl-missing",
        "transformerfactory-dtds-not-disabled",
        "server-dangerous-class-deserialization",
        "server-dangerous-object-deserialization",
    ],
    "csharp": [
        "insecure-binaryformatalter-deseriization",
        "data-contract-resolver",
        "insecure-fastjson-deserialization",
        "insecure-fspickler-deserialization",
        "insecure-typefilterlevel-full",
        "insecure-javascriptserializer-deserialization",
        "insecure-losformatter-deserialization",
        "insecure-netdatacontract-deserialization",
        "insecure-newtonsoft-deserialization",
        "insecure-soapformatter-deserialization",
        "insecure-typefilterlevel-full",
    ],
    "javascript": [
        "express-expat-xxe",
        "express-xml2json-xxe",
        "express-third-party-object-deserialization",
        "grpc-nodejs-insecure-connection",
        "xml2json-xxe",
    ],
    "python": [
        "tainted-pickle-deserialization",
        "avoid-insecure-deserialization",
        "insecure-deserialization",
        "multiprocessing-recv",
        "marshal-usage",
        "avoid-jsonpickle",
        "avoid-pyyaml-load",
        "avoid-unsafe-ruamel",
        
    ]
}

In [5]:
for key, value in allowed_rules.items():
    print(key, len(value))

java 15
csharp 11
javascript 5
python 8


In [6]:
# Flatten allowed rules into a list
allowed_rules_list = [rule for sublist in allowed_rules.values() for rule in sublist]

# Structure: language -> rule -> {count, hashes}
language_rule_results = defaultdict(lambda: defaultdict(lambda: {"count": 0, "hashes": set()}))

with open('opengrep_results.csv', mode='r') as csv_file:
    csv_reader = csv.DictReader(csv_file)
    
    for row in csv_reader:
        error_id = row['error_id'].split('.')[-1]  # Extract last part of error_id
        if error_id in allowed_rules_list:
            language = row['language']  # Make sure this column exists in your CSV
            conversation_hash = row['conversation_hash']
            
            # Update counts for language and rule
            language_rule_results[language][error_id]["count"] += 1
            language_rule_results[language][error_id]["hashes"].add(conversation_hash)

print("Aggregated results by language and rule:")
for language, rules in language_rule_results.items():
    total_unique_hashes = set()
    total_count = 0
    print(f"Language: {language}")
    rule_id_to_cwes = walk_and_extract_cwe(rules)
    for rule, data in rules.items():
        unique_hash_count = len(data["hashes"])
        count = data["count"]
        total_unique_hashes.update(data["hashes"])
        total_count += count
        cwes = rule_id_to_cwes.get(rule, [])
        cwe_str = ", ".join(cwes) if cwes else "N/A"
        print(f"  Rule: {rule}")
        print(f"    CWE(s): {cwe_str}")
        print(f"    Unique conversation hashes: {unique_hash_count}")
        print(f"    Total occurrences: {count}")
        
    num_all_hashes = language_conversations.get(language, {}).get('unique_conversations', 0)
    num_bad_hashes = len(total_unique_hashes)    
    percentage_bad = (num_bad_hashes / num_all_hashes * 100) if num_all_hashes > 0 else 0
    print(f"  Overall unique conversation hashes (language): {len(total_unique_hashes)}")
    print(f"  Overall total occurrences (language): {total_count}")
    print(f"  Percentage of unique conversations that are wrong: {percentage_bad:.2f}%\n")
    print()

Aggregated results by language and rule:
Language: java
  Rule: documentbuilderfactory-disallow-doctype-decl-missing
    CWE(s): CWE-611: Improper Restriction of XML External Entity Reference
    Unique conversation hashes: 3
    Total occurrences: 3
  Rule: object-deserialization
    CWE(s): CWE-502: Deserialization of Untrusted Data
    Unique conversation hashes: 21
    Total occurrences: 33
  Rule: transformerfactory-dtds-not-disabled
    CWE(s): CWE-611: Improper Restriction of XML External Entity Reference
    Unique conversation hashes: 1
    Total occurrences: 1
  Rule: saxparserfactory-disallow-doctype-decl-missing
    CWE(s): CWE-611: Improper Restriction of XML External Entity Reference
    Unique conversation hashes: 5
    Total occurrences: 7
  Rule: use-snakeyaml-constructor
    CWE(s): CWE-502: Deserialization of Untrusted Data
    Unique conversation hashes: 1
    Total occurrences: 3
  Overall unique conversation hashes (language): 30
  Overall total occurrences (langu

## Total statistics

In [7]:
# Flatten the allowed rules into a single list
allowed_rules_list = [rule for sublist in allowed_rules.values() for rule in sublist]
results = defaultdict(lambda: {"count": 0, "hashes": set()})
# Read the CSV file
with open('opengrep_results.csv', mode='r') as csv_file:
    csv_reader = csv.DictReader(csv_file)

    for row in csv_reader:
        error_id = row['error_id'].split('.')[-1]  # Extract the last part of the error_id
        if error_id in allowed_rules_list:
            conversation_hash = row['conversation_hash']
            if error_id in results:
                # Update the count and add the hash to the set
                results[error_id]["count"] += 1
                results[error_id]["hashes"].add(conversation_hash)
            else:
                # Initialize a new entry
                results[error_id] = {"count": 1, "hashes": {conversation_hash}}         
            
unique_convo_hash_bad = 0
for rule in results:
    count = results[rule]["count"]
    unique_hashes = results[rule]["hashes"]
    unique_convo_hash_bad += len(unique_hashes)
print(unique_convo_hash_good, unique_convo_hash_bad)
print(unique_convo_hash_bad / unique_convo_hash_good)


3050 40
0.013114754098360656


In [8]:
def get_code_lines_stats(csv_path, unique_hashes=None, include_only_unique=False):
    if unique_hashes is None:
        unique_hashes = set()

    line_counts = []

    with open(csv_path, newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            convo_hash = row['conversation_hash']

            if include_only_unique:
                # Only process rows whose convo_hash is in unique_hashes
                if convo_hash not in unique_hashes:
                    continue
            else:
                # Exclude rows whose convo_hash is in unique_hashes
                if convo_hash in unique_hashes:
                    continue

            code_index = row['code_index']
            language = row['language'].lower()

            ext = lang_to_ext.get(language)
            if not ext:
                print(f"Unknown language '{language}' for conversation_hash={convo_hash}")
                continue

            filename = f"files/{language}/codes/{convo_hash}_{code_index}.{ext}"

            if not os.path.isfile(filename):
                print(f"File {filename} not found.")
                continue

            with open(filename, 'r', encoding='utf-8', errors='ignore') as code_file:
                lines = code_file.readlines()
                num_lines = len(lines)
                line_counts.append(num_lines)

    if not line_counts:
        return 0.0, 0.0

    avg = sum(line_counts) / len(line_counts)
    variance = sum((x - avg) ** 2 for x in line_counts) / len(line_counts)
    std_dev = math.sqrt(variance)

    return avg, std_dev

good_avg, good_std = get_code_lines_stats(file_path, unique_hashes)
print(f"Good results ||| Avg lines: {good_avg} | Std: {good_std} ")


bad_avg, bad_std = get_code_lines_stats(file_path, unique_hashes, include_only_unique=True)
print(f"Bad results ||| Avg lines: {bad_avg} | Std: {bad_std} ")


Good results ||| Avg lines: 49.200566750629726 | Std: 45.145936880380155 
Bad results ||| Avg lines: 26.0 | Std: 1.0 


## Save the results in a CSV file

In [9]:
with open("results/weak_random_occurrences.csv", mode="w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)
    writer.writerow(["Rule", "Unique Hash Count", "Hashes"])
    for rule, data in results.items():
        writer.writerow([
            rule,
            len(data["hashes"]),
            ";".join(data["hashes"])
        ])