In [1]:
from neo4j import GraphDatabase
import random
from collections import defaultdict

# Neo4j connection settings
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "12345678")

# Connect to Neo4j
driver = GraphDatabase.driver(URI, auth=AUTH)

# Select test proteins: labeled proteins to hide their labels for validation
def select_test_proteins(tx, sample_size=100):
    query = """
    MATCH (p:Protein)
    WHERE p.ec_number IS NOT NULL
    RETURN p.entry AS entry, p.ec_number AS ec
    LIMIT $limit
    """
    result = tx.run(query, limit=sample_size)
    return [(record["entry"], record["ec"]) for record in result]

# Temporarily remove EC numbers from test proteins
def remove_labels(tx, test_entries):
    for entry in test_entries:
        tx.run("MATCH (p:Protein {entry: $entry}) SET p.true_ec = p.ec_number, p.ec_number = NULL", entry=entry)

# Restore original EC numbers after testing
def restore_labels(tx):
    tx.run("MATCH (p:Protein) WHERE p.true_ec IS NOT NULL SET p.ec_number = p.true_ec REMOVE p.true_ec")

# Perform label propagation and collect predicted labels
def annotate_protein(tx, entry_id, similarity_threshold=0.2):
    query = """
    MATCH (p:Protein {entry: $entry_id})-[r:SIMILARITY]-(neighbor)
    WHERE r.weight >= $threshold AND neighbor.ec_number IS NOT NULL
    RETURN neighbor.ec_number AS ec, r.weight AS weight
    """
    result = tx.run(query, entry_id=entry_id, threshold=similarity_threshold)
    ec_weights = defaultdict(float)
    for record in result:
        ec_weights[record["ec"]] += record["weight"]
    if ec_weights:
        return max(ec_weights, key=ec_weights.get)
    return None

# Run validation test
with driver.session() as session:
    test_proteins = session.execute_read(select_test_proteins, sample_size=100)
    session.execute_write(remove_labels, [entry for entry, _ in test_proteins])
    
    TP = 0
    P = 0
    T = 0

    for entry, true_ec in test_proteins:
        predicted_ec = session.execute_read(annotate_protein, entry)
        if predicted_ec:
            P += 1
            if predicted_ec == true_ec:
                TP += 1
        if true_ec:
            T += 1

    precision = TP / P if P else 0
    recall = TP / T if T else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0
    coverage = P / len(test_proteins)

    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1 Score: {f1:.2f}")
    print(f"Coverage: {coverage:.2f}")

    session.execute_write(restore_labels)

driver.close()


Precision: 0.91
Recall: 0.86
F1 Score: 0.88
Coverage: 0.95


In [2]:
# Step 3
from neo4j import GraphDatabase

# Neo4j connection settings
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "12345678")

driver = GraphDatabase.driver(URI, auth=AUTH)

# Function to calculate EC label weights and select the best label for an unlabeled protein
def annotate_protein(tx, entry_id, similarity_threshold=0.2):
    # Find neighbors and their EC labels
    query = """
    MATCH (p:Protein {entry: $entry_id})-[r:SIMILARITY]-(neighbor)
    WHERE r.weight >= $threshold AND neighbor.ec_number IS NOT NULL
    RETURN neighbor.ec_number AS ec, r.weight AS weight
    """
    result = tx.run(query, entry_id=entry_id, threshold=similarity_threshold)

    ec_weights = {}

    # Calculate total weights for each EC label
    for record in result:
        ec = record["ec"]
        weight = record["weight"]
        if ec in ec_weights:
            ec_weights[ec] += weight
        else:
            ec_weights[ec] = weight

    # If annotated neighbors exist, choose the label with the highest weight
    if ec_weights:
        best_ec = max(ec_weights, key=ec_weights.get)
        # Update the protein with the selected label
        update_query = """
        MATCH (p:Protein {entry: $entry_id})
        SET p.ec_number = $ec
        RETURN p.entry, p.ec_number
        """
        updated = tx.run(update_query, entry_id=entry_id, ec=best_ec)
        return updated.single()
    else:
        return None

# Use the function with a specific protein
protein_to_annotate = 'A0A024R1X5'  # Replace with the protein entry you want to annotate

with driver.session() as session:
    annotation_result = session.execute_write(annotate_protein, protein_to_annotate)

    if annotation_result:
        print(f"Protein {annotation_result['p.entry']} successfully annotated as {annotation_result['p.ec_number']}")
    else:
        print(f"Not enough data to annotate protein {protein_to_annotate}")

driver.close()


Not enough data to annotate protein A0A024R1X5


In [3]:
from neo4j import GraphDatabase
import random
from collections import defaultdict

# Neo4j connection settings
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "12345678")

# Connect to Neo4j
driver = GraphDatabase.driver(URI, auth=AUTH)

# Select test proteins: labeled proteins to hide their labels for validation
def select_test_proteins(tx, sample_size=100):
    query = """
    MATCH (p:Protein)
    WHERE p.ec_number IS NOT NULL
    RETURN p.entry AS entry, p.ec_number AS ec
    LIMIT $limit
    """
    result = tx.run(query, limit=sample_size)
    return [(record["entry"], record["ec"]) for record in result]

