In [1]:

import igraph as ig
import time

# Define the graph globally
edges = [(0,5), (1,5), (2,5), (4,5), (4,2), (4,3),(2,3)]
weights = [0.9, 0.99, 0.92, 0.9, 0.91, 0.98, 0.93]
g = ig.Graph.TupleList(edges, directed=False, weights=True)
g.es['weight'] = weights



def ltm(graph, seed_set):
    activated_nodes = set(seed_set)  
    previous_activated_nodes = set() 


    while previous_activated_nodes != activated_nodes:
        previous_activated_nodes = set(activated_nodes)  

        # Iterate over unactivated nodes
        for vertex in graph.vs:
            if vertex.index not in activated_nodes:
                sum_neighbor_weights = 0
                all_neighbors = [neighbor.index for neighbor in vertex.neighbors()]  # Get indices of all neighbors      
                
                total_neighbor_weight = sum(graph.es[graph.get_eid(vertex.index, neighbor_index)]['weight'] for neighbor_index in all_neighbors)
               # print("total_neighbor_weight :",total_neighbor_weight)
                # Calculate sum of normalised edge weights connected to active nodes
                for neighbor_index in all_neighbors:
                    #print("neighbor_index",neighbor_index)
                    #print("activated_nodes :",activated_nodes)
                    if neighbor_index in activated_nodes:
                        eid = graph.get_eid(vertex.index, neighbor_index)
                        edge_weight = graph.es[eid]['weight']
                        normalized_edge_weight = edge_weight / total_neighbor_weight if total_neighbor_weight != 0 else 0
                        #print("normalized_edge_weight ",normalized_edge_weight)
                        sum_neighbor_weights += normalized_edge_weight
                        

                
                # Activate the node if the sum of normalised neighbor weights is >= 0.4
                if sum_neighbor_weights >= 0.4:
                    activated_nodes.add(vertex.index)

    spread = len(activated_nodes) - len(seed_set)  
    return spread,activated_nodes

def greedy_ltm(graph, k):
    seed_set = [] 
    spread = []    
    timelapse = [] 
    activated_nodes_sizes = []  
    start_time = time.time()  



    for i in range(k):

        best_spread = 0
        best_node = None
        
        for j in set(range(graph.vcount())) - set(seed_set):
            current_seed_set = seed_set + [j]
            current_spread, activated_nodes = ltm(graph, current_seed_set)  

            if current_spread is not None and current_spread > best_spread:
                best_spread = current_spread
                best_node = j
            
        if best_node is not None:
            seed_set.append(best_node)
            spread.append(best_spread)
            
            _,activated_nodes = ltm(graph,seed_set)  
            activated_nodes_sizes.append(len(activated_nodes)) 

            elapsed_time = time.time() - start_time
            timelapse.append(elapsed_time)  

        else:
            break


    return seed_set, spread, timelapse, activated_nodes_sizes


def greedy_ltm2(graph, k):
    seed_set = [] 
    spread = []    
    timelapse = [] 
    activated_nodes_sizes = []  
    start_time = time.time()  


    for i in range(int(k/2)):

        best_spread = 0
        best_nodes = None
        
        for j in set(range(graph.vcount())) - set(seed_set):
            for l in set(range(graph.vcount())) - set(seed_set):
                if l != j:  # Ensure the two nodes are distinct
                    current_seed_set = seed_set + [j, l]
                    current_spread, activated_nodes = ltm(graph, current_seed_set)  

                    if current_spread is not None and current_spread > best_spread:
                        best_spread = current_spread
                        best_nodes = (j, l)
                        

        if best_nodes is not None:
            seed_set.extend(best_nodes)
            spread.append(best_spread)
            
            _,activated_nodes = ltm(graph,seed_set) 
            activated_nodes_sizes.append(len(activated_nodes)) 
         
            elapsed_time = time.time() - start_time
            timelapse.append(elapsed_time)  

        else:

            break

    return seed_set, spread, timelapse, activated_nodes_sizes

def greedy_ltm2_dis(graph, k):
    seed_set = [] 
    spread = []    
    timelapse = [] 
    activated_nodes_sizes = []  # To store the number of activated nodes for each seed set size
    start_time = time.time()  

    for i in range(int(k / 2)):
        best_spread = 0
        best_nodes = None
        
        for j in set(range(graph.vcount())) - set(seed_set):
            for l in set(range(graph.vcount())) - set(seed_set):
                # Ensure the two nodes are distinct and satisfy additional conditions
                if l != j and not any(graph.are_connected(j, neighbor) for neighbor in graph.neighbors(l)) \
                        and not any(graph.are_connected(l, neighbor) for neighbor in graph.neighbors(j)) \
                        and not graph.are_connected(j, l) and not graph.are_connected(l, j):
                    current_seed_set = seed_set + [j, l]
                    current_spread, activated_nodes = ltm(graph, current_seed_set)  # Get activated nodes

                    if current_spread is not None and current_spread > best_spread:
                        best_spread = current_spread
                        best_nodes = (j, l)
                    
        if best_nodes is not None:
            seed_set.extend(best_nodes)
            spread.append(best_spread)
            _, activated_nodes = ltm(graph, seed_set) 
            activated_nodes_sizes.append(len(activated_nodes)) 
            elapsed_time = time.time() - start_time
            timelapse.append(elapsed_time) 
    

    return seed_set, spread, timelapse, activated_nodes_sizes




