In [1]:
!python --version

import os 
import timeit 
import numpy as np

Python 3.10.4


In [2]:
from src.utils import load_data, convert_to_numba

In [3]:
users, layers, node_layer_dict = load_data(["neighbor", "colleague"])

In order to use numba, we need to store the data in numba-compatible objects

In [4]:
users_numba, layers_numba, node_layer_dict_numba = convert_to_numba(users, layers, node_layer_dict)

**Note**: This conversion creates substantial overhead. Is it worth it when creating many, many walks? How does its cost scale with the size of the graph and the number of layers?

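At first sight, the numba objects look much smaller than the Python objects. However, `sys.getsizeof` is shallow: it only counts the memory directly attributed to an object, not the objects it refers to. The constant 48 bytes reported for every numba object below is most likely just the size of the Python-side wrapper. So this comparison does not show that the numba data actually takes less memory.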

In [5]:
import sys

# NOTE: sys.getsizeof is shallow -- it only counts memory held directly by the
# object, not by the objects it references -- so these numbers are not a full
# memory comparison. Values are printed in pairs: Python object first, then its
# numba counterpart.
print(
    *map(
        sys.getsizeof,
        (
            layers, layers_numba,
            node_layer_dict, node_layer_dict_numba,
            users, users_numba,
        ),
    ),
    sep="\n",
)

88
48
41943136
48
8000056
48


### Compare running time of a single walk

In [6]:
from src.walks_numba import single_walk as single_walk_numba
from src.walks import single_walk as single_walk_python
walk_len = 5


In [None]:
%timeit single_walk_python(10, walk_len, node_layer_dict, layers)

In [8]:
# compile
_ = single_walk_numba(10, walk_len, node_layer_dict_numba, layers_numba)

  layer_indices = node_layer_dict[current_node]


In [9]:
%timeit single_walk_numba(10, 5, node_layer_dict_numba, layers_numba)

6.71 μs ± 397 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


### Compare running time for a set of nodes

In [10]:
from src.walks_numba import create_walks as create_walks_numba
from src.walks import create_walks as create_walks_python

sample_size = 200_000


In [11]:
%timeit create_walks_python(users[:sample_size], walk_len, node_layer_dict, layers)

17.7 s ± 1.53 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
# compile 
_ = create_walks_numba(users_numba[:sample_size], walk_len, node_layer_dict_numba, layers_numba)

In [13]:
%timeit create_walks_numba(users_numba[:sample_size], walk_len, node_layer_dict_numba, layers_numba)

956 ms ± 3.93 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Parallelize the numba-compiled code

In [14]:
def walks_wrapper(users, walk_length=None):
    """Create walks for one batch of start nodes with the numba data structures.

    Binds the numba-converted graph (`node_layer_dict_numba`, `layers_numba`)
    so the function can be handed to a thread/executor with a single batch
    argument.

    Parameters
    ----------
    users : sequence
        Batch of start nodes.
    walk_length : int, optional
        Walk length passed to `create_walks_numba`. Defaults to the notebook
        constant `walk_len` (looked up at call time), so the parallel runs use
        the same walk length as the sequential benchmarks.

    Returns
    -------
    Whatever `create_walks_numba` returns for this batch.
    """
    if walk_length is None:
        walk_length = walk_len
    # 0.8 is passed positionally as create_walks_numba's fifth argument; its
    # meaning is defined in src/walks_numba.py (not visible here).
    # NOTE(review): the sequential benchmarks call create_walks_numba without
    # this argument -- if its default differs from 0.8, the sequential and
    # parallel timings are not like-for-like. Confirm.
    return create_walks_numba(users, walk_length, node_layer_dict_numba, layers_numba, 0.8)


In [15]:
_ = walks_wrapper(users[:10])

In [16]:
import asyncio

In [17]:
from src.utils import batched

In [18]:
# https://esciencecenter-digital-skills.github.io/parallel-python-workbench/extra-asyncio.html
from dataclasses import dataclass
from typing import Optional
from time import perf_counter
from contextlib import asynccontextmanager


@dataclass
class Elapsed:
    """Mutable result holder yielded by `timer()`.

    `time` starts out as None; `timer()` assigns the elapsed wall-clock
    seconds (measured with `perf_counter`) once the timed block completes.
    """

    time: Optional[float] = None  # elapsed seconds, or None if not (yet) measured


@asynccontextmanager
async def timer():
    """Async context manager that measures the wall-clock time of its body.

    Yields an `Elapsed` instance whose `time` attribute is set to the elapsed
    seconds (via `perf_counter`) when the block exits -- also when the block
    exits with an exception, which is then re-raised.
    """
    elapsed = Elapsed()
    start = perf_counter()
    try:
        yield elapsed
    finally:
        # Record the duration even if the timed block raised.
        elapsed.time = perf_counter() - start

In [19]:
async def create_walks_parallel(users, n, n_workers):
    """Create walks for the first `n` users, one worker thread per batch.

    ``users[:n]`` is split into batches of ceil(n / n_workers) users (hence at
    most `n_workers` batches). Each batch is passed to `walks_wrapper` in a
    worker thread via `asyncio.to_thread`, and all batches are awaited together.

    Parameters
    ----------
    users : sequence
        Start nodes; only ``users[:n]`` is used.
    n : int
        Number of leading users to process.
    n_workers : int
        Target number of batches / concurrent threads; must be >= 1.

    Returns
    -------
    list
        One entry per batch, in batch order: what `walks_wrapper` returned.

    Raises
    ------
    ValueError
        If `n_workers` is smaller than 1.

    NOTE(review): threads only run the numba code concurrently if the compiled
    function releases the GIL (e.g. ``@njit(nogil=True)``) -- confirm in
    src/walks_numba.py.
    """
    if n_workers < 1:
        raise ValueError(f"n_workers must be >= 1, got {n_workers}")
    # Ceil division: floor division added an extra, tiny batch whenever
    # n % n_workers != 0 and gave a batch size of 0 when n < n_workers.
    # Clamp to 1 so n == 0 still yields a valid (empty) batching.
    batch_size = max(1, -(-n // n_workers))
    # Assumes src.utils.batched(seq, size) yields consecutive chunks of `size`
    # items (like itertools.batched) -- TODO confirm.
    tasks = (
        asyncio.to_thread(walks_wrapper, batch)
        for batch in batched(users[:n], batch_size)
    )
    return await asyncio.gather(*tasks)


In [49]:
# Time a single run (not a %timeit average) of the thread-based version on the
# first `sample_size` users with n_workers=8.
# NOTE(review): threads can only execute the numba code concurrently if the
# compiled function releases the GIL (e.g. `@njit(nogil=True)`); otherwise the
# batches effectively run one after another -- confirm in src/walks_numba.py.
async with timer() as t:  # does not seem to parallelize; speed is very volatile
    result = await create_walks_parallel(users, sample_size, 8)

print(f"that took {t.time} seconds")


that took 0.4622826839913614 seconds


In [21]:
# `result` holds one collection of walks per batch; count walks across batches.
n_walks = sum(map(len, result))
# Each walk also stores the layer identifiers, so it can be longer than walk_len.
final_length = len(result[0][0])
assert final_length >= walk_len

print(f"we created {n_walks} walks of length {final_length}")

we created 200000 walks of length 11


In sum: to process the first 200k nodes, we take
- ~17 s with pure Python
- ~1 s with sequential numba
- ~500 ms with parallel numba (a single timed run, not a `%timeit` average)
    - this is almost a 34x speedup compared to pure Python
    - but it is volatile: sometimes it takes 1.7 s, i.e. longer than sequential numba!

### Notes / next steps
- there is a sweet spot in the optimal batch size for parallelization
- the parallel processing should be able to pass in batches in sequence (i.e., there may be more batches than workers, and we cannot process all batches at the same time)
- try parallel processing also with `concurrent.futures` (see below)
- try also the `threading` / `multiprocessing` modules?
- extend functionality of the walks function: do not store the layer types; run only on one layer (-> more classical deepwalk)
- also check which other libraries are out there: is it worth making this reusable?
    - networkx
    - deepwalk implementations?

### Try with concurrent.futures

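This does not look like it is parallelized either. A possible cause to check: threads can only run numba code concurrently if the compiled function releases the GIL (e.g. `@njit(nogil=True)`).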

In [22]:
import concurrent.futures


In [23]:
def create_walks_parallel_pool(n, n_workers):
    """Create walks for the first `n` entries of the global `users` with a thread pool.

    ``users[:n]`` is split into batches of ceil(n / n_workers) users (hence at
    most `n_workers` batches). All batches are submitted to a
    `ThreadPoolExecutor` with `n_workers` threads, each running `walks_wrapper`.

    Parameters
    ----------
    n : int
        Number of leading users to process.
    n_workers : int
        Number of pool threads / target number of batches; must be >= 1.

    Returns
    -------
    list
        One entry per batch, in batch order: what `walks_wrapper` returned.

    Raises
    ------
    ValueError
        If `n_workers` is smaller than 1.

    NOTE(review): threads only run the numba code concurrently if the compiled
    function releases the GIL (e.g. ``@njit(nogil=True)``) -- confirm in
    src/walks_numba.py.
    """
    if n_workers < 1:
        raise ValueError(f"n_workers must be >= 1, got {n_workers}")
    # Ceil division so the batches number at most n_workers; clamp to 1 so
    # n < n_workers (or n == 0) still gives a valid batch size.
    batch_size = max(1, -(-n // n_workers))
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
        # executor.map submits every batch up front, so up to n_workers batches
        # are in flight at once. Pairing a lazy generator of submit() calls with
        # f.result() would submit each batch only after the previous finished.
        return list(executor.map(walks_wrapper, batched(users[:n], batch_size)))

In [24]:
res = create_walks_parallel_pool(sample_size, 30)

In [25]:
# Sanity-check the thread-pool output `res` (previously this read `result`,
# the output of the asyncio section, by mistake).
n_walks = sum(len(batch) for batch in res)
final_length = len(res[0][0])
assert final_length >= walk_len # we also store the identifiers of the layers, which adds elements to the walk

print(f"we created {n_walks} walks of length {final_length}")

we created 200000 walks of length 11


In [50]:
%timeit create_walks_parallel_pool(sample_size, 2)

1.14 s ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
