In [1]:
import torch
import pytest
import numpy as np


def filter_and_remap_edges(adj_matrix, keep_mask):
    """Restrict a sparse adjacency matrix to the kept nodes and renumber them.

    Only edges whose source AND destination are both kept survive. Kept nodes
    are renumbered 0..k-1 in increasing order of their original index.

    Args:
        adj_matrix: sparse COO tensor whose two index rows are node ids.
        keep_mask: one entry per node, truthy meaning "keep". May be a bool or
            integer tensor, or any array-like accepted by ``torch.as_tensor``
            (Python list, NumPy array). It is moved to ``adj_matrix.device``.

    Returns:
        Sparse COO tensor of size ``(k, k)`` (k = number of kept nodes) with
        the dtype and device of ``adj_matrix``.
    """
    adj_matrix = adj_matrix.coalesce()
    # Put the mask on the adjacency's device so every index lookup below is
    # same-device, regardless of where the caller created the mask.
    device = adj_matrix.device
    keep_mask = torch.as_tensor(keep_mask, device=device).bool()

    valid_indices = torch.where(keep_mask)[0]
    num_valid_nodes = valid_indices.numel()

    # Old index -> new index; -1 marks removed nodes (never looked up below,
    # because edges touching a removed node are filtered out first).
    mapping = torch.full((keep_mask.numel(),), -1, dtype=torch.long, device=device)
    mapping[valid_indices] = torch.arange(num_valid_nodes, device=device)

    src, dst = adj_matrix.indices()
    valid_edges_mask = keep_mask[src] & keep_mask[dst]

    # Boolean-mask indexing already yields correctly shaped empty tensors when
    # no edge survives, so no special case is needed.
    filtered_indices = torch.stack(
        (mapping[src[valid_edges_mask]], mapping[dst[valid_edges_mask]])
    )
    filtered_values = adj_matrix.values()[valid_edges_mask]

    return torch.sparse_coo_tensor(
        indices=filtered_indices,
        values=filtered_values,
        size=(num_valid_nodes, num_valid_nodes),
        dtype=adj_matrix.dtype,
        device=device,
    )