def diffusion_degree(graph, vertex_index, threshold):
    vertex = graph.vs[vertex_index]
    # CDD(v)
    cd_v = graph.degree(vertex)
    # C'DD(v)
    cd_prime_v = threshold * cd_v
    # C''DD(v)
    c_double_prime_dd_v = 0
    
    # For each neighbor of the vertex
    for neighbor_index in graph.neighbors(vertex):
        neighbor = graph.vs[neighbor_index]

        # Calculate normalized edge weight (λ) from v to neighbor
        eid = graph.get_eid(vertex_index, neighbor_index)
        edge_weight = graph.es[eid]['weight']
        
        # Total weight of edges connected to the current neighbor
        total_neighbor_weight = sum(graph.es[graph.get_eid(neighbor.index, neighbor_neighbor.index)]['weight'] for neighbor_neighbor in neighbor.neighbors())
        
        #Normalise edge weight (λ) from v to neighbor
        normalized_edge_weight = edge_weight / total_neighbor_weight if total_neighbor_weight != 0 else 0
        
        # Calculate λ * (degree of neighbor - 1)
        lambda_times_degree_minus_1 = normalized_edge_weight * (graph.degree(neighbor) - 1)
        c_double_prime_dd_v += lambda_times_degree_minus_1

    # CDD(v) = C'DD(v)+ C''DD(v)
    cdd_v = cd_prime_v + c_double_prime_dd_v

    return cdd_v


def top_cdd(graph, threshold, k):
    diffusion_degrees = {}
    # For each vertex index in the graph
    for vertex_index in range(len(graph.vs)):
        diffusion_degrees[vertex_index] = diffusion_degree(graph, vertex_index, threshold)
  
    sorted_vertices = sorted(diffusion_degrees.items(), key=lambda x: x[1], reverse=True)
    
    top_cdd = [vertex_index for vertex_index, _ in sorted_vertices[:k]]
    
    return top_cdd



def greedy_ltm_with_top_cddratio(graph, k, ratio):
    seed_set = []
    spread = []    
    timelapse = [] 
    activated_nodes_sizes = []  
    start_time = time.time()  

    threshold = 0.4
    top_cdd_nodes = top_cdd(graph, threshold, int(k * ratio))  # Get top nodes from top_cdd
    elapsed_time = time.time() - start_time

    # Add nodes from top_cdd to seed_set
    seed_set.extend(top_cdd_nodes)

    rest_nodes_count = k - len(top_cdd_nodes)  # Calculate the number of remaining nodes needed

    while rest_nodes_count > 0: 
        best_spread = 0
        best_unactivated_node = None
        
        # Find the best pair such that one is in top_cdd and the other is an unactivated node
        for i in top_cdd_nodes:
            for j in range(graph.vcount()):  
                if j not in top_cdd_nodes and j not in seed_set:  
                    current_seed_set = seed_set + [j]  # Add the unactivated node to the seed set
                    current_spread, _ = ltm(graph, current_seed_set)  

                    if current_spread is not None and current_spread > best_spread:
                        best_spread = current_spread
                        best_unactivated_node = j
        
        if best_unactivated_node is not None:
            seed_set.append(best_unactivated_node)
            spread.append(best_spread)
            elapsed_time = time.time() - start_time
            timelapse.append(elapsed_time)
            rest_nodes_count -= 1  

        else:

            break
    
  
    return seed_set, spread, timelapse, activated_nodes_sizes




In [2]:
def test_ltm():
    global g  
    seed_sets = [[0], [1], [2], [3], [4],[5]]  
    expected_spreads = [0,2,0,5,5,0]
    for i, seed_set in enumerate(seed_sets):
        spread, _ = ltm(g, seed_set)
        print(f"Seed Set: {seed_set}, Spread: {spread}, Expected Spread: {expected_spreads[i]}")
        assert spread == expected_spreads[i]

def test_greedy_ltm():
    global g  
    k = 5
    expected_seed_sets = [[3], [3, 0], [3, 0, 1], [3, 0, 1, 2], [3, 0, 1, 2, 4]]
    for i in range(k):
        seed_set, spread, timelapse, activated_nodes_sizes = greedy_ltm(g, i + 1)
        print(f"k={i + 1}: Seed Set: {seed_set}, Expected Seed set: {expected_seed_sets[i]}")
        assert seed_set == expected_seed_sets[i]
        
