## **Assignment 3** - Group 50

Lütfi Altin (lutfia@kth.se) |
Jakob Heyder (heyder@kth.se)

### Task:

1. M. Jha, C. Seshadhri, and A. Pinar, A Space-Efficient Streaming Algorithm for Estimating Transitivity and Triangle Counts Using the Birthday Paradox, ACM TKDD, 9-3, 2015.
2. L. De Stefani, A. Epasto, M. Riondato, and E. Upfal, TRIÈST: Counting Local and Global Triangles in Fully-Dynamic Streams with Fixed Memory Size, KDD'16.
3. P. Boldi and S. Vigna, In-Core Computation of Geometric Centralities with HyperBall: A Hundred Billion Nodes and Beyond, ICDMW'13.


You are to study and implement a streaming graph processing algorithm described in one of the above papers of your choice. In order to accomplish your task, you are to perform the following two steps:

First, implement the reservoir sampling or the Flajolet-Martin algorithm used in the graph algorithm presented in the paper you have selected;
Second, implement the streaming graph algorithm presented in the paper that makes use of the algorithm implemented in the first step.
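For reference, step 1 in its simplest form is classic reservoir sampling over an insertion-only stream: it keeps a uniform random sample of at most M items. The sketch below is our own minimal version (the name `reservoir_sample` and the `rng` parameter are our choices, not from the paper). The insertion-only TRIÈST variants sample edges this way, while TRIÈST-FD replaces it with random pairing so that it can handle deletions.

```python
import random


def reservoir_sample(stream, M, rng=random):
    """Keep a uniform random sample of at most M items from an insertion-only stream."""
    sample = []
    for t, item in enumerate(stream, start=1):
        if len(sample) < M:
            # Case 1: the reservoir is not full yet, so keep every item
            sample.append(item)
        elif rng.random() < M / t:
            # Case 2: with probability M/t, overwrite a uniformly chosen sampled item
            sample[rng.randrange(M)] = item
        # Otherwise the item is discarded
    return sample
```

For example, `reservoir_sample(range(1000), 10, random.Random(42))` returns 10 distinct values from the stream; passing a seeded `random.Random` makes test runs reproducible.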

To ensure that your implementation is correct, you are to test your implementation with some of the publicly available graph datasets and present your test results in a report.
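Testing for correctness needs exact ground-truth counts on small graphs. The following sketch (our own helper `exact_triangles`, not part of the paper) computes exact global and local triangle counts by intersecting neighbour sets. It assumes an undirected graph with comparable vertex IDs and ignores self-loops and repeated edges; edges may be tuples or the two-element frozensets used below.

```python
from collections import defaultdict


def exact_triangles(edges):
    """Exact global and local triangle counts of an undirected graph."""
    adj = defaultdict(set)
    for u, v in edges:
        if u != v:  # ignore self-loops
            adj[u].add(v)
            adj[v].add(u)
    local = defaultdict(int)
    global_count = 0
    for u in adj:
        for v in adj[u]:
            if u < v:  # visit each edge once
                for w in adj[u] & adj[v]:
                    if v < w:  # count each triangle once, with u < v < w
                        global_count += 1
                        for x in (u, v, w):
                            local[x] += 1
    return global_count, dict(local)
```

On an insertion-only stream without repeated edges, running TRIÈST with a memory size at least as large as the number of edges keeps every edge in S, so its counters should then match these exact values.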

The implementation can use any data processing framework that supports stream (streaming graph) processing, such as Apache Spark or Apache Flink, or no framework at all, e.g., plain Java, Python, or another language of your choice.

### Dataset and Tools

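We implement the algorithm in plain Python without a framework, as a single function whose helpers are nested functions sharing its state. For testing we use the `out.lastfm_song` edge list.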

We chose the second paper, "TRIÈST: Counting Local and Global Triangles in Fully-Dynamic Streams with Fixed Memory Size", and implemented TRIÈST-FD, the fully-dynamic version of the TRIÈST algorithm family.

In [0]:
# Load dependencies (only the standard library's random module is needed)
import random


Definitions (mapping the paper's variables to our names):
* M = memory_size (maximum number of edges kept in the sample)
* S = reservoir (set of sampled edges)
* s = edgeCount (number of edges in the graph at time t, i.e., insertions minus deletions)

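The following names are the same as in the paper:
* d_i (number of uncompensated deletions of edges that were in S)
* d_o (number of uncompensated deletions of edges that were not in S)
* t (timestep, i.e., number of stream elements processed so far)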
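The counters d_i and d_o come from random pairing (Gemulla et al.), the sampling scheme TRIÈST-FD builds on: every deletion is remembered as uncompensated, and a later insertion is "paired" with one of them. A minimal sketch of this sampler on its own, without any triangle counters, could look as follows. The class `RandomPairingSample` is our own hypothetical name, and the M/s coin for the no-deletion case reflects our reading of the paper.

```python
import random


class RandomPairingSample:
    """Fixed-size sample under insertions and deletions (random pairing sketch)."""

    def __init__(self, M, rng=None):
        self.M = M
        self.S = set()   # the sample (reservoir)
        self.s = 0       # number of items currently in the data set
        self.d_i = 0     # uncompensated deletions of items that were in S
        self.d_o = 0     # uncompensated deletions of items that were not in S
        self.rng = rng if rng is not None else random.Random()

    def insert(self, x):
        self.s += 1
        if self.d_i + self.d_o == 0:
            # No deletions to compensate: behave like reservoir sampling
            if len(self.S) < self.M:
                self.S.add(x)
            elif self.rng.random() < self.M / self.s:
                self.S.remove(self.rng.choice(tuple(self.S)))
                self.S.add(x)
        elif self.rng.random() < self.d_i / (self.d_i + self.d_o):
            # Pair this insertion with an earlier deletion from inside S
            self.S.add(x)
            self.d_i -= 1
        else:
            # Pair it with an earlier deletion from outside S
            self.d_o -= 1

    def delete(self, x):
        self.s -= 1
        if x in self.S:
            self.S.remove(x)
            self.d_i += 1
        else:
            self.d_o += 1
```

The `sample_edge` and `run` functions below follow the same pattern, with `update_counters` called whenever S changes.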


We also use a tuple (boolean, edge), referred to as edgeOp (edge operation), as proposed in the paper. The boolean indicates whether the operation is an insertion (True) or a deletion (False). The edge itself is a frozenset with the IDs of the two vertices it connects, e.g., frozenset({u, v}) for the edge (u, v), so the same edge matches regardless of the order of its endpoints.
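A short illustration of this representation, with hypothetical vertex IDs: a deletion written with the endpoints in the opposite order still refers to the same edge, and because a frozenset cannot be indexed, the endpoints have to be extracted by iterating over it.

```python
# An edge operation is a pair (is_insertion, edge); the edge is an unordered pair
insert_op = (True, frozenset(("1", "2")))
delete_op = (False, frozenset(("2", "1")))

# frozenset ignores element order, so the deletion refers to the same edge
assert insert_op[1] == delete_op[1]

reservoir = {insert_op[1]}
print(delete_op[1] in reservoir)  # True

# insert_op[1][0] would raise TypeError; unpack (or sort) the frozenset instead
u, v = sorted(insert_op[1])
print(u, v)  # 1 2
```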

In [0]:

def triest_fd(data, memory_size):
    reservoir = set()
    t = 0  # timestep
    edgeCount = 0  # total number of edges
    d_i = d_o = 0
    tri = 0  # global triangle count
    tri_l = {}  # local triangle counts

    def inc(node, amount):
        if node not in tri_l:
            tri_l[node] = amount
        else:
            tri_l[node] += amount

            if (tri_l[node] == 0):
                del tri_l[node]  # Keep memory usage constant (as in the paper)

    def flip_biased_coin(p):
        return random.random() < p

    def getVertex(edge, nonEqual):
        e = list(edge)
        return e[0] if e[0] != nonEqual else e[1]

    def update_counters(edgeOp):
        nonlocal tri, tri_l

        # +1 for an insertion (edgeOp[0] is True), -1 for a deletion
        i = 1 if edgeOp[0] else -1

        # Get the two vertices from the frozenset (frozensets cannot be indexed)
        edge = list(edgeOp[1])
        if len(edge) < 2:
            raise ValueError("Invalid EdgeOp (self-loop or malformed edge): %s" % (edgeOp,))
        u = edge[0]
        v = edge[1]

        # 1.) calculate shared neighbourhood from u,v nodes of edge
        N_u = {getVertex(n, u) for n in reservoir if u in n}
        N_v = {getVertex(n, v) for n in reservoir if v in n}
        N_u_v = N_u.intersection(N_v)

        # 2.) Inc/Dec the global and local counters for every c in the shared neighbourhood (each c closes a triangle with u and v)
        for n in N_u_v:
            tri += i
            inc(n, i)
            inc(u, i)
            inc(v, i)

    def reservoir_sampling(edge):
        nonlocal reservoir
        # Case 1: Set is not full (memory size) yet, simply add
        if len(reservoir) < memory_size:
            reservoir.add(edge)
            return True

        # Case 2: With probability M/s replace an existing edge, otherwise discard (do nothing).
        # s = edgeCount is the current number of edges; unlike t, it is decreased by deletions.
        elif flip_biased_coin(memory_size / edgeCount):
            # choose an edge at random uniformly
            chosen_edge = random.choice(list(reservoir))
            update_counters((False, chosen_edge))
            reservoir.remove(chosen_edge)
            reservoir.add(edge)
            return True

        return False

    def sample_edge(edge):
        nonlocal reservoir, d_o, d_i
        if d_o + d_i == 0:
            return reservoir_sampling(edge)
        elif flip_biased_coin(d_i / (d_i + d_o)):
            reservoir.add(edge)  # edge is already the frozenset; edge[1] would raise TypeError
            d_i -= 1
            return True
        else:
            d_o -= 1
            return False

    def run(edgeOp):
        nonlocal t, edgeCount, reservoir, d_o, d_i
        t += 1
        edgeCount += 1 if edgeOp[0] else -1
        if edgeOp[0]:
            if sample_edge(edgeOp[1]):
                update_counters(edgeOp)
        elif edgeOp[1] in reservoir:
            update_counters(edgeOp)
            reservoir.remove(edgeOp[1])
            d_i += 1
        else:
            d_o += 1

    # Main:
    # execute algorithm on data
    with open('triest-output', 'w') as output:
        for edgeOp in data:
            run(edgeOp)
            if t % 10000 == 0:
                print("Global count ", tri, " after ", t, " time steps")
                print("%s,%s" % (t, tri), file=output)
            #print(reservoir)

    return {
        "global-count": tri,
        "local-count": tri_l,
        "total-edge-count": edgeCount,
        "time steps": t,
        "reservoir": reservoir,
        "d_i": d_i,
        "d_o": d_o
    }

example_data = [(True, frozenset(("1", "2"))),
                (True, frozenset(("1", "2"))),
                (True, frozenset(("2", "3"))),
                (True, frozenset(("3", "1"))),
                (True, frozenset(("1", "4"))),
                (True, frozenset(("4", "2")))]

example_data_2 = [(False, frozenset(("1", "2"))),
                  (True, frozenset(("2", "3"))),
                  (True, frozenset(("3", "1"))),
                  (True, frozenset(("1", "2")))]

# triest_fd(example_data, 6)
#print(triest_fd(example_data, 5))


edgeOps = []
i = 0
for l in open('out.lastfm_song', 'r'):
    i += 1
    # Ignore headline
    if i == 1:
        continue
    if (i % 100000 == 0):
        print("%s lines read" % i)
    # Parse each edge operation (split on any whitespace)
    t = l.split()
    # Omit self-loops (not needed for triangles)
    if t[0] != t[1]:
        # Only insertions are read here; the timestamp t[3] is ignored, so the list is not re-sorted
        edgeOpT = (True, frozenset((t[0], t[1])))
        edgeOps.append(edgeOpT)


print("Reading is complete. Number of edges: ", len(edgeOps))
# Sort by timesteps
# edgeOps.sort(key = sortForth)

print("Final result", triest_fd(edgeOps, 200000))


Update counters (True, frozenset({'2', '1'})) {'1'} {'2'}
Update counters (True, frozenset({'2', '3'})) {'3', '1'} {'2'}
Update counters (False, frozenset({'2', '3'})) {'3', '1'} {'2'}
Update counters (True, frozenset({'3', '1'})) {'1'} {'2', '3'}
Update counters (False, frozenset({'3', '1'})) {'1'} {'2', '3'}
Update counters (True, frozenset({'4', '1'})) {'1'} {'4', '2'}


{'d_i': 0,
 'd_o': 0,
 'global-count': 0,
 'local-count': {},
 'reservoir': {frozenset({'1', '4'}), frozenset({'1', '2'})},
 'time steps': 4,
 'total-edge-count': 4}
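The small example streams above contain operations that, as we read the paper's fully-dynamic stream model, should not occur: we assume an edge is only inserted when it is absent and only deleted when it is present. `example_data` inserts {1, 2} twice, and `example_data_2` starts by deleting an edge that was never inserted. With such inputs the counters can drift; for example, a repeated insertion into a non-full reservoir calls `update_counters` a second time for the same edge. A hypothetical helper `check_stream` (our own, not from the paper) can flag such operations in test streams before the algorithm runs. It stores the whole graph, so it is meant only for small test inputs.

```python
def check_stream(edge_ops):
    """Return (index, op, reason) for the first operation that breaks the
    fully-dynamic stream model, or None if every operation is valid."""
    present = set()
    for index, (is_insertion, edge) in enumerate(edge_ops):
        if is_insertion:
            if edge in present:
                return index, (is_insertion, edge), "edge inserted twice"
            present.add(edge)
        else:
            if edge not in present:
                return index, (is_insertion, edge), "deleted edge is not present"
            present.remove(edge)
    return None
```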

TODO: 

* Why does the test run with a memory size of only 2 give a triangle count of zero? It should still estimate the one global/local triangle. More specifically: why is `update_counters((False, chosen_edge))` called when an edge in the reservoir is replaced? This does not seem to make much sense, as we should still account for the edge even though we replace it. Or do the counters only keep track of the state of the reservoir?
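One possible reading, based on the insertion-only TRIÈST-BASE variant in the same paper: there, the counters deliberately track only the triangles whose three edges are all currently in S, which is why the triangles of an evicted edge are subtracted. The global estimate is then that count τ multiplied by the correction factor ξ(t) = max(1, t(t-1)(t-2) / (M(M-1)(M-2))). With M = 2, S never holds three edges at once, so τ, and therefore any scaled estimate, stays 0. The sketch below shows only the BASE factor for insertion-only streams of t elements; TRIÈST-FD uses a different correction that also depends on s, d_i and d_o, which is not reproduced here. The function names `xi` and `estimate_global` are ours.

```python
def xi(t, M):
    """TRIÈST-BASE correction factor: max(1, t(t-1)(t-2) / (M(M-1)(M-2)))."""
    if M < 3:
        raise ValueError("M must be at least 3, otherwise S can never hold a triangle")
    return max(1.0, (t * (t - 1) * (t - 2)) / (M * (M - 1) * (M - 2)))


def estimate_global(tau, t, M):
    """Scale the number of triangles in S (tau) to an estimate for the whole stream."""
    return xi(t, M) * tau
```

For example, after t = 10 insertions with M = 5, a sample containing τ = 2 triangles gives an estimate of 12 * 2 = 24; while t ≤ M the factor is 1, so the count is exact.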

In [0]:
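edgeOps = []
with open('out.lastfm_song', 'r') as f:
    for i, l in enumerate(f):
        # Ignore headline (as in the cell above)
        if i == 0:
            continue
        t = l.split()
        # Omit self-loops (not needed for triangles)
        if t[0] == t[1]:
            continue
        # The sign of the weight column t[2] marks an insertion (> 0) or a deletion;
        # the timestamp t[3] is not needed
        edgeOpT = (float(t[2]) > 0, frozenset((t[0], t[1])))
        edgeOps.append(edgeOpT)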

# Sort by timesteps
# edgeOps.sort(key = sortForth)  

triest_fd(edgeOps, 50000)
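Since `triest_fd` only iterates over `data` once, it can also consume a generator, which avoids holding the whole edge list in memory. A minimal sketch of such a lazy parser, using our own hypothetical name `parse_edge_ops`, follows. It assumes that header/comment lines start with `%` and that a third column, if present, is a signed weight; lines with only two columns are treated as insertions.

```python
def parse_edge_ops(lines):
    """Lazily turn 'u v [weight ...]' lines into (is_insertion, edge) operations."""
    for line in lines:
        fields = line.split()
        # Assumption: header/comment lines start with '%' and are skipped
        if not fields or fields[0].startswith("%"):
            continue
        u, v = fields[0], fields[1]
        if u == v:
            continue  # self-loops cannot be part of a triangle
        # Assumption: a third column, if present, is a signed weight (+ insert, - delete)
        is_insertion = float(fields[2]) > 0 if len(fields) > 2 else True
        yield is_insertion, frozenset((u, v))
```

Usage: `with open('out.lastfm_song') as f: result = triest_fd(parse_edge_ops(f), 50000)`.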