class TestFilterAndRemapEdges:
    """Checks for ``filter_and_remap_edges``.

    Where the expected result is small, the whole dense matrix is compared so
    that an edge that should have been removed cannot slip through unnoticed.
    """

    def test_basic_functionality(self):
        """Test basic filtering and remapping"""
        # 4 nodes; edges 0->1, 1->0, 2->3, 3->2, 1->2
        indices = torch.tensor([[0, 1, 2, 3, 1], [1, 0, 3, 2, 2]])
        values = torch.tensor([1.0, 2.0, 3.0, 4.0, 5.0])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (4, 4))

        # Keep nodes 0, 2, 3 (filter out node 1)
        keep_mask = torch.tensor([True, False, True, True])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        # Old -> new node ids: 0->0, 2->1, 3->2.
        # Every edge touching node 1 is dropped, leaving
        # 2->3 (3.0) as 1->2 and 3->2 (4.0) as 2->1.
        expected = torch.tensor([
            [0.0, 0.0, 0.0],
            [0.0, 0.0, 3.0],
            [0.0, 4.0, 0.0],
        ])

        assert result.size() == (3, 3)  # 3 remaining nodes
        assert result._nnz() == 2
        assert torch.equal(result.to_dense(), expected)

    def test_keep_all_nodes(self):
        """Test when all nodes are kept"""
        indices = torch.tensor([[0, 1, 2], [1, 2, 0]])
        values = torch.tensor([1.0, 2.0, 3.0])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (3, 3))

        keep_mask = torch.tensor([True, True, True])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        # Should be identical to original
        assert result.size() == (3, 3)
        assert result._nnz() == 3
        assert torch.allclose(result.to_dense(), adj_matrix.to_dense())

    def test_keep_no_nodes(self):
        """Test when no nodes are kept"""
        indices = torch.tensor([[0, 1], [1, 0]])
        values = torch.tensor([1.0, 2.0])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (2, 2))

        keep_mask = torch.tensor([False, False])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        assert result.size() == (0, 0)
        assert result._nnz() == 0

    def test_no_edges(self):
        """Test with no edges in the adjacency matrix"""
        indices = torch.empty((2, 0), dtype=torch.long)
        values = torch.empty(0, dtype=torch.float)
        adj_matrix = torch.sparse_coo_tensor(indices, values, (4, 4))

        keep_mask = torch.tensor([True, False, True, True])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        assert result.size() == (3, 3)
        assert result._nnz() == 0

    def test_single_node(self):
        """Test with single node"""
        indices = torch.tensor([[0], [0]])
        values = torch.tensor([5.0])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (1, 1))

        keep_mask = torch.tensor([True])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        assert result.size() == (1, 1)
        assert result.to_dense()[0, 0] == 5.0

    def test_single_node_filtered_out(self):
        """Test with single node that gets filtered out"""
        indices = torch.tensor([[0], [0]])
        values = torch.tensor([5.0])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (1, 1))

        keep_mask = torch.tensor([False])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        assert result.size() == (0, 0)
        assert result._nnz() == 0

    def test_edge_values_preserved(self):
        """Test that edge values are correctly preserved"""
        indices = torch.tensor([[0, 1, 2], [2, 0, 1]])
        values = torch.tensor([10.5, 20.3, 30.7])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (3, 3))

        keep_mask = torch.tensor([True, True, True])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        # Check that values are preserved
        result_coo = result.coalesce()
        result_values = result_coo.values()

        # Sort both to compare (storage order might change)
        orig_sorted = torch.sort(values)[0]
        result_sorted = torch.sort(result_values)[0]

        assert torch.allclose(orig_sorted, result_sorted)

    def test_different_dtypes(self):
        """Test with different data types"""
        indices = torch.tensor([[0, 1], [1, 0]])

        # Test int values
        values_int = torch.tensor([1, 2], dtype=torch.int32)
        adj_matrix_int = torch.sparse_coo_tensor(indices, values_int, (2, 2))
        keep_mask = torch.tensor([True, True])

        result_int = filter_and_remap_edges(adj_matrix_int, keep_mask)
        assert result_int.dtype == torch.int32
        assert result_int.coalesce().values().tolist() == [1, 2]

        # Test double values
        values_double = torch.tensor([1.0, 2.0], dtype=torch.float64)
        adj_matrix_double = torch.sparse_coo_tensor(indices, values_double, (2, 2))

        result_double = filter_and_remap_edges(adj_matrix_double, keep_mask)
        assert result_double.dtype == torch.float64
        assert result_double.coalesce().values().tolist() == [1.0, 2.0]

    def test_device_consistency(self):
        """Test that device is preserved"""
        indices = torch.tensor([[0, 1], [1, 0]])
        values = torch.tensor([1.0, 2.0])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (2, 2))
        keep_mask = torch.tensor([True, False])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        assert result.device == adj_matrix.device
        assert result.device == keep_mask.device

    def test_complex_mapping(self):
        """Test more complex node mapping scenario"""
        # 5 nodes, keep nodes 1, 3, 4 (nodes 0 and 2 get filtered out)
        indices = torch.tensor([[0, 1, 2, 3, 4, 1, 3], [1, 3, 1, 4, 2, 4, 1]])
        values = torch.tensor([1., 2., 3., 4., 5., 6., 7.])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (5, 5))

        keep_mask = torch.tensor([False, True, False, True, True])

        result = filter_and_remap_edges(adj_matrix, keep_mask)

        # Remaining nodes: 1, 3, 4 -> mapped to 0, 1, 2
        # Edges that should remain:
        # 1->3 (value 2) -> 0->1
        # 3->4 (value 4) -> 1->2
        # 1->4 (value 6) -> 0->2
        # 3->1 (value 7) -> 1->0
        # Dropped: 0->1, 2->1, 4->2 (each touches node 0 or node 2)
        expected = torch.tensor([
            [0.0, 2.0, 6.0],
            [7.0, 0.0, 4.0],
            [0.0, 0.0, 0.0],
        ])

        assert result.size() == (3, 3)
        assert result._nnz() == 4
        assert torch.equal(result.to_dense(), expected)

    def test_input_validation(self):
        """Test input edge cases and validation"""
        indices = torch.tensor([[0, 1], [1, 0]])
        values = torch.tensor([1.0, 2.0])
        adj_matrix = torch.sparse_coo_tensor(indices, values, (2, 2))

        # Test with non-boolean mask (should be converted)
        keep_mask_int = torch.tensor([1, 0], dtype=torch.int)
        result = filter_and_remap_edges(adj_matrix, keep_mask_int)
        assert result.size() == (1, 1)
        # Both edges touch the removed node 1, so nothing may remain.
        assert result._nnz() == 0

    def run_all_tests(self):
        """Run every ``test_*`` method in name order, printing a status line each.

        The first failure is reported and then re-raised, so later tests do
        not run after a failure.
        """
        test_methods = [method for method in dir(self) if method.startswith('test_')]

        for test_method in test_methods:
            print(f"Running {test_method}...")
            try:
                getattr(self, test_method)()
                print(f"✓ {test_method} passed")
            except Exception as e:
                print(f"✗ {test_method} failed: {e}")
                raise