# Temporarily remove EC numbers from test proteins
def remove_labels(tx, test_entries):
    for entry in test_entries:
        tx.run("MATCH (p:Protein {entry: $entry}) SET p.true_ec = p.ec_number, p.ec_number = NULL", entry=entry)

# Restore original EC numbers after testing
def restore_labels(tx):
    tx.run("MATCH (p:Protein) WHERE p.true_ec IS NOT NULL SET p.ec_number = p.true_ec REMOVE p.true_ec")

# Perform label propagation and collect top-N predicted labels
def annotate_protein_multilabel(tx, entry_id, similarity_threshold=0.2, top_n=3):
    query = """
    MATCH (p:Protein {entry: $entry_id})-[r:SIMILARITY]-(neighbor)
    WHERE r.weight >= $threshold AND neighbor.ec_number IS NOT NULL
    RETURN neighbor.ec_number AS ec, r.weight AS weight
    """
    result = tx.run(query, entry_id=entry_id, threshold=similarity_threshold)
    ec_weights = defaultdict(float)
    for record in result:
        ec_weights[record["ec"]] += record["weight"]
    if ec_weights:
        sorted_ecs = sorted(ec_weights.items(), key=lambda x: x[1], reverse=True)
        return [ec for ec, _ in sorted_ecs[:top_n]]
    return []

# Run validation test for multilabel prediction
with driver.session() as session:
    test_proteins = session.execute_read(select_test_proteins, sample_size=100)
    session.execute_write(remove_labels, [entry for entry, _ in test_proteins])

    TP = 0
    P = 0
    T = 0

    for entry, true_ec in test_proteins:
        predicted_ecs = session.execute_read(annotate_protein_multilabel, entry)
        if predicted_ecs:
            P += 1
            if true_ec in predicted_ecs:
                TP += 1
        if true_ec:
            T += 1

    precision = TP / P if P else 0
    recall = TP / T if T else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0
    coverage = P / len(test_proteins)

    print(f"[Multi-label] Precision: {precision:.2f}")
    print(f"[Multi-label] Recall: {recall:.2f}")
    print(f"[Multi-label] F1 Score: {f1:.2f}")
    print(f"[Multi-label] Coverage: {coverage:.2f}")

    session.execute_write(restore_labels)

driver.close()


[Multi-label] Precision: 0.94
[Multi-label] Recall: 0.89
[Multi-label] F1 Score: 0.91
[Multi-label] Coverage: 0.95


In [6]:
from neo4j import GraphDatabase
import random
from collections import defaultdict

# Neo4j connection settings
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "12345678")

# Connect to Neo4j
driver = GraphDatabase.driver(URI, auth=AUTH)

# Select test proteins: labeled proteins to hide their labels for validation
def select_test_proteins(tx, sample_size=100):
    query = """
    MATCH (p:Protein)
    WHERE p.ec_number IS NOT NULL
    RETURN p.entry AS entry, p.ec_number AS ec
    LIMIT $limit
    """
    result = tx.run(query, limit=sample_size)
    return [(record["entry"], record["ec"]) for record in result]

# Temporarily remove EC numbers from test proteins
def remove_labels(tx, test_entries):
    for entry in test_entries:
        tx.run("MATCH (p:Protein {entry: $entry}) SET p.true_ec = p.ec_number, p.ec_number = NULL", entry=entry)

# Restore original EC numbers after testing
def restore_labels(tx):
    tx.run("MATCH (p:Protein) WHERE p.true_ec IS NOT NULL SET p.ec_number = p.true_ec REMOVE p.true_ec")

# Perform label propagation and collect top-N predicted labels or threshold-based labels (READ ONLY)
def annotate_protein_multilabel(tx, entry_id, similarity_threshold=0.2, top_n=3, min_weight=0.0):
    query = """
    MATCH (p:Protein {entry: $entry_id})-[r:SIMILARITY]-(neighbor)
    WHERE r.weight >= $threshold AND neighbor.ec_number IS NOT NULL
    RETURN neighbor.ec_number AS ec, r.weight AS weight
    """
    result = tx.run(query, entry_id=entry_id, threshold=similarity_threshold)
    ec_weights = defaultdict(float)
    for record in result:
        ec_raw = record["ec"]
        weight = record["weight"]
        ec_list = [ec.strip() for ec in ec_raw.split(';')]  # Split multi-labels from neighbor
        for ec in ec_list:
            ec_weights[ec] += weight

    filtered_ecs = [(ec, weight) for ec, weight in ec_weights.items() if weight >= min_weight]
    sorted_ecs = sorted(filtered_ecs, key=lambda x: x[1], reverse=True)
    top_ecs = [ec for ec, _ in sorted_ecs[:top_n]]
    return top_ecs

# Write predictions to Neo4j (WRITE ONLY)
def save_predictions(tx, entry_id, predictions):
    tx.run("MATCH (p:Protein {entry: $entry}) SET p.ec_predictions = $predictions", entry=entry_id, predictions=predictions)

