In [1]:
import random

In [2]:
def load_edges(path):
    edges = []
    with open(path, "r") as f:
        for line in f:
            edge = line.strip().split()
            if edge:
                edges.append(frozenset(edge))
    return edges

edges = load_edges("soc-Epinions1.txt")
print(f"Loaded {len(edges)} edges.")
edges[:5]

Loaded 508837 edges.


[frozenset({'0', '4'}),
 frozenset({'0', '5'}),
 frozenset({'0', '7'}),
 frozenset({'0', '8'}),
 frozenset({'0', '9'})]

In [3]:
def reservoir_sampling_base(edge, k, reservoir, i):
    if i < k:
        [reservoir.append(edge), True, i]
    else:
        p = random.randint(0, i)
        if p < k:
            replace = random.randint(0, k - 1)
            reservoir[replace] = edge
            return [reservoir, True, replace]
    return [reservoir, False, -1]

k = 1000
reservoir = []

for (i, edge) in enumerate(edges):
    [reservoir, _, _] = reservoir_sampling_base(edge, k, reservoir, i)
print(reservoir)
print(len(reservoir))

[frozenset({'3092', '29727'}), frozenset({'555', '12294'}), frozenset({'64614', '35360'}), frozenset({'2189', '9974'}), frozenset({'10721', '947'}), frozenset({'54848', '16935'}), frozenset({'41490', '14524'}), frozenset({'2372', '3065'}), frozenset({'1439', '1869'}), frozenset({'75800', '75804'}), frozenset({'18246', '1274'}), frozenset({'5195', '37691'}), frozenset({'684', '26822'}), frozenset({'1779', '4162'}), frozenset({'2008', '3310'}), frozenset({'3266', '2557'}), frozenset({'18050', '30921'}), frozenset({'4049', '2648'}), frozenset({'35070', '64503'}), frozenset({'59', '1663'}), frozenset({'1029', '1319'}), frozenset({'67405', '67403'}), frozenset({'57291', '69375'}), frozenset({'8523', '14437'}), frozenset({'13463', '693'}), frozenset({'766', '6320'}), frozenset({'320', '329'}), frozenset({'20002', '53818'}), frozenset({'8071', '634'}), frozenset({'30005', '6815'}), frozenset({'3522', '16377'}), frozenset({'33730', '5309'}), frozenset({'4751', '36357'}), frozenset({'2787', '70

In [4]:
def generate_wedges(edge, edge_reservoir):
    totalWedges = 0
    wedgesInvolvingEdge = []
    for i in range(len(edge_reservoir)):
        for j in range(i + 1, len(edge_reservoir)):
            if len(edge_reservoir[i].intersection(edge_reservoir[j])) == 1:
                totalWedges += 1
                if (edge_reservoir[i] == edge or edge_reservoir[j] == edge):
                    wedgesInvolvingEdge.append((edge_reservoir[i], edge_reservoir[j]))
    return [totalWedges, wedgesInvolvingEdge]


print(generate_wedges(reservoir[11], reservoir))


[393, []]


In [5]:
def wedgeClosed(wedge, edge):
    neededEdge = wedge[0].union(wedge[1]) - wedge[0].intersection(wedge[1])
    return neededEdge == edge

wedgeClosed((frozenset(['1', '2']), frozenset(['2', '3'])), frozenset(['1', '3']))

True

In [9]:
def streaming_triangles(re_size, rw_sisze, stream):
    edge_reservoir = []
    wedge_reservoir = []
    is_closed = []
    totalWedges = 0
    totalTriangles = 0

    for (i, edge) in enumerate(stream):
        print(f"Processing edge {i+1}/{len(stream)}", end='\r')
        update(edge, re_size, rw_sisze, edge_reservoir, wedge_reservoir, is_closed, totalWedges)
        p = sum(is_closed) / len(is_closed) if is_closed else 0
        totalTriangles = ((p*i*i)/(re_size*(re_size-1))) * totalWedges
    return totalTriangles

def update(edge, re_size, rw_size, edge_reservoir, wedge_reservoir, is_closed, total_wedges):
    for (i, wedge) in enumerate(wedge_reservoir):
        if wedgeClosed(wedge, edge):
            is_closed[i] = 1
    [edge_reservoir, edgeAdded, _] = reservoir_sampling_base(edge, re_size, edge_reservoir, len(edge_reservoir))
    if edgeAdded:
        [newWedgesCount, newWedges] = generate_wedges(edge, edge_reservoir)
        numNewWedges = len(newWedges)
        for wedge in newWedges:
            [wedge_reservoir, wedgeAdded, pos] = reservoir_sampling_base(wedge, rw_size, wedge_reservoir, len(wedge_reservoir))
            if wedgeAdded:
                if (len(is_closed) > pos):
                    is_closed[pos] = 0
                else:
                    is_closed.append(0)
        return(edge_reservoir, wedge_reservoir, is_closed, numNewWedges)
    return(edge_reservoir, wedge_reservoir, is_closed, total_wedges)

streaming_triangles(1000, 1000, edges)


Processing edge 1312/508837

KeyboardInterrupt: 