# Usage example: run the test class, then walk through one filtering call.
if __name__ == "__main__":
    tester = TestFilterAndRemapEdges()
    tester.run_all_tests()
    print("\nAll tests completed!")

    # Section banner for the worked example
    print("\n" + "="*50, "EXAMPLE USAGE:", "="*50, sep="\n")

    # Sample 4-node graph and the mask that drops node 1
    indices = torch.tensor([[0, 1, 2, 3, 1], [1, 0, 3, 2, 2]])
    values = torch.tensor([1.0, 2.0, 3.0, 4.0, 5.0])
    adj_matrix = torch.sparse_coo_tensor(indices, values, (4, 4))
    keep_mask = torch.tensor([True, False, True, True])

    print("Original adjacency matrix:", adj_matrix.to_dense(), sep="\n")

    print(f"\nKeep mask: {keep_mask}")
    print(f"Keeping nodes: {torch.where(keep_mask)[0].tolist()}")

    result = filter_and_remap_edges(adj_matrix, keep_mask)
    print(f"\nFiltered adjacency matrix (size {result.size()}):")
    print(result.to_dense())

Running test_basic_functionality...
✓ test_basic_functionality passed
Running test_complex_mapping...
✓ test_complex_mapping passed
Running test_device_consistency...
✓ test_device_consistency passed
Running test_different_dtypes...
✓ test_different_dtypes passed
Running test_edge_values_preserved...
✓ test_edge_values_preserved passed
Running test_input_validation...
✓ test_input_validation passed
Running test_keep_all_nodes...
✓ test_keep_all_nodes passed
Running test_keep_no_nodes...
✓ test_keep_no_nodes passed
Running test_no_edges...
✓ test_no_edges passed
Running test_single_node...
✓ test_single_node passed
Running test_single_node_filtered_out...
✓ test_single_node_filtered_out passed

All tests completed!

EXAMPLE USAGE:
Original adjacency matrix:
tensor([[0., 1., 0., 0.],
        [2., 0., 5., 0.],
        [0., 0., 0., 3.],
        [0., 0., 4., 0.]])

Keep mask: tensor([ True, False,  True,  True])
Keeping nodes: [0, 2, 3]

Filtered adjacency matrix (size torch.Size([3, 3])):


In [None]:
import torch
from collections import defaultdict

def create_frequency_weighted_adj(rel_edge_index):
    """Build a sparse adjacency matrix whose weights are edge multiplicities.

    Args:
        rel_edge_index: integer tensor whose row 0 holds source node ids and
            row 1 holds destination node ids, one column per edge occurrence.

    Returns:
        Uncoalesced sparse COO tensor of size ``(n, n)`` with
        ``n = rel_edge_index.max() + 1``. Entry ``(src, dst)`` is the number of
        times that edge appears, as a float. An empty edge list yields an
        empty ``(0, 0)`` matrix.
    """
    # `.max()` raises on an empty tensor, so the no-edge case is handled first.
    if rel_edge_index.numel() == 0:
        return torch.sparse_coo_tensor(
            indices=torch.empty((2, 0), dtype=torch.long),
            values=torch.empty(0),
            size=(0, 0),
        )

    num_nodes = rel_edge_index.max().item() + 1

    # One bulk tensor->list conversion per row instead of two `.item()` calls
    # per edge. Dict insertion order keeps first-seen edge order.
    edge_counts = defaultdict(int)
    for edge in zip(rel_edge_index[0].tolist(), rel_edge_index[1].tolist()):
        edge_counts[edge] += 1

    # Keys are (src, dst) pairs -> transpose to shape [2, num_unique_edges]
    indices = torch.tensor(list(edge_counts.keys())).t()
    values = torch.tensor([float(count) for count in edge_counts.values()])

    return torch.sparse_coo_tensor(
        indices=indices,
        values=values,
        size=(num_nodes, num_nodes),
    )