# Run validation test for multilabel prediction with saving predictions in Neo4j
with driver.session() as session:
    test_proteins = session.execute_read(select_test_proteins, sample_size=100)
    session.execute_write(remove_labels, [entry for entry, _ in test_proteins])

    TP = 0
    P = 0
    T = 0

    for entry, true_ec in test_proteins:
        predicted_ecs = session.execute_read(annotate_protein_multilabel, entry, 0.2, 3, 0.1)
        if predicted_ecs:
            P += 1
            if true_ec in predicted_ecs:
                TP += 1
            session.execute_write(save_predictions, entry, predicted_ecs)
        if true_ec:
            T += 1

    precision = TP / P if P else 0
    recall = TP / T if T else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0
    coverage = P / len(test_proteins)

    print(f"[Multi-label] Precision: {precision:.2f}")
    print(f"[Multi-label] Recall: {recall:.2f}")
    print(f"[Multi-label] F1 Score: {f1:.2f}")
    print(f"[Multi-label] Coverage: {coverage:.2f}")

    session.execute_write(restore_labels)

driver.close()


[Multi-label] Precision: 0.84
[Multi-label] Recall: 0.80
[Multi-label] F1 Score: 0.82
[Multi-label] Coverage: 0.95


In [8]:
from neo4j import GraphDatabase
from collections import defaultdict

# Neo4j connection settings
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "12345678")

# Connect to Neo4j
driver = GraphDatabase.driver(URI, auth=AUTH)

# Function to retrieve all proteins with NULL ec_number
def get_unannotated_proteins(tx):
    query = """
    MATCH (p:Protein)
    WHERE p.ec_number IS NULL
    RETURN p.entry AS entry
    """
    result = tx.run(query)
    return [record["entry"] for record in result]

# Perform label propagation for given protein (multi-label)
def annotate_protein_multilabel(tx, entry_id, similarity_threshold=0.2, top_n=3, min_weight=0.0):
    query = """
    MATCH (p:Protein {entry: $entry_id})-[r:SIMILARITY]-(neighbor)
    WHERE r.weight >= $threshold AND neighbor.ec_number IS NOT NULL
    RETURN neighbor.ec_number AS ec, r.weight AS weight
    """
    result = tx.run(query, entry_id=entry_id, threshold=similarity_threshold)
    ec_weights = defaultdict(float)
    for record in result:
        ec_raw = record["ec"]
        weight = record["weight"]
        ec_list = [ec.strip() for ec in ec_raw.split(';')]
        for ec in ec_list:
            ec_weights[ec] += weight

    filtered_ecs = [(ec, weight) for ec, weight in ec_weights.items() if weight >= min_weight]
    sorted_ecs = sorted(filtered_ecs, key=lambda x: x[1], reverse=True)
    top_ecs = [ec for ec, _ in sorted_ecs[:top_n]]
    return top_ecs

# Save predictions to Neo4j
def save_predictions(tx, entry_id, predictions):
    tx.run("MATCH (p:Protein {entry: $entry}) SET p.ec_predictions = $predictions", entry=entry_id, predictions=predictions)

# Run annotation for all unannotated proteins
with driver.session() as session:
    entries = session.execute_read(get_unannotated_proteins)
    print(f"Found {len(entries)} unannotated proteins")

    for entry in entries:
        predicted_ecs = session.execute_read(annotate_protein_multilabel, entry, 0.2, 3, 0.1)
        if predicted_ecs:
            session.execute_write(save_predictions, entry, predicted_ecs)
            print(f"Saved predictions for {entry}: {predicted_ecs}")

driver.close()


Found 3281 unannotated proteins
Saved predictions for A0A024R324: ['3.6.5.2', '3.6.5.-', '3.6.1.-']
Saved predictions for A0A024R7I7: ['3.6.5.2', '3.6.5.-']
Saved predictions for A0A087X0K9: ['2.7.11.1']
Saved predictions for A0A090N7W4: ['2.7.11.1', '2.7.11.22', '2.7.11.30']
Saved predictions for A0A0C4DGF1: ['3.6.1.-']
Saved predictions for A0A0K2S4Q6: ['3.2.2.6', '3.4.24.-']
Saved predictions for A0A0S2Z3A9: ['2.4.1.255', '2.3.2.27']
Saved predictions for A0A0S2Z3W6: ['7.2.2.13', '7.2.2.10', '7.6.2.1']
Saved predictions for A0A0S2Z4Z8: ['2.7.11.1', '2.7.10.2', '2.7.12.2']
Saved predictions for A0A0S2Z5X4: ['2.3.2.-']
Saved predictions for A0A140VK09: ['5.2.1.8', '1.1.5.3', '3.1.3.16']
Saved predictions for A0A140VKC4: ['3.2.2.-']
Saved predictions for A0A384P5C6: ['3.4.24.19']
Saved predictions for A0A7I2PJA1: ['2.3.2.-']
Saved predictions for A0A7I2R3P8: ['2.3.1.-', '2.7.11.1']
Saved predictions for A0A8I5KZC4: ['2.4.2.-', '2.4.2.30']
Saved predictions for A0A8Q3SHM6: ['3.1.1.13', 