## Prep. of CIVIC Data

### This notebook contains the extraction and processing of data from CIVIC raw files.

## Imports and Setup

Import core libraries used in this notebook.

In [1]:
import pandas as pd
import json
import random
from pathlib import Path
from typing import List, Dict, Any

# Define paths (currently relative, adjust as needed)
RAW_DIR = Path('./raw')          # input TSV files are read from here
CLEAN_DIR = Path('./clean')      # root folder for every generated JSON file
CONTEXT_DIR = CLEAN_DIR / 'context'

# Ensure every output directory exists. Files are written below into
# CLEAN_DIR itself, CLEAN_DIR / 'no_context' and CONTEXT_DIR, so creating
# only CLEAN_DIR would make the first nested write fail on a fresh checkout.
for output_dir in (CLEAN_DIR, CLEAN_DIR / 'no_context', CONTEXT_DIR):
    output_dir.mkdir(parents=True, exist_ok=True)

## Helper Functions

These helpers are used later on to speed up the process. Each function has its own docstring for easier understanding and usage.

In [2]:
def read_tsv(filename: str) -> pd.DataFrame:
    """
    Load one tab-separated file from the raw data directory.

    Args:
    filename (str): File name, resolved relative to RAW_DIR.

    Returns:
    pd.DataFrame: The parsed contents of the file.
    """
    tsv_path = RAW_DIR / filename
    return pd.read_csv(tsv_path, sep='\t')

def process_assertion(row: pd.Series, clinical_evidence: pd.DataFrame) -> Dict[str, Any]:
    """
    Process a single assertion row and return a dictionary with extracted information.

    The 'evidence_item_ids' cell is normally a comma-separated string of IDs.
    A missing cell (NaN/None) yields an empty evidence list, a bare numeric
    cell is treated as a single ID, and blank tokens (e.g. from a trailing
    comma) are skipped instead of crashing the whole extraction.

    Args:
    row (pd.Series): A row from the assertion DataFrame. Must provide
        'evidence_item_ids', 'assertion_summary' and 'assertion_description'.
    clinical_evidence (pd.DataFrame): DataFrame containing clinical evidence
        summaries, with 'evidence_id' and 'evidence_statement' columns.

    Returns:
    Dict[str, Any]: Processed assertion data with keys 'claim', 'explanation'
        and 'evidence' (a list of {'evidence_id', 'description'} dicts). IDs
        without a match in clinical_evidence are left out.
    """
    raw_ids = row['evidence_item_ids']
    if isinstance(raw_ids, str):
        evidence_ids = [int(token) for token in raw_ids.split(',') if token.strip()]
    elif pd.isna(raw_ids):
        evidence_ids = []
    else:
        # Non-string, non-missing cell: a single numeric ID
        # (e.g. when the column was inferred as a numeric dtype).
        evidence_ids = [int(raw_ids)]

    evidence_list = []
    for evidence_id in evidence_ids:
        evidence = clinical_evidence[clinical_evidence['evidence_id'] == evidence_id]
        if not evidence.empty:
            evidence_list.append({
                'evidence_id': evidence_id,
                'description': evidence['evidence_statement'].iloc[0]
            })

    return {
        'claim': row['assertion_summary'],
        'explanation': row['assertion_description'],
        'evidence': evidence_list
    }

def save_json(data: Any, filename: str) -> None:
    """
    Save data as a JSON file in the clean directory.

    Args:
    data (Any): JSON-serializable data to be saved.
    filename (str): Name of the file to save, relative to CLEAN_DIR. May
        contain sub-folders (e.g. 'no_context/baseline.json').
    """
    target = CLEAN_DIR / filename
    # Callers pass nested names; create the sub-folder instead of failing
    # with FileNotFoundError when it does not exist yet.
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)