def filter_and_remap_edges_improved(adj_matrix, keep_mask):
    """Restrict a sparse adjacency matrix to the kept nodes and renumber them.

    Only edges whose source AND destination are both kept survive. Kept nodes
    are renumbered 0..k-1 in increasing order of their original index.

    Args:
        adj_matrix: sparse COO tensor whose two index rows are node ids.
        keep_mask: one entry per node, truthy meaning "keep". May be a bool or
            integer tensor, or any array-like accepted by ``torch.as_tensor``
            (Python list, NumPy array). It is moved to ``adj_matrix.device``.

    Returns:
        Sparse COO tensor of size ``(k, k)`` (k = number of kept nodes) with
        the dtype and device of ``adj_matrix``.
    """
    adj_matrix = adj_matrix.coalesce()
    # Put the mask on the adjacency's device so every index lookup below is
    # same-device, regardless of where the caller created the mask.
    device = adj_matrix.device
    keep_mask = torch.as_tensor(keep_mask, device=device).bool()

    valid_indices = torch.where(keep_mask)[0]
    num_valid_nodes = valid_indices.numel()

    # Old index -> new index; -1 marks removed nodes (never looked up below,
    # because edges touching a removed node are filtered out first).
    mapping = torch.full((keep_mask.numel(),), -1, dtype=torch.long, device=device)
    mapping[valid_indices] = torch.arange(num_valid_nodes, device=device)

    src, dst = adj_matrix.indices()
    valid_edges_mask = keep_mask[src] & keep_mask[dst]

    # Boolean-mask indexing already yields correctly shaped empty tensors when
    # no edge survives, so no special case is needed.
    filtered_indices = torch.stack(
        (mapping[src[valid_edges_mask]], mapping[dst[valid_edges_mask]])
    )
    filtered_values = adj_matrix.values()[valid_edges_mask]

    return torch.sparse_coo_tensor(
        indices=filtered_indices,
        values=filtered_values,
        size=(num_valid_nodes, num_valid_nodes),
        dtype=adj_matrix.dtype,
        device=device,
    )

def print_sparse_matrix_details(matrix, name):
    """Print a short report about a sparse matrix under the heading ``name``.

    The report lists shape, stored-entry count, the index/value tensors (only
    when at least one entry is stored) and the dense form.
    """
    # indices()/values() are only available on a coalesced tensor
    matrix = matrix.coalesce()

    report = [
        f"\n{name}:",
        f"  Shape: {matrix.shape}",
        f"  Number of edges: {matrix._nnz()}",
    ]
    if matrix._nnz() > 0:
        report.append(f"  Indices:\n{matrix.indices()}")
        report.append(f"  Values: {matrix.values()}")
    report.append(f"  Dense representation:\n{matrix.to_dense()}")

    print(*report, sep="\n")

def example_1_basic_filtering():
    """
    Example 1: filter a 3-node graph down to two nodes.

    Builds a frequency-weighted adjacency matrix from a small edge list
    (including a self-loop and a duplicated edge), removes node 1, and prints
    the matrices before and after.

    Returns:
        (adj_matrix, filtered_adj): the original and the filtered sparse matrix.
    """
    rule = "=" * 60
    print(rule)
    print("EXAMPLE 1: Basic Filtering Test")
    print(rule)

    # Edge list: 0->1, 1->2, 2->0, 1->0, 0->0 (self-loop), 1->2 (duplicate)
    sources = [0, 1, 2, 1, 0, 1]
    destinations = [1, 2, 0, 0, 0, 2]
    rel_edge_index = torch.tensor([sources, destinations])

    print("Original edge list:")
    print("Sources:     ", rel_edge_index[0].tolist())
    print("Destinations:", rel_edge_index[1].tolist())
    print("Edges: 0->1, 1->2, 2->0, 1->0, 0->0 (self-loop), 1->2 (duplicate)")

    # Count edge occurrences into a sparse adjacency matrix
    adj_matrix = create_frequency_weighted_adj(rel_edge_index)
    print_sparse_matrix_details(adj_matrix, "Original Adjacency Matrix")

    # Keep nodes 0 and 2, drop node 1
    keep_mask = torch.tensor([True, False, True])
    print(f"\nKeep mask: {keep_mask.tolist()} (keeping nodes 0 and 2, removing node 1)")

    filtered_adj = filter_and_remap_edges_improved(adj_matrix, keep_mask)
    print_sparse_matrix_details(filtered_adj, "Filtered Adjacency Matrix")

    print("\nExpected behavior:")
    for note in (
        "- Original nodes 0,1,2 become new nodes 0,1 (node 1 removed)",
        "- Edge 2->0 becomes 1->0 in the filtered matrix",
        "- Self-loop 0->0 remains 0->0",
        "- Edges involving node 1 are removed",
    ):
        print(note)

    # Print the expected renumbering for the reader (informational only)
    print(f"\nVerification:")
    old_to_new = {0: 0, 2: 1}  # node 0 stays 0, node 2 becomes 1
    print(f"Node mapping: {old_to_new}")

    return adj_matrix, filtered_adj