def test_greedy_ltm2():
    global g  
    k = 4
    expected_seed_sets = [[0,3], [0, 3, 1, 2]]
    for i in range(k // 2):
        seed_set, spread, timelapse, activated_nodes_sizes = greedy_ltm2(g, (i + 1) * 2)
        print(f"k={(i + 1) * 2}: Seed Set: {seed_set}, Expected Seed set: {expected_seed_sets[i]}")
        assert seed_set == expected_seed_sets[i]
        
def test_greedy_ltm2_dis():
    edges = [(0,5), (1,5), (2,5), (4,2), (4,3)]
    weights = [0.9, 0.99, 0.9, 0.97, 0.92]
    g = ig.Graph.TupleList(edges, directed=False, weights=True)
    g.es['weight'] = weights

    k = 4
    expected_seed_sets = [[0, 4], [0, 4, 1, 5]]
    for i in range(k // 2):
        seed_set, spread, timelapse, activated_nodes_sizes = greedy_ltm2_dis(g, (i + 1) * 2)
        print(f"k={(i + 1) * 2}: Seed Set: {seed_set}, Expected Seed set: {expected_seed_sets[i]}")
        assert seed_set == expected_seed_sets[i]
    
def test_diffusion_degree():
    global g
    threshold = 0.4
    expected_dd = [1.128, 2.912, 1.201, 3.083, 3.1, 2.176]
    for vertex_index in range(len(g.vs)):
        dd_result = diffusion_degree(g, vertex_index, threshold)
        dd_result = round(diffusion_degree_result, 3)
        print(f"Vertex {vertex_index}: Diffusion Degree: {dd_result}, Expected: {expected_dd[vertex_index]}")
        assert diffusion_degree_result == expected_diffusion_degrees[vertex_index]
        
def test_top_cdd():
    global g
    threshold = 0.4
    k = 6
    expected_top_cdd = [4, 3, 1, 5, 2, 0]
    top_cdd_result = top_cdd(g, threshold, k)
    print(f"k= {k}, Top_cdd: {top_cdd_result}, Expected_top_cdd: {expected_top_cdd}")
    assert top_cdd_result == expected_top_cdd

        
def test_greedy_ltm_with_top_cddratio():
    global g 
    ratio= 0.5
    k=4
    expected_seed_sets = [[4, 0], [4, 3, 0, 1]]
    for i in range(k // 2):
        seed_set, spread, timelapse, activated_nodes_sizes = greedy_ltm_with_top_cddratio(g, (i + 1) * 2,ratio)
        print(f"k={(i + 1) * 2}: Seed Set: {seed_set}, Expected Seed set: {expected_seed_sets[i]}")
        assert seed_set == expected_seed_sets[i]

    
    

if __name__ == '__main__':
    print("Test: ltm")
    test_ltm()
    print("\nTest: Simple Greedy ")
    test_greedy_ltm()
    print("\nTest: Greedy Pairwise ")
    test_greedy_ltm2()
    print("\nTest: Greedy Pairwise with distance")
    test_greedy_ltm2_dis()
    print("\nTest: Diffusion degree")
    test_diffusion_degree()
    print("\nTest: Top-k CDD")
    test_top_cdd()
    print("\nTest: Hybrid Centrality measure with Greedy")
    test_greedy_ltm_with_top_cddratio()
    

Test: ltm
Seed Set: [0], Spread: 0, Expected Spread: 0
Seed Set: [1], Spread: 2, Expected Spread: 2
Seed Set: [2], Spread: 0, Expected Spread: 0
Seed Set: [3], Spread: 5, Expected Spread: 5
Seed Set: [4], Spread: 5, Expected Spread: 5
Seed Set: [5], Spread: 0, Expected Spread: 0

Test: Simple Greedy 
k=1: Seed Set: [3], Expected Seed set: [3]
k=2: Seed Set: [3, 0], Expected Seed set: [3, 0]
k=3: Seed Set: [3, 0, 1], Expected Seed set: [3, 0, 1]
k=4: Seed Set: [3, 0, 1, 2], Expected Seed set: [3, 0, 1, 2]
k=5: Seed Set: [3, 0, 1, 2, 4], Expected Seed set: [3, 0, 1, 2, 4]

Test: Greedy Pairwise 
k=2: Seed Set: [0, 3], Expected Seed set: [0, 3]
k=4: Seed Set: [0, 3, 1, 2], Expected Seed set: [0, 3, 1, 2]

Test: Greedy Pairwise with distance
k=2: Seed Set: [0, 4], Expected Seed set: [0, 4]
k=4: Seed Set: [0, 4, 1, 5], Expected Seed set: [0, 4, 1, 5]

Test: Diffusion degree
Vertex 0: Diffusion Degree: 1.128, Expected: 1.128
Vertex 1: Diffusion Degree: 2.912, Expected: 2.912
Vertex 2: Diffus

<module 'pytest' from 'C:\\Users\\laure\\anaconda3\\lib\\site-packages\\pytest\\__init__.py'>