def create_missing_evidence_file(baseline_data: List[Dict[str, Any]], deletion_probability: float = 0.3) -> List[Dict[str, Any]]:
    """
    Build a copy of the dataset in which part of each entry's evidence is withheld.

    For an entry with more than one evidence item, between one and all-but-one
    items end up under 'missing_evidence'. Entries with zero or one item keep
    their evidence unchanged and get an empty 'missing_evidence' list. The
    input entries themselves are not modified.

    Args:
    baseline_data (List[Dict[str, Any]]): The original processed assertions data.
    deletion_probability (float): Probability of deleting an evidence item (default: 0.3).

    Returns:
    List[Dict[str, Any]]: One shallow copy per entry with 'evidence' reduced
        and the removed items listed under 'missing_evidence'.
    """
    results = []

    for record in baseline_data:
        items = record['evidence']
        total = len(items)

        # Upper bound on removals; always leaves at least one item in place.
        cap = random.randint(1, total - 1) if total > 1 else 0

        kept, dropped = [], []
        for candidate in items:
            # The random draw only happens while the cap has not been reached.
            if len(dropped) < cap and random.random() < deletion_probability:
                dropped.append(candidate)
            else:
                kept.append(candidate)

        # Nothing was removed by chance: force exactly one removal when
        # there is more than one item to choose from.
        if not dropped and len(kept) > 1:
            forced = random.choice(kept)
            kept.remove(forced)
            dropped.append(forced)

        results.append({**record, 'evidence': kept, 'missing_evidence': dropped})

    return results

def calculate_wrong_evidence_count(original_count):
    """
    Decide how many wrong evidence items to add for an entry.

    The base amount is one third of the original count (integer division),
    clamped to the range 1..3. With probability 0.3 one extra item is added.

    Args:
    original_count (int): The count of original evidence items.

    Returns:
    int: The number of wrong evidence items to add.
    """
    third = original_count // 3
    if third < 1:
        base = 1
    elif third > 3:
        base = 3
    else:
        base = third

    bonus = 1 if random.random() < 0.3 else 0
    return base + bonus

def create_wrong_evidence(evidence_kb: List[Dict[str, Any]], exclude_ids: List[int]) -> Dict[str, Any]:
    """
    Pick a random evidence item whose ID is not in the exclude_ids list.

    Args:
    evidence_kb (List[Dict[str, Any]]): The knowledge base of all evidence items;
        each item must have an 'evidence_id' key.
    exclude_ids (List[int]): List of evidence IDs to exclude.

    Returns:
    Dict[str, Any]: A wrong evidence item (the same dict object held by evidence_kb).

    Raises:
    ValueError: If no item in evidence_kb is eligible (empty knowledge base,
        or every item is excluded).
    """
    excluded = set(exclude_ids)
    candidates = [item for item in evidence_kb if item['evidence_id'] not in excluded]
    if not candidates:
        # Retrying random picks until one is accepted would never terminate
        # here, so fail loudly instead.
        raise ValueError("No evidence item available outside of exclude_ids")
    return random.choice(candidates)