def example_2_edge_cases():
    """
    Example 2: Edge cases testing

    Runs the filter on a 5-node graph with three masks: keep everything,
    keep nothing, and keep only nodes 1 and 3. Prints each result.

    Returns:
        (adj_matrix, filtered_all, filtered_none, filtered_pattern)
    """
    print("\n" + "="*60)
    print("EXAMPLE 2: Edge Cases Test")
    print("="*60)

    # Create a more complex graph: 5 nodes with various connections
    rel_edge_index = torch.tensor([
        [0, 1, 2, 3, 4, 0, 2, 4, 1, 3],  # source nodes
        [1, 2, 3, 4, 0, 3, 1, 2, 4, 1]   # destination nodes
    ])

    print("Original edge list (5 nodes):")
    print("Sources:     ", rel_edge_index[0].tolist())
    print("Destinations:", rel_edge_index[1].tolist())

    adj_matrix = create_frequency_weighted_adj(rel_edge_index)
    print_sparse_matrix_details(adj_matrix, "Original 5-Node Adjacency Matrix")

    # Test Case 2a: Keep all nodes
    print("\n" + "-"*40)
    print("Test Case 2a: Keep ALL nodes")
    print("-"*40)
    keep_all = torch.tensor([True, True, True, True, True])
    filtered_all = filter_and_remap_edges_improved(adj_matrix, keep_all)
    print_sparse_matrix_details(filtered_all, "Keeping All Nodes")

    print("Expected: Should be identical to original matrix")
    matrices_equal = torch.allclose(adj_matrix.to_dense(), filtered_all.to_dense())
    print(f"Matrices are equal: {matrices_equal}")

    # Test Case 2b: Keep no nodes
    print("\n" + "-"*40)
    print("Test Case 2b: Keep NO nodes")
    print("-"*40)
    keep_none = torch.tensor([False, False, False, False, False])
    filtered_none = filter_and_remap_edges_improved(adj_matrix, keep_none)
    print_sparse_matrix_details(filtered_none, "Keeping No Nodes")

    print("Expected: Empty matrix with shape (0, 0)")

    # Test Case 2c: Keep a two-node subset that still shares an edge
    print("\n" + "-"*40)
    print("Test Case 2c: Keep nodes with specific pattern")
    print("-"*40)
    # Keep nodes 1 and 3 (which should preserve some edges)
    keep_pattern = torch.tensor([False, True, False, True, False])
    filtered_pattern = filter_and_remap_edges_improved(adj_matrix, keep_pattern)
    print(f"Keep mask: {keep_pattern.tolist()} (keeping nodes 1 and 3)")
    print_sparse_matrix_details(filtered_pattern, "Keeping Nodes 1 and 3")

    print("Expected behavior:")
    print("- Original nodes 1,3 become new nodes 0,1")
    print("- Edge 1->2 is removed (node 2 not kept)")
    print("- Edge 3->1 becomes 1->0 in filtered matrix")
    print("- etc.")

    # Verify specific edges
    print(f"\nEdge verification:")
    # indices()/values() raise on an uncoalesced tensor, and a freshly built
    # sparse tensor is not guaranteed to be coalesced. Coalesce a local copy;
    # the returned `filtered_pattern` is left untouched.
    pattern_coalesced = filtered_pattern.coalesce()
    if pattern_coalesced._nnz() > 0:
        edges = pattern_coalesced.indices()
        values = pattern_coalesced.values()
        for i in range(edges.shape[1]):
            src, dst, val = edges[0, i].item(), edges[1, i].item(), values[i].item()
            print(f"  Edge {src}->{dst} with weight {val}")

    return adj_matrix, filtered_all, filtered_none, filtered_pattern

def run_comprehensive_tests():
    """Run both examples, check their results, and print a summary.

    Each summary line reflects an actual comparison against the expected
    matrix for the fixed inputs used by the examples.

    Raises:
        AssertionError: if any example produced an unexpected result.
    """
    print("COMPREHENSIVE ADJACENCY MATRIX FILTERING TESTS")
    print("="*60)

    # Run examples
    orig1, filt1 = example_1_basic_filtering()
    orig2, filt_all, filt_none, filt_pattern = example_2_edge_cases()

    # Expected results for the hard-coded example inputs:
    # - Example 1 keeps nodes 0 and 2: only 0->0 and 2->0 (now 1->0) survive.
    # - Example 2c keeps nodes 1 and 3: only 3->1 (now 1->0) survives.
    checks = [
        ("Example 1: Basic filtering",
         filt1.to_dense().tolist() == [[1.0, 0.0], [1.0, 0.0]]),
        ("Example 2a: Keep all nodes",
         torch.equal(filt_all.to_dense(), orig2.to_dense())),
        ("Example 2b: Keep no nodes",
         tuple(filt_none.shape) == (0, 0) and filt_none._nnz() == 0),
        ("Example 2c: Pattern filtering",
         filt_pattern.to_dense().tolist() == [[0.0, 0.0], [1.0, 0.0]]),
    ]

    # Summary
    print("\n" + "="*60)
    print("TEST SUMMARY")
    print("="*60)
    for label, ok in checks:
        if ok:
            print(f"✅ {label} - PASSED")
        else:
            print(f"❌ {label} - FAILED")

    failed = [label for label, ok in checks if not ok]
    if failed:
        raise AssertionError(f"Failed checks: {failed}")

    print("\nAll tests demonstrate correct behavior:")
    print("- Proper edge filtering based on keep_mask")
    print("- Correct index remapping")
    print("- Edge case handling")
    print("- Weight preservation")

