# Homework 3: Mining Data Streams

This homework was done by Group 9: Anna Kovács and Alex Orlandi.

The task is to study and implement a streaming graph processing algorithm described in one of the papers presented in Canvas. We chose to analyze paper 3 in depth: P. Boldi and S. Vigna, "*In-Core Computation of Geometric Centralities with HyperBall: A Hundred Billion Nodes and Beyond*", ICDMW'13, available here: https://arxiv.org/pdf/1308.2144v2.pdf

We have also explored the paper "*HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm*", Philippe Flajolet, Éric Fusy, Olivier Gandouet, Frédéric Meunier, available here: https://dmtcs.episciences.org/3545/pdf

First, we implement the Flajolet-Martin-style counter used by the graph algorithm in the selected paper (HyperLogLog); then we implement the streaming graph algorithm presented in the paper, which builds on the counter from the first step (HyperBall).

## **Theoretical Background:**

**Flajolet-Martin algorithm:**

- Probabilistic algorithm for estimating the number of distinct elements in a data stream
- Hashes the elements and analyzes the patterns of trailing zeros in the binary representation of the hash values
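To make the trailing-zeros idea concrete, here is a minimal sketch of the simplified single-hash Flajolet-Martin estimator (the $2^R$ form found in many textbooks, not the original bitmap version with its correction constant). The function names and the choice of keeping 64 bits of a SHA-1 digest are our own assumptions for illustration.

```python
import hashlib


def trailing_zeros(x: int, width: int = 64) -> int:
    """Number of trailing zero bits of x; by convention `width` when x == 0."""
    if x == 0:
        return width
    # x & -x isolates the lowest set bit; its position is the trailing-zero count
    return (x & -x).bit_length() - 1


def fm_estimate(stream) -> int:
    """Simplified Flajolet-Martin estimate 2^R, where R is the longest run of
    trailing zeros observed among the 64-bit hashes of the stream's elements."""
    max_r = 0
    for item in stream:
        digest = hashlib.sha1(str(item).encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")  # keep 64 bits of the SHA-1 digest
        max_r = max(max_r, trailing_zeros(h))
    return 2 ** max_r


# Duplicates never change the estimate: only distinct hash values matter
print(fm_estimate(["a", "b", "a"]) == fm_estimate(["a", "b"]))  # True
```

A single hash gives a very noisy estimate (always a power of two); HyperLogLog reduces the variance by spreading the elements over many registers and combining them with a harmonic mean.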


**HyperBall algorithm:**

- Graph algorithm for estimating node-centric reachability in a graph
- Key quantities:
  - Neighborhood (ball) size: number of distinct nodes reachable from a given node within t hops, where t is the number of iterations performed
  - Closeness centrality: the reciprocal of the sum of the distances between a node and all nodes at finite distance from it; the higher it is, the closer the node is to the others
- Algorithm:
  - Uses HyperLogLog sketches, derived from the Flajolet-Martin algorithm, to represent the neighborhood of each node compactly
  - In every iteration, neighborhood information is propagated across edges: each node's sketch is updated to include the nodes reachable in one more hop (one hop per iteration)
  - Repeated for a fixed number of iterations (or until convergence, i.e. until no sketch changes)
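The propagation step and the closeness computation can be illustrated on a toy graph with exact Python sets in place of HyperLogLog counters, which is only feasible for small graphs. The ball of radius t+1 of x is the union of its own ball of radius t and those of its neighbors, and the sum of distances follows from the ball sizes as $\sum_t t\,(|B(x,t)| - |B(x,t-1)|)$. The function names and the example graph are hypothetical.

```python
def exact_ball_sizes(adj, max_t):
    """Return {x: [|B(x,0)|, ..., |B(x,max_t)|]} for an undirected graph given as
    {node: [neighbors]} (every neighbor must also be a key), using exact Python
    sets instead of HyperLogLog counters."""
    balls = {x: {x} for x in adj}  # B(x, 0) = {x}
    sizes = {x: [1] for x in adj}
    for _ in range(max_t):
        new_balls = {}
        for x, neighbors in adj.items():
            ball = set(balls[x])
            for y in neighbors:
                ball |= balls[y]  # B(x, t+1) = B(x, t) united with every neighbor's B(y, t)
            new_balls[x] = ball
        balls = new_balls  # all updates read only the previous iteration's balls
        for x in adj:
            sizes[x].append(len(balls[x]))
    return sizes


def closeness(sizes_x):
    """1 / sum_y d(x, y), using sum_y d(x, y) = sum_t t * (|B(x,t)| - |B(x,t-1)|);
    only nodes within the largest computed radius are counted."""
    total = sum(t * (sizes_x[t] - sizes_x[t - 1]) for t in range(1, len(sizes_x)))
    return 1.0 / total if total > 0 else 0.0


# Path graph 0 - 1 - 2 - 3
path = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2]}
sizes = exact_ball_sizes(path, 3)
print(sizes[0], closeness(sizes[0]))  # [1, 2, 3, 4] 0.16666666666666666
```

HyperBall keeps this same loop but replaces each set with an HLL counter and each set union with a register-wise maximum, which is what lets it run in memory on very large graphs.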


**Connection between Flajolet-Martin and HyperBall:**

- HyperBall relies on HyperLogLog counters, which build on the Flajolet-Martin algorithm, to estimate the size of each node's reachable set
- Each sketch in HyperBall represents the distinct set of nodes reachable from a given node, encoded using probabilistic counting
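HyperLogLog suits HyperBall because merging two sketches (register-wise maximum) gives exactly the sketch of the union of the two sets, with no additional error. A minimal self-contained sketch, assuming a hypothetical tiny size of B = 4 index bits and the same SHA-1 and leading-zeros scheme as the notebook's `add` method:

```python
import hashlib

B = 4       # hypothetical number of index bits (the notebook uses b = 10)
M = 1 << B  # number of registers


def registers_of(items):
    """Register array of a minimal HyperLogLog sketch built from strings."""
    regs = [0] * M
    for item in items:
        h = int.from_bytes(hashlib.sha1(item.encode("utf-8")).digest(), "big")
        bits = format(h, "0160b")  # 160-bit binary string
        idx = int(bits[:B], 2)  # first B bits choose the register
        rest = bits[B:]
        rho = len(rest) - len(rest.lstrip("0")) + 1  # leading zeros + 1
        regs[idx] = max(regs[idx], rho)
    return regs


def union(regs_a, regs_b):
    """Union of two sketches: element-wise maximum of their registers."""
    return [max(a, b) for a, b in zip(regs_a, regs_b)]


ball_x = {"1", "2", "3"}
ball_y = {"3", "4", "5"}
# Merging the two sketches gives exactly the sketch of the union set
print(union(registers_of(ball_x), registers_of(ball_y)) == registers_of(ball_x | ball_y))  # True
```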


**Unique node:**
- Distinct vertex in the graph that is counted only once when calculating reachability or influence from a given node

## Initialization

In this section we set up the environment for performing distributed data processing with PySpark.

In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
from pyspark import SparkConf, SparkContext  # Spark's core classes

from google.colab import drive
drive.mount('/content/drive')

import findspark  # To set up the environment for pyspark
import hashlib  # To hash data
import math

# No wildcard import from pyspark.sql.functions: it would shadow Python
# built-ins such as bin, max and sum, which the HyperLogLog code relies on.

Mounted at /content/drive


In [None]:
findspark.init() # findspark's initialization

conf = SparkConf().setAppName("Homework_3_Data_streams").setMaster("local[*]") # Local mode
sc = SparkContext(conf=conf) # Entry point for Spark's functionalities

## HyperLogLog Implementation

In [None]:
def read_graph(file_path, sc):
    """
    Reads the graph from a file and returns an RDD of edges.

    Args:
        file_path (str): Path to the edge list file.
        sc (SparkContext): SparkContext.

    Returns:
        RDD: RDD of (src, dst) tuples.
    """
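    lines = sc.textFile(file_path)
    # Split each line on whitespace and skip blank or malformed lines
    edges = lines.map(lambda line: line.split()) \
                 .filter(lambda parts: len(parts) >= 2) \
                 .map(lambda parts: (parts[0], parts[1]))
    return edges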

def create_adjacency_list(edges):
    """
    Creates an adjacency list from the edges, where each node points to its neighbors.
    Each edge is treated as undirected, so it is added in both directions.
    Relation to the paper: the paper assumes that the graph is scanned sequentially.
    This adjacency list allows efficient neighborhood propagation during HyperBall
    iterations.

    Args:
        edges (RDD): RDD of (src, dst) tuples.

    Returns:
        RDD: RDD of (node, [neighbors]) tuples.
    """
    adjacency = edges.flatMap(lambda edge: [(edge[0], edge[1]), (edge[1], edge[0])]) \
                    .groupByKey() \
                    .mapValues(lambda neighbors: list(neighbors))
    return adjacency

In [None]:
class HyperLogLog:
    def __init__(self, b=10):
        """

        Initializes HyperLogLog counter to estimate the number of distinct
        elements in a dataset.

        b (int): Number of bits used for bucket indexing.
        registers: store, for each bucket, the maximum rho observed
        (rho = number of leading zeros + 1). Number of registers p = 2^b.
        For b = 10, p = 1024.
        alpha: scaling constant alpha_p multiplied by p^2, used in the estimate.

        """
        self.b = b
        self.p = 1 << b
        self.registers = [0] * self.p
        self.alpha = self.__get_alpha()

    def __get_alpha(self):
        """

        Returns alpha_p * p^2. Predefined constants alpha_p are used for small
        numbers of registers (p = 16, 32, 64); otherwise the general formula is used.

        """
        if self.p == 16:
            return 0.673 * self.p * self.p
        elif self.p == 32:
            return 0.697 * self.p * self.p
        elif self.p == 64:
            return 0.709 * self.p * self.p
        else:
            return (0.7213 / (1 + 1.079 / self.p)) * self.p * self.p

    def add(self, value):
        """

        Adds a value to the HyperLogLog counter.
        value (str): The value to add.

        """
        # Hash the value using SHA-1 and write it as a 160-bit binary string
        hash_value = hashlib.sha1(value.encode('utf-8')).hexdigest()
        bin_hash = bin(int(hash_value, 16))[2:].zfill(160)

        # Extract the first b bits for the register index
        idx = int(bin_hash[:self.b], 2)

        # rho = number of leading zeros in the remaining bits + 1
        # (the position of the leftmost 1)
        w = bin_hash[self.b:]
        rho = len(w) - len(w.lstrip('0')) + 1

        # Update the register with the maximum rho observed
        self.registers[idx] = max(self.registers[idx], rho)

    def union(self, other):
        """

        Combines two HyperLogLog counters by taking the maximum
        value of corresponding registers.

        """
        for i in range(self.p):
            self.registers[i] = max(self.registers[i], other.registers[i])

    def size(self):
        """

        Estimates the cardinality of the set tracked by the HyperLogLog counter
        by using the harmonic mean for estimation with a small-range correction.

        """
        Z = sum([2 ** -r for r in self.registers])
        estimate = self.alpha / Z

        # Small range correction
        if estimate <= (5 / 2) * self.p:
            zeros = self.registers.count(0)
            if zeros > 0:
                estimate = self.p * math.log(self.p / zeros)

        return estimate
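For reference, `size()` implements the HyperLogLog estimator with its small-range correction. In the notation of the HyperLogLog paper, with m registers $M[1], \dots, M[m]$ (`self.p` and `self.registers` in the code) and $V$ the number of registers equal to zero:

$$
E = \alpha_m\, m^2 \left( \sum_{j=1}^{m} 2^{-M[j]} \right)^{-1}, \qquad \alpha_m = \frac{0.7213}{1 + 1.079/m} \quad (m \ge 128)
$$

$$
E^{*} =
\begin{cases}
m \ln\!\left( \dfrac{m}{V} \right) & \text{if } E \le \tfrac{5}{2} m \text{ and } V > 0,\\[4pt]
E & \text{otherwise.}
\end{cases}
$$

Here `self.alpha` stores $\alpha_m m^2$ and `Z` is the sum in parentheses. For b = 10, $m = 1024$ and $\alpha_{1024} \approx 0.7205$, so the small-range correction is applied only when the raw estimate is at most $\tfrac{5}{2} \cdot 1024 = 2560$.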

## HyperBall Implementation

In [None]:
def hyperball(adjacency, max_iterations=5, b=10):
    """
    Implements the HyperBall algorithm for neighborhood approximation.

    Args:
        adjacency (RDD): RDD of (node, [neighbors]) tuples.
        max_iterations (int): Maximum number of iterations.
        b (int): Number of bits used for HyperLogLog bucket indexing.

    Returns:
        RDD: RDD of (node, estimated cardinality).
    """

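    def init_counter(node):
        # Ball of radius 0: the counter contains only the node itself
        hll = HyperLogLog(b)
        hll.add(node)
        return hll

    def merge(hll1, hll2):
        # Non-mutating union (register-wise max). HyperLogLog.union works in place,
        # which is unsafe here: flatMap sends the same counter object to several
        # neighbors, and reduceByKey combines values on the map side, so an in-place
        # update could leak information further than one hop per iteration.
        merged = HyperLogLog(hll1.b)
        merged.registers = [max(r1, r2) for r1, r2 in zip(hll1.registers, hll2.registers)]
        return merged

    # Step 1: Initialize counters
    hlls = adjacency.map(lambda x: (x[0], init_counter(x[0])))

    for i in range(max_iterations):
        print(f"Iteration {i+1}")

        # Step 2: Propagate each node's counter to all of its neighbors
        messages = adjacency.join(hlls) \
                    .flatMap(lambda x: [(neighbor, x[1][1]) for neighbor in x[1][0]])

        # Step 3: Aggregate the counters received by each node
        updated_hlls = messages.reduceByKey(merge)

        # Step 4: Update counters: B(x, t+1) = B(x, t) united with the received counters
        hlls = hlls.leftOuterJoin(updated_hlls).mapValues(
            lambda x: merge(x[0], x[1]) if x[1] is not None else x[0])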

        print("Updated HLLs:")
        print(hlls.take(10))

    # Step 5: Compute final cardinalities
    estimates = hlls.mapValues(lambda hll: hll.size())

    return estimates

## Results

In [None]:
file_path = "/content/drive/MyDrive/KTH/Data_Mining/facebook_combined.txt"
edges = read_graph(file_path, sc)
print(edges.take(10))

[('0', '1'), ('0', '2'), ('0', '3'), ('0', '4'), ('0', '5'), ('0', '6'), ('0', '7'), ('0', '8'), ('0', '9'), ('0', '10')]


In [None]:
adjacency = create_adjacency_list(edges)
print(adjacency.take(5))

[('0', ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', '100', '101', '102', '103', '104', '105', '106', '107', '108', '109', '110', '111', '112', '113', '114', '115', '116', '117', '118', '119', '120', '121', '122', '123', '124', '125', '126', '127', '128', '129', '130', '131', '132', '133', '134', '135', '136', '137', '138', '139', '140', '141', '142', '143', '144', '145', '146', '147', '148', '149', '150', '151', '152', '153', '154', '155', '156', '157', '

In [None]:
estimates = hyperball(adjacency, max_iterations=5, b=10)

Iteration 1
Updated HLLs:
[('4', <__main__.HyperLogLog object at 0x7c9777ee4730>), ('16', <__main__.HyperLogLog object at 0x7c9777ee4e80>), ('20', <__main__.HyperLogLog object at 0x7c9777ee7040>), ('22', <__main__.HyperLogLog object at 0x7c9777ee43d0>), ('24', <__main__.HyperLogLog object at 0x7c9777ee4820>), ('33', <__main__.HyperLogLog object at 0x7c9777ee47c0>), ('34', <__main__.HyperLogLog object at 0x7c9777ee6e00>), ('40', <__main__.HyperLogLog object at 0x7c9777ee55a0>), ('44', <__main__.HyperLogLog object at 0x7c9777ee50f0>), ('53', <__main__.HyperLogLog object at 0x7c9777ee4370>)]
Iteration 2
Updated HLLs:
[('24', <__main__.HyperLogLog object at 0x7c9777ee5420>), ('33', <__main__.HyperLogLog object at 0x7c9777ee6620>), ('70', <__main__.HyperLogLog object at 0x7c9777ee70d0>), ('77', <__main__.HyperLogLog object at 0x7c9777ee4730>), ('111', <__main__.HyperLogLog object at 0x7c9777ee57b0>), ('113', <__main__.HyperLogLog object at 0x7c9777ee5d80>), ('119', <__main__.HyperLogLog obj

In [None]:
results = estimates.collect()

print(f"{'Node':<10}{'Estimated Value':>20}")
print("-" * 30)
for node, estimate in results:
    print(f"{node:<10}{estimate:>20.6f}")

Node           Estimated Value
------------------------------
1382               4002.424958
1817               4002.424958
1740               4002.424958
504                4002.424958
1805               4002.424958
3184               4002.424958
1934               4002.424958
3100               4002.424958
3919               4002.424958
3711               4002.424958
1221               4002.424958
446                4002.424958
3265               4002.424958
3747               4002.424958
1325               4002.424958
656                4002.424958
1117               4002.424958
1192               4002.424958
1373               4002.424958
1951               4002.424958
33                 4002.424958
1531               4002.424958
3411               4002.424958
1950               4002.424958
1150               4002.424958
916                4002.424958
1822               4002.424958
3512               4002.424958
2195               4002.424958
3182               4002.424958
899     

In [None]:
unique_estimations = set(estimation for _, estimation in results)
nodes_set = set(node for node, _ in results)
num_unique_estimations = len(unique_estimations)
num_of_nodes = len(nodes_set)

print(f"Number of nodes: {num_of_nodes}")
print(f"Number of unique estimations: {num_unique_estimations}")
print(f"Unique estimations: {unique_estimations}")

Number of nodes: 4039
Number of unique estimations: 4
Unique estimations: {3936.845322812273, 4002.42495845636, 3949.6274512766495, 3743.6086241150656}


## Interpretation of results and further questions

- HyperBall and Flajolet-Martin-style counters (HyperLogLog) only approximate the result through hashing and probabilistic counting, so the estimates are not exact. Because the hash function is deterministic, nodes whose balls contain exactly the same set of nodes end up with identical registers and therefore identical estimates, which explains how the estimates can collapse onto a small set of distinct values (4 in this run). With a fixed number of bits (b in HyperBall), the resolution of the counters is also limited
- Probably many nodes in the graph have similar properties (e.g. similar degrees), so the estimated values can settle into clusters: the algorithm computes similar distances or reachabilities for multiple nodes
- The algorithm estimates the number of distinct nodes that can be reached (directly or indirectly) from a given node within 5 iterations, i.e. within 5 hops
- HyperLogLog estimates set cardinality (the number of distinct elements) with hash-based techniques, so it returns floating-point approximations rather than exact integers
- The estimations correspond to the approximate sizes of the reachable node sets in different parts of the graph
- The same estimation means that the nodes likely belong to the same or similarly connected regions of the graph
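To put the differences between the estimates into perspective, the HyperLogLog paper gives a relative standard error of about $1.04/\sqrt{m}$ for m registers. With b = 10:

$$
\frac{1.04}{\sqrt{m}} = \frac{1.04}{\sqrt{2^{10}}} = \frac{1.04}{32} = 0.0325 \approx 3.25\%
$$

So, for example, 3936.85 and 4002.42, which differ by less than 2%, are within one standard error of each other.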

**Question 1: What were the challenges you faced when implementing the algorithm?**

1) Handling a large-scale graph was computationally demanding: running the code on the whole file took us more than 10 minutes.

2) Debugging was difficult because the code spans several functions, but `sc.setLogLevel("DEBUG")` helped us find the problem.

3) The wildcard import `from pyspark.sql.functions import *` shadowed Python's built-in functions (e.g. `sum`, `max`) several times, and it took us a long time to find this problem.

**Question 2: Can the algorithm be easily parallelized? If yes, how? If not, why? Explain.**

Yes, the algorithm can be easily parallelized, both at node level and at edge level.

- At node level, each node's HLL counter update can be computed independently in parallel, because the update of a node's counter depends only on its current counter and the counters of its neighbors.

- At edge level, during message propagation, edges can be processed in parallel to send information from source to destination.
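The node-level independence can be illustrated in plain Python: one iteration reads only the previous iteration's registers, so splitting the nodes across workers gives the same result as a sequential loop. This is a minimal sketch with hypothetical toy register arrays and function names; threads are used only to show the independence (the GIL limits CPU speedup for pure Python code in CPython), while a real deployment would use processes or Spark as in this notebook.

```python
from concurrent.futures import ThreadPoolExecutor


def update_node(x, adj, registers):
    """New register array of node x for the next iteration.

    Reads only the previous iteration's registers of x and its neighbors,
    so different nodes can be updated independently."""
    new = list(registers[x])
    for y in adj[x]:
        new = [max(a, b) for a, b in zip(new, registers[y])]
    return x, new


def parallel_iteration(adj, registers, workers=4):
    """One HyperBall-style iteration with the nodes split across worker threads."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return dict(pool.map(lambda x: update_node(x, adj, registers), adj))


adj = {0: [1], 1: [0, 2], 2: [1]}
registers = {0: [1, 0], 1: [0, 2], 2: [3, 0]}  # toy 2-register sketches
print(parallel_iteration(adj, registers))  # {0: [1, 2], 1: [3, 2], 2: [3, 2]}
```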

**Question 3: Does the algorithm work for unbounded graph streams? Explain.**

No, the algorithm doesn't work for unbounded graph streams, because it assumes a static graph: the adjacency list and the edges are fixed at the beginning of the computation. To support unbounded graph streams, we would need to implement real-time propagation of updates and windowing.

**Question 4: Does the algorithm support edge deletions? If not, what modification would it need? Explain.**

No, the algorithm doesn't support edge deletions. The main reason is that the union operation in HyperLogLog (a register-wise maximum) is not reversible: once two HLL counters are merged, their original states are lost. If an edge is deleted, the algorithm cannot subtract the effect of that edge from the merged counters.

To support edge deletions, we would need a separate data structure that tracks the contribution of each edge (or neighbor) to the HLL counter. In this way, when an edge is deleted, its contribution can be removed and the counter recomputed from the remaining contributions.
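A minimal sketch of this bookkeeping for a single node and a single hop: the counter keeps the register array received from each neighbor, so deleting an edge amounts to dropping that entry and recomputing the maximum. The class and method names are hypothetical. It does not handle multi-hop effects: the neighbors' own registers may already contain nodes reached through the deleted edge, so those counters would also need to be recomputed.

```python
class ContributionTrackedCounter:
    """Hypothetical counter for one node that stores the register array received
    from each neighbor separately, so a neighbor's contribution can be withdrawn."""

    def __init__(self, own_registers):
        self.own = list(own_registers)  # sketch of the node itself
        self.contributions = {}  # neighbor -> register array it sent

    def add_contribution(self, neighbor, registers):
        self.contributions[neighbor] = list(registers)

    def remove_contribution(self, neighbor):
        # Called when the edge to `neighbor` is deleted
        self.contributions.pop(neighbor, None)

    def registers(self):
        # Recompute the merged sketch from scratch: max is not invertible,
        # so a contribution cannot be "subtracted" from an already merged array
        merged = list(self.own)
        for regs in self.contributions.values():
            merged = [max(a, b) for a, b in zip(merged, regs)]
        return merged


counter = ContributionTrackedCounter([1, 0, 0])
counter.add_contribution("n1", [0, 3, 0])
counter.add_contribution("n2", [0, 1, 2])
print(counter.registers())  # [1, 3, 2]
counter.remove_contribution("n1")
print(counter.registers())  # [1, 1, 2]
```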