def create_wrong_evidence_file(baseline_data: List[Dict[str, Any]], evidence_kb: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Create a new dataset with wrong evidence items added to each entry.

    The wrong items are drawn without replacement from the knowledge-base
    items that do not belong to the entry, so an entry never receives the
    same wrong item twice. If fewer eligible items exist than requested, all
    eligible items are used.

    Args:
    baseline_data (List[Dict[str, Any]]): The original processed assertions data.
    evidence_kb (List[Dict[str, Any]]): The knowledge base of all evidence items.

    Returns:
    List[Dict[str, Any]]: Shallow copies of the entries, each with an added
        'wrong_evidence' list. The original 'evidence' is left as is.
    """
    wrong_evidence_data = []

    for entry in baseline_data:
        new_entry = entry.copy()
        original_evidence = new_entry['evidence']
        original_ids = {item['evidence_id'] for item in original_evidence}

        # Eligible items keyed by ID so that a knowledge base containing
        # repeated IDs still cannot produce duplicates.
        candidates = {
            item['evidence_id']: item
            for item in evidence_kb
            if item['evidence_id'] not in original_ids
        }

        wrong_evidence_count = calculate_wrong_evidence_count(len(original_evidence))
        sample_size = min(wrong_evidence_count, len(candidates))

        new_entry['wrong_evidence'] = random.sample(list(candidates.values()), sample_size)
        wrong_evidence_data.append(new_entry)

    return wrong_evidence_data

def create_mixed_evidence_file(baseline_data: List[Dict[str, Any]], evidence_kb: List[Dict[str, Any]], deletion_probability: float = 0.5) -> List[Dict[str, Any]]:
    """
    Create a new dataset with some evidence items randomly deleted and wrong evidence items added to each entry.

    Step 1 withholds between one and all-but-one evidence items from entries
    that have more than one item. Step 2 adds wrong items, drawn without
    replacement from the knowledge-base items that are not part of the
    entry's original evidence, so an entry never receives the same wrong item
    twice. If fewer eligible items exist than requested, all are used.

    Args:
    baseline_data (List[Dict[str, Any]]): The original processed assertions data.
    evidence_kb (List[Dict[str, Any]]): The knowledge base of all evidence items.
    deletion_probability (float): Probability of deleting an evidence item (default: 0.5).

    Returns:
    List[Dict[str, Any]]: Shallow copies of the entries with a reduced
        'evidence' list plus 'missing_evidence' and 'wrong_evidence' lists.
    """
    mixed_evidence_data = []

    for entry in baseline_data:
        new_entry = entry.copy()
        original_evidence = new_entry['evidence']
        new_evidence = []
        missing_evidence = []

        # Step 1: Delete some evidence (similar to missing_evidence logic)
        num_evidence = len(original_evidence)
        # The cap always leaves at least one evidence item in place.
        max_to_delete = random.randint(1, num_evidence - 1) if num_evidence > 1 else 0

        for item in original_evidence:
            if len(missing_evidence) < max_to_delete and random.random() < deletion_probability:
                missing_evidence.append(item)
            else:
                new_evidence.append(item)

        # Ensure at least one item is deleted if possible
        if not missing_evidence and len(new_evidence) > 1:
            item_to_move = random.choice(new_evidence)
            new_evidence.remove(item_to_move)
            missing_evidence.append(item_to_move)

        # Step 2: Add wrong evidence (similar to wrong_evidence logic).
        # Items from the original evidence stay excluded even if they were
        # just moved to missing_evidence.
        original_ids = {item['evidence_id'] for item in original_evidence}
        candidates = {
            item['evidence_id']: item
            for item in evidence_kb
            if item['evidence_id'] not in original_ids
        }
        wrong_evidence_count = calculate_wrong_evidence_count(num_evidence)
        sample_size = min(wrong_evidence_count, len(candidates))
        wrong_evidence = random.sample(list(candidates.values()), sample_size)

        # Update the entry
        new_entry['evidence'] = new_evidence
        new_entry['missing_evidence'] = missing_evidence
        new_entry['wrong_evidence'] = wrong_evidence
        mixed_evidence_data.append(new_entry)

    return mixed_evidence_data

def create_wrong_claim_file(baseline_data: List[Dict[str, Any]], evidence_kb: List[Dict[str, Any]], num_claims: int = 50) -> List[Dict[str, Any]]:
    """
    Create a new dataset of randomly selected claims, each assigned wrong evidence.

    Up to num_claims claims are selected without replacement; if the baseline
    holds fewer entries, all of them are used. Each selected claim receives
    3 to 7 distinct evidence items that are not part of its own evidence; if
    the knowledge base holds fewer eligible items, all eligible items are used.

    Args:
    baseline_data (List[Dict[str, Any]]): The original processed assertions data.
    evidence_kb (List[Dict[str, Any]]): The knowledge base of all evidence items.
    num_claims (int): Maximum number of claims to select (default: 50).

    Returns:
    List[Dict[str, Any]]: New entries with 'claim', 'explanation' and an
        'evidence' list that contains only wrong evidence.
    """
    wrong_claim_data = []
    # Cap the sample size: random.sample raises ValueError when asked for
    # more elements than the population holds.
    selected_claims = random.sample(baseline_data, min(num_claims, len(baseline_data)))

    for entry in selected_claims:
        original_ids = {item['evidence_id'] for item in entry['evidence']}
        wrong_evidence_count = random.randint(3, 7)  # Random number of wrong evidence items (3 to 7)

        # Eligible items keyed by ID so every chosen item is distinct.
        candidates = {
            item['evidence_id']: item
            for item in evidence_kb
            if item['evidence_id'] not in original_ids
        }
        sample_size = min(wrong_evidence_count, len(candidates))

        wrong_claim_data.append({
            'claim': entry['claim'],
            'explanation': entry['explanation'],
            'evidence': random.sample(list(candidates.values()), sample_size)
        })

    return wrong_claim_data

def create_assignment_test_file(baseline_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Build 50 assignment-test examples from three distinct entries each.

    Every example carries the claim, explanation and evidence of two entries
    (A and B) plus the evidence of a third entry (C).

    Args:
    baseline_data (List[Dict[str, Any]]): The original processed assertions data.

    Returns:
    List[Dict[str, Any]]: New dataset for assignment testing.
    """
    def build_example() -> Dict[str, Any]:
        # Three different entries per example, drawn without replacement.
        first, second, third = random.sample(baseline_data, 3)
        return {
            'claim_A': first['claim'],
            'claim_B': second['claim'],
            'explanation_A': first['explanation'],
            'explanation_B': second['explanation'],
            'evidence_A': first['evidence'],
            'evidence_B': second['evidence'],
            'evidence_C': third['evidence']
        }

    return [build_example() for _ in range(50)]

def get_molecular_profile_summary(molecular_profiles, molecular_profile_id):
    """
    Look up the summary text of one molecular profile.

    Args:
    molecular_profiles (pd.DataFrame): DataFrame containing molecular profile data
        ('molecular_profile_id' and 'summary' columns are read).
    molecular_profile_id (int): ID of the molecular profile.

    Returns:
    str: Summary of the first matching profile, or an empty string when no
        profile matches or the stored summary is missing.
    """
    is_match = molecular_profiles['molecular_profile_id'] == molecular_profile_id
    rows = molecular_profiles[is_match]

    if rows.empty:
        return ""

    first_summary = rows['summary'].iloc[0]
    if pd.isna(first_summary):
        return ""
    return first_summary

def create_context(row, molecular_profiles):
    """
    Create a context dictionary from a row of assertion data.

    Missing values (absent column, None or NaN) are written as empty strings
    for every field, so no NaN ends up in the JSON output.

    Args:
    row (pd.Series): A row from the assertions DataFrame.
    molecular_profiles (pd.DataFrame): DataFrame containing molecular profile data.

    Returns:
    dict: Context information for the assertion with the keys
        "Molecular Profile", "Molecular Profile Summary", "Disease",
        "Therapies" and "Phenotypes".
    """
    def scalar_text(column):
        # row.get only falls back to its default when the column is absent;
        # a present-but-empty cell comes back as NaN and needs its own check.
        value = row.get(column, "")
        return "" if pd.isna(value) else value

    def comma_list(column):
        # Re-join comma-separated values with ", " for readability.
        value = row.get(column)
        return ", ".join(value.split(',')) if pd.notna(value) else ""

    return {
        "Molecular Profile": scalar_text('molecular_profile'),
        "Molecular Profile Summary": get_molecular_profile_summary(
            molecular_profiles, row.get('molecular_profile_id')
        ),
        "Disease": scalar_text('disease'),
        "Therapies": comma_list('therapies'),
        "Phenotypes": comma_list('phenotypes')
    }


def add_context_to_file(assertions, molecular_profiles, input_file, output_file):
    """
    Add context to each item in the input file and save to the output file.

    Each item is matched to the first assertion row whose 'assertion_summary'
    equals the item's 'claim'. The output file's parent directory is created
    when it does not exist.

    Args:
    assertions (pd.DataFrame): DataFrame containing assertion data.
    molecular_profiles (pd.DataFrame): DataFrame containing molecular profile data.
    input_file (str): Path to the input JSON file.
    output_file (str): Path to save the output JSON file.

    Raises:
    IndexError: If an item's claim has no matching assertion row.
    """
    with open(input_file, 'r') as f:
        data = json.load(f)

    for item in data:
        matches = assertions[assertions['assertion_summary'] == item['claim']]
        if matches.empty:
            # Same exception type .iloc[0] raises on an empty frame, but with
            # a message that names the offending claim.
            raise IndexError(f"No assertion found for claim: {item['claim']!r}")
        item['context'] = create_context(matches.iloc[0], molecular_profiles)

    # Create the target folder up front so the write below cannot fail with
    # FileNotFoundError on a fresh output tree.
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, 'w') as f:
        json.dump(data, f, indent=2)