if __name__ == "__main__":
    run_comprehensive_tests()

In [None]:
import torch
import numpy as np

# Helper function (provided in your earlier context)
def numpy_adj_to_torch_sparse_tensor(adj_matrix):
    """
    Convert a dense NumPy adjacency matrix (e.g. loaded from a .mat file)
    into a float32 sparse COO tensor of the same shape.

    Every non-zero entry becomes an edge with weight 1.0; the original
    magnitudes are not carried over.
    """
    # Row/column coordinates of the non-zero entries, stacked as (2, nnz)
    edge_coords = np.vstack(np.nonzero(adj_matrix)).astype(np.int64)
    indices = torch.from_numpy(edge_coords)

    # Unit weight per edge
    values = torch.ones(indices.shape[1], dtype=torch.float32)

    return torch.sparse_coo_tensor(
        indices, values, torch.Size(adj_matrix.shape), dtype=torch.float32
    )

# --- Test Data Setup ---

# 1. Original (dense) adjacency matrix for a 6-node graph.
#    0 1 2 3 4 5
# 0: 0 1 0 0 1 0
# 1: 1 0 1 0 0 0
# 2: 0 1 0 1 0 0
# 3: 0 0 1 0 0 1
# 4: 1 0 0 0 0 0
# 5: 0 0 0 1 0 0
# Built from its edge list:
# (0,1), (0,4), (1,0), (1,2), (2,1), (2,3), (3,2), (3,5), (4,0), (5,3)
original_adj_np = np.zeros((6, 6), dtype=np.float32)
original_adj_np[
    [0, 0, 1, 1, 2, 2, 3, 3, 4, 5],   # edge sources (rows)
    [1, 4, 0, 2, 1, 3, 2, 5, 0, 3],   # edge targets (columns)
] = 1

# Sparse version of the matrix (this is A_original)
A_original = numpy_adj_to_torch_sparse_tensor(original_adj_np)
print(f"Original A (sparse): {A_original}")
print(f"Original A indices:\n{A_original._indices()}")
print(f"Original A values:\n{A_original._values()}\n")

# 2. Mark which rows are "non-null".
# Rows 1 and 4 are treated as "null" and will be removed,
# leaving nodes 0, 2, 3, 5 -> indicator T, F, T, T, F, T.
non_null_indicator = torch.tensor(
    [node not in (1, 4) for node in range(6)], dtype=torch.bool
)
print(f"Non-null indicator: {non_null_indicator}\n")

# --- Snippet under test: drop "null" nodes from A_original and renumber the rest ---
print("--- Applying the code snippet ---")

# Original ids of the nodes that are kept
non_null_indices = non_null_indicator.nonzero(as_tuple=True)[0]
print(f"non_null_indices (original indices of non-null rows): {non_null_indices}\n")

A_indices, A_values = A_original._indices(), A_original._values()

# An edge survives only if both of its endpoints are kept
mask_rows = torch.isin(A_indices[0], non_null_indices)
mask_cols = torch.isin(A_indices[1], non_null_indices)
print(f"mask_rows (source node in non-null set): {mask_rows}")
print(f"mask_cols (target node in non-null set): {mask_cols}\n")

valid_edges_mask = torch.logical_and(mask_rows, mask_cols)
print(f"valid_edges_mask (both source and target in non-null set): {valid_edges_mask}\n")

filtered_A_indices = A_indices[:, valid_edges_mask]
filtered_A_values = A_values[valid_edges_mask]
print(f"filtered_A_indices (original indices of kept edges):\n{filtered_A_indices}")
print(f"filtered_A_values (values of kept edges):\n{filtered_A_values}\n")

# Lookup table old id -> new id; removed nodes stay at -1
original_to_new_map = torch.full((A_original.shape[0],), -1, dtype=torch.long)
original_to_new_map[non_null_indices] = torch.arange(len(non_null_indices))
print(f"original_to_new_map (maps original index to new index):\n{original_to_new_map}\n")

