# Network Analysis

In [33]:
from typing import NamedTuple

class User(NamedTuple):
    id: int
    name: str

In [34]:
users = [User(0, "Hero"), User(1, "Dunn"), User(2, "Sue"),
         User(3, "Chi"), User(4, "Thor"), User(5, "Clive"),
         User(6, "Hicks"), User(7, "Devin"), User(8, "Kate"),
         User(9, "Klein")]

In [61]:
friends_pairs = [(0, 1), (0, 2), (1, 2), (1, 3), (2, 3),
                 (3, 4), (4, 5), (5, 6), (5, 7), (6, 8),
                 (7, 8), (8, 9)]

In [62]:
from typing import Dict, List

In [63]:
Friendships = Dict[int, List[int]]

friendships: Friendships = {user.id: [] for user in users}

In [64]:
for i, j in friends_pairs:
    friendships[i].append(j)
    friendships[j].append(i)

friendships

{0: [1, 2],
 1: [0, 2, 3],
 2: [0, 1, 3],
 3: [1, 2, 4],
 4: [3, 5],
 5: [4, 6, 7],
 6: [5, 8],
 7: [5, 8],
 8: [6, 7, 9],
 9: [8]}

In [65]:
from collections import deque

In [66]:
path = List[int]

def shortest_paths_from(from_user_id: int, friendships: Friendships)-> Dict[int, List[path]]:
    shortest_paths_to: Dict[int, List[path]] = {from_user_id:[[]]}
    frontier = deque((from_user_id, friend_id)
                     for friend_id in friendships[from_user_id])
    
    while frontier:
        prev_user_id, user_id = frontier.popleft()
        paths_to_prev_user = shortest_paths_to[prev_user_id]
        new_path_to_user = [path + [user_id] for path in paths_to_prev_user]

        old_paths_to_user = shortest_paths_to.get(user_id, [])
        old_paths_to_user = shortest_paths_to.get(user_id, [])

        if old_paths_to_user:
            min_path_length = len(old_paths_to_user[0])
        else:
            min_path_length = float('inf')
        

        new_path_to_user = [path
                            for path in new_path_to_user
                            if len(path) <= min_path_length and path not in old_paths_to_user]
        
        shortest_paths_to[user_id] = old_paths_to_user + new_path_to_user

        frontier.extend((user_id, friend_id) for friend_id in friendships[user_id]
        if friend_id not in shortest_paths_to)
        return shortest_paths_to

In [67]:
shortest_paths = {user.id: shortest_paths_from(user.id, friendships)
                    for user in users}

shortest_paths

{0: {0: [[]], 1: [[1]]},
 1: {1: [[]], 0: [[0]]},
 2: {2: [[]], 0: [[0]]},
 3: {3: [[]], 1: [[1]]},
 4: {4: [[]], 3: [[3]]},
 5: {5: [[]], 4: [[4]]},
 6: {6: [[]], 5: [[5]]},
 7: {7: [[]], 5: [[5]]},
 8: {8: [[]], 6: [[6]]},
 9: {9: [[]], 8: [[8]]}}

In [68]:
betweenness_centrality = {user.id: 0.0 for user in users}

In [69]:
for source in users:
    for target_id, paths  in shortest_paths[source.id].items():
        if source.id < target_id:
            num_paths = len(paths)
            contrib = 1 / num_paths
            for path in paths:
                for between_id in path:
                    if between_id not in [source.id, target_id]:
                        betweenness_centrality[between_id] += contrib

In [79]:
def farness(user_id: int) -> float:
    '''the sum of the lengths of the shortest paths to each other user'''
    return sum(len(path[0]) for paths in shortest_paths[user_id].values())

In [None]:
closeness_centrality = {user.id: 1 / farness(user.id) for user in users}

## Matrics Mulitiplication

In [81]:
from linearalgebra import Matrix, make_matrix, shape

In [82]:
def matrix_times_matrix(m1: Matrix, m2: Matrix) -> Matrix:
    nr1, nc1 = shape(m1)
    nr2, nc2 = shape(m2)
    assert nc1 == nr2, " "
    
    def entry_fn(i: int, j: int) -> float:
        return sum(m1[i][k] * m2[k][j] for k in range(nc1))
    
    return make_matrix(nr1, nc2, entry_fn)

In [83]:
from linearalgebra import Vector, dot

In [84]:
def matrix_times_vector(m: Matrix, v: Vector) -> Vector:
    nr, nc = shape(m)
    n = len(v)

    assert nc == n, " "

    return [dot(row, v) for row in m]

In [85]:
from typing import Tuple
import random
from linearalgebra import magnitude, distance

In [86]:
def find_eigenvector(m: Matrix, tolerance: float = .00001) -> Tuple[Vector, float]:

    guess =  [random.random() for _ in m]

    while True:
        result = matrix_times_vector(m, guess)

        norm = magnitude(result)

        next_guess = [x / norm for x in result]

        if distance(guess, next_guess) < tolerance:
            return next_guess, norm
        guess = next_guess

## Centrality

In [90]:
def entry_fn(i: int, j: int):
    return 1 if (i, j) in friends_pairs or (j, i) in friends_pairs else 0


In [91]:
n = len(users)

In [92]:
adjacency_matrix = make_matrix(n, n, entry_fn)

In [93]:
eigen_vector_centralities, _ = find_eigenvector(adjacency_matrix)

## Directed Graphs and page rank

In [97]:
endorsements = [(0, 1), (1, 0), (0, 2), (2, 0), (1, 2), (2, 1), (1, 3),(2, 3), (3, 4), (5, 4), (5, 6), (7, 5), (6, 8), (8, 7), (8, 9)]

In [98]:
from collections import Counter

In [99]:
endorsement_counts = Counter(target for source, target in endorsements)

In [103]:
import tqdm

def page_rank(users: List[User], 
              endorsements: List[Tuple[int, int]],
              damping: float = 0.85,
              num_iter: int = 100) -> Dict[int, float]:

              outgoing_counts = Counter(target for source, target in endorsements)

              num_users = len(users)

              pr = {user.id : 1/num_users for user in users}

              base_pr = (1-damping) / num_users

              for iter in tqdm.trange(num_iter):
                  next_pr = {user.id: base_pr for user in users}
                  for source, target in endorsements:
                      next_pr[target] += damping * pr[source]/outgoing_counts[source]
                    
                  pr = next_pr

                
              return pr

In [104]:
pr = page_rank(users, endorsements)

100%|██████████| 100/100 [00:00<00:00, 20055.01it/s]


In [105]:
pr[4]

0.14250000000000002