def create_selection_test_file(baseline_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Build 50 selection-test examples from two distinct entries each.

    Every example carries both claims, but the explanation and evidence of
    the first entry (A) only.

    Args:
    baseline_data (List[Dict[str, Any]]): The original processed assertions data.

    Returns:
    List[Dict[str, Any]]: New dataset for selection testing.
    """
    def build_example() -> Dict[str, Any]:
        # Two different entries per example, drawn without replacement.
        first, second = random.sample(baseline_data, 2)
        return {
            'claim_A': first['claim'],
            'claim_B': second['claim'],
            'explanation_A': first['explanation'],
            'evidence_A': first['evidence']
        }

    return [build_example() for _ in range(50)]

# Create and Save Files

## Baseline Files

In [3]:
# Load the three raw CIVIC exports
assertions = read_tsv('01-Aug-2024-AssertionSummaries.tsv')
clinical_evidence = read_tsv('01-Aug-2024-ClinicalEvidenceSummaries.tsv')
molecular_profiles = read_tsv('01-Aug-2024-MolecularProfileSummaries.tsv')

# Turn every assertion row into a claim/explanation/evidence record
processed_assertions = [
    process_assertion(row, clinical_evidence)
    for _, row in assertions.iterrows()
]

# Build the evidence knowledge base: one record per evidence ID, collected
# across all processed assertions
unique_evidence_items = {}
for processed_assertion in processed_assertions:
    for evidence in processed_assertion['evidence']:
        evidence_id = evidence['evidence_id']
        unique_evidence_items[evidence_id] = {
            'evidence_id': evidence_id,
            'description': evidence['description']
        }

# Write the baseline dataset
save_json(processed_assertions, 'no_context/baseline.json')

# Write the knowledge base as a plain list of records
save_json(list(unique_evidence_items.values()), 'evidence_kb.json')

print("Data extraction completed. Files saved in the selected paths.")

Data extraction completed. Files saved in the selected paths.


## Missing Evidence File

In [4]:
# Read the saved baseline entries back from disk
baseline_path = CLEAN_DIR / 'no_context/baseline.json'
with baseline_path.open('r') as f:
    baseline_data = json.load(f)

# Derive the variant in which part of each entry's evidence is withheld
missing_evidence_data = create_missing_evidence_file(baseline_data=baseline_data)

# Write the result next to the baseline file
save_json(data=missing_evidence_data, filename='no_context/missing_evidence.json')

print("Missing evidence file created and saved as 'missing_evidence.json'")

Missing evidence file created and saved as 'missing_evidence.json'


## Wrong Evidence File

In [5]:
# Read the saved evidence knowledge base back from disk
evidence_kb_path = CLEAN_DIR / 'evidence_kb.json'
with evidence_kb_path.open('r') as f:
    evidence_kb = json.load(f)

# Derive the variant in which unrelated evidence is attached to each entry
wrong_evidence_data = create_wrong_evidence_file(
    baseline_data=baseline_data,
    evidence_kb=evidence_kb,
)

# Write the result next to the baseline file
save_json(data=wrong_evidence_data, filename='no_context/wrong_evidence.json')

print("Wrong evidence file created and saved as 'wrong_evidence.json'")

Wrong evidence file created and saved as 'wrong_evidence.json'


## Mixed File

In [6]:
# Derive the variant that both withholds evidence and attaches wrong evidence
mixed_evidence_data = create_mixed_evidence_file(
    baseline_data=baseline_data,
    evidence_kb=evidence_kb,
)

# Write the result next to the baseline file
save_json(data=mixed_evidence_data, filename='no_context/mixed.json')

print("Mixed evidence file created and saved as 'mixed.json'")

Mixed evidence file created and saved as 'mixed.json'


## Wrong Claim File

In [7]:
# Build the wrong-claim dataset from the baseline and the knowledge base,
# then write it next to the baseline file
wrong_claim_data = create_wrong_claim_file(
    baseline_data=baseline_data,
    evidence_kb=evidence_kb,
)
save_json(data=wrong_claim_data, filename='no_context/wrong_claim.json')

print("Wrong claim file created and saved as 'wrong_claim.json'")

Wrong claim file created and saved as 'wrong_claim.json'


## Assignment Test File

In [8]:
# Build the assignment-test dataset from the baseline entries, then write it
# next to the baseline file
assignment_test_data = create_assignment_test_file(baseline_data=baseline_data)
save_json(data=assignment_test_data, filename='no_context/assignment_test.json')

print("Assignment test file created and saved as 'assignment_test.json'")


Assignment test file created and saved as 'assignment_test.json'


## Selection Test

In [9]:
# Build the selection-test dataset from the baseline entries, then write it
# next to the baseline file
selection_test_data = create_selection_test_file(baseline_data=baseline_data)
save_json(data=selection_test_data, filename='no_context/selection_test.json')

print("Selection test file created and saved as 'selection_test.json'")

Selection test file created and saved as 'selection_test.json'


## Adding Context to All Files

In [10]:
# Files whose items carry a single 'claim' field
files_to_process = [
    'baseline.json',
    'missing_evidence.json',
    'wrong_evidence.json',
    'mixed.json',
    'wrong_claim.json'
]

# Make sure the context folder exists before anything is written into it
CONTEXT_DIR.mkdir(parents=True, exist_ok=True)

# Process each single-claim file
for file in files_to_process:
    input_file = CLEAN_DIR / 'no_context' / file
    output_file = CONTEXT_DIR / file
    add_context_to_file(assertions, molecular_profiles, input_file, output_file)
    print(f"Created context version of {file}")


def _add_pairwise_context(filename):
    """
    Add 'context_A' and 'context_B' to every item of a two-claim file.

    Reads the file from the 'no_context' folder, looks up the assertion rows
    matching 'claim_A' and 'claim_B', and writes the enriched items under the
    same name into CONTEXT_DIR.

    Args:
    filename (str): Name of the JSON file to process.

    Returns:
    list: The loaded items with both context entries added.
    """
    with open(CLEAN_DIR / 'no_context' / filename, 'r') as f:
        data = json.load(f)

    for item in data:
        for label in ('A', 'B'):
            assertion_row = assertions[
                assertions['assertion_summary'] == item[f'claim_{label}']
            ].iloc[0]
            item[f'context_{label}'] = create_context(assertion_row, molecular_profiles)

    with open(CONTEXT_DIR / filename, 'w') as f:
        json.dump(data, f, indent=2)

    print(f"Created context version of {filename}")
    return data


# Special handling for the two-claim files (assignment and selection tests)
assignment_test_data = _add_pairwise_context('assignment_test.json')
selection_test_data = _add_pairwise_context('selection_test.json')

print("All context files have been created in the 'context' folder.")

Created context version of baseline.json
Created context version of missing_evidence.json
Created context version of wrong_evidence.json
Created context version of mixed.json
Created context version of wrong_claim.json
Created context version of assignment_test.json
Created context version of selection_test.json
All context files have been created in the 'context' folder.
All context files have been created in the 'context' folder.