# Translate both index rows in one lookup, then split into rows / cols
remap_rows, remap_cols = original_to_new_map[filtered_A_indices]
print(f"remap_rows (new indices for source nodes):\n{remap_rows}")
print(f"remap_cols (new indices for target nodes):\n{remap_cols}\n")

new_shape = torch.Size([len(non_null_indices)] * 2)
print(f"new_shape: {new_shape}\n")

A_filtered = torch.sparse_coo_tensor(
    torch.stack((remap_rows, remap_cols), dim=0),
    filtered_A_values,
    new_shape,
    dtype=torch.float32,
)

print("--- Result ---")
print(f"Filtered A (sparse): {A_filtered}")
print(f"Filtered A indices:\n{A_filtered._indices()}")
print(f"Filtered A values:\n{A_filtered._values()}")

# Compare the dense form of the result with a hand-derived expectation
print("\n--- Verification ---")
print("Original A (dense):\n", A_original.to_dense().numpy())
print("\nExpected filtered A (dense, by manual selection):")
# Original nodes 0, 2, 3, 5 become new nodes 0, 1, 2, 3.
# New node 0 (was 0) has no kept neighbours; the surviving edges are
#   1 (was 2) <-> 2 (was 3)   and   2 (was 3) <-> 3 (was 5).
expected_dense_filtered_A = np.zeros((4, 4), dtype=int)
expected_dense_filtered_A[[1, 2, 2, 3], [2, 1, 3, 2]] = 1
print(expected_dense_filtered_A)

print("\nResulting filtered A (dense):\n", A_filtered.to_dense().numpy())

# Fail loudly if the snippet's output deviates from the expectation
assert np.array_equal(A_filtered.to_dense().numpy(), expected_dense_filtered_A)
print("\nAssertion successful: Filtered A matches expected result!")

In [None]:
def create_path(base_path, j):
    """Insert a ``dataset{j}/`` directory right after the first ``mats/``.

    Args:
        base_path: path string containing the marker ``mats/``.
        j: value interpolated into the inserted directory name.

    Returns:
        ``base_path`` with ``dataset{j}/`` inserted after the first ``mats/``;
        everything after the marker (including later ``mats/`` occurrences)
        is preserved.

    Raises:
        IndexError: if ``base_path`` does not contain ``mats/``.
    """
    # partition() splits on the FIRST marker only, so a second 'mats/' further
    # down the path is kept instead of being dropped.
    head, marker, tail = base_path.partition('mats/')
    if not marker:
        raise IndexError(f"'mats/' not found in path: {base_path!r}")
    new_path = head + marker + f'dataset{j}/' + tail
    return new_path

# Example usage
original_path = 'datasets/mats/adf/adf/Syn/1.txt'
j = 5
new_path = create_path(original_path, j)
print(new_path)  # Output: datasets/mats/dataset5/adf/adf/Syn/1.txt

In [None]:
import numpy as np

# Example 1: a single row, shape (1, 10) -> squeeze drops the leading axis
a = np.arange(1, 11).reshape(1, 10)
print("Original shape a:", a.shape)

a_squeezed = a.squeeze()
print("Squeezed shape a:", a_squeezed.shape)
print("Squeezed a:", a_squeezed)

# Example 2: a single column, shape (10, 1) -> squeeze drops the trailing axis
b = np.arange(10).reshape(10, 1)
print("\nOriginal shape b:", b.shape, b.ndim)

b_squeezed = b.squeeze()
print("Squeezed shape b:", b_squeezed.shape, b_squeezed.ndim)
print("Squeezed b:", b_squeezed)
# Squeezing an already 1-D array leaves it unchanged
print("squeezed squeezed b:", b_squeezed.squeeze())


In [None]:
import scipy.io as sio

# .mat file to inspect.
# NOTE(review): absolute, machine-specific path — this cell only runs on a
# machine where this exact file exists.
dataset_mat_fn = '/home/jason/coding/NetDeconf_main_hao/datasets/mats/test_imputed=0/Syn/mice/p=0.0_k=2_seed=70.pt.mat'
# Alternative input (uncomment to inspect a different run):
# dataset_mat_fn = '/home/jason/coding/NetDeconf_main_hao/datasets/mats/test_imputed=0/Syn/mice/p=0.1_k=0_seed=919.pt.mat'

# Load the file's contents into `data` for interactive inspection.
data = sio.loadmat(dataset_mat_fn)

In [None]:
import numpy as np

# Boolean mask to be converted into integer positions
bool_indices = np.array([True, False, True, True, False, False, True])

print("Original boolean array:", bool_indices)
print("Data type:", bool_indices.dtype)

# Positions of the True entries (for a 1-D mask this equals np.where(mask)[0])
int_indices_where = np.flatnonzero(bool_indices)

print("\nUsing np.where():")
print("Integer indices:", int_indices_where)
print("Data type:", int_indices_where.dtype)

In [None]:
import os

########################################## Method Setting  ################################
results_dir = 'results'
method = 'NetDeconf'
dataset_dir = 'Flickr'
missing_p = '0.3'
############################################################################################

balu_dir = '/mnt/vast-kisski/projects/kisski-tib-activecl/BaLu'
source_dir = 'datasets/exps/'
target_dir = 'datasets/mats/'

# Local output folders. target_dir is created at its relative location here,
# i.e. before it is (optionally) re-rooted under balu_dir below.
os.makedirs(results_dir, exist_ok=True)
os.makedirs(target_dir, exist_ok=True)

# When the shared project directory is present, read/write datasets there
if os.path.exists(balu_dir):
    source_dir, target_dir = (
        os.path.join(balu_dir, rel_dir) for rel_dir in (source_dir, target_dir)
    )

src_dataset = os.path.join(source_dir, dataset_dir)   # source dataset dir
tar_dataset = os.path.join(target_dir, dataset_dir)   # target dataset dir

# results/<dataset>/<method>; makedirs also creates the intermediate dataset dir
dataset_result_dir = os.path.join(results_dir, dataset_dir)
method_result_dir = os.path.join(dataset_result_dir, method)
os.makedirs(method_result_dir, exist_ok=True)

for fn in os.listdir(src_dataset):
    # Only files generated with the selected missing rate are of interest
    if f"p={missing_p}" in fn:
        for impute in ('no', 'mean', 'knn', 'mice', 'missforest', 'gain'):
            method_impute_dir = os.path.join(method_result_dir, impute)
            os.makedirs(method_impute_dir, exist_ok=True)

            tar_dataset_impute = os.path.join(tar_dataset, impute)
            data_mat_fn = os.path.join(tar_dataset_impute, fn+".mat")
            # Path components after "mats/": [dataset, imputation, file name]
            parts = data_mat_fn.split("mats/")[1].split("/")
            # File name up to (not including) the first ".pt"
            identifier = parts[2].partition(".pt")[0]
            one_result_fn = os.path.join(method_impute_dir, identifier)
            print(data_mat_fn)
            print(one_result_fn)
            

In [None]:
import os

########################################## Method Setting  ################################
results_dir = 'results'
# makedirs(exist_ok=True) replaces the exists()+mkdir() pairs: it creates
# missing parent directories and has no check-then-create race.
os.makedirs(results_dir, exist_ok=True)
method = 'NetDeconf'
dataset_dir = 'Flickr'
missing_p = '0.3'
############################################################################################

balu_dir = '/mnt/vast-kisski/projects/kisski-tib-activecl/BaLu'
source_dir = 'datasets/exps/'
target_dir = 'datasets/mats/'
# 'datasets/mats/' is nested: plain os.mkdir fails if 'datasets' does not exist yet.
os.makedirs(target_dir, exist_ok=True)

# When the shared project directory is present, read/write datasets there
if os.path.exists(balu_dir):
    source_dir = os.path.join(balu_dir, source_dir)
    target_dir = os.path.join(balu_dir, target_dir)

src_dataset = os.path.join(source_dir, dataset_dir)   # source dataset dir
tar_dataset = os.path.join(target_dir, dataset_dir)   # target dataset dir

dataset_result_dir = os.path.join(results_dir, dataset_dir)
os.makedirs(dataset_result_dir, exist_ok=True)

method_result_dir = os.path.join(dataset_result_dir, method)
os.makedirs(method_result_dir, exist_ok=True)

for fn in os.listdir(src_dataset):
    # Only files generated with the selected missing rate are of interest
    if f"p={missing_p}" not in fn:
        continue
    for impute in ['no', 'mean', 'knn', 'mice', 'missforest', 'gain']:
        method_impute_dir = os.path.join(method_result_dir, impute)
        os.makedirs(method_impute_dir, exist_ok=True)

        tar_dataset_impute = os.path.join(tar_dataset, impute)
        data_mat_fn = os.path.join(tar_dataset_impute, fn+".mat")
        # Path components after "mats/": [dataset, imputation, file name]
        parts = data_mat_fn.split("mats/")[1].split("/")
        # dataset = parts[0]; method = parts[1]
        identifier = parts[2].split(".pt")[0]
        one_result_fn = os.path.join(method_impute_dir, identifier)
        print(data_mat_fn)
        print(one_result_fn)
            