In [0]:
##Python Code 

In [0]:
class Bucket:
    def __init__(self, sum_value, timestamp):
        self.sum_value = sum_value
        self.timestamp = timestamp

class DGIM:
    def __init__(self, k):
        self.k = k
        self.buckets = []
        self.timestamp = 0

    def update(self, value):
        self.timestamp += 1
        self._remove_old_buckets()
        
        # Add new bucket for current value
        self.buckets.append(Bucket(value, self.timestamp))
        
        self._combine_buckets()

    def _remove_old_buckets(self):
        while self.buckets and self.buckets[0].timestamp <= self.timestamp - self.k:
            self.buckets.pop(0)

    def _combine_buckets(self):
        sizes = {}
        i = len(self.buckets) - 1
        while i >= 0:
            size = self.buckets[i].sum_value
            if size not in sizes:
                sizes[size] = 1
            else:
                sizes[size] += 1

            if sizes[size] > 2:
                # Combine two oldest buckets of this size
                b1 = self.buckets[i-1]
                b2 = self.buckets[i]
                self.buckets.pop(i-1)
                self.buckets.pop(i-1)
                combined_bucket = Bucket(b1.sum_value + b2.sum_value, b2.timestamp)
                self.buckets.insert(i-1, combined_bucket)
                sizes[size] = 1
                i -= 1
            i -= 1

    def get_sum(self):
        total = 0
        for bucket in self.buckets:
            total += bucket.sum_value
        return total

In [0]:
# %pip install pyspark

In [0]:
#Spark Application

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("DGIM Algorithm").getOrCreate()
# Initialize a Spark context
sc = SparkContext.getOrCreate()
def run_dgim_algorithm(input_data, k):
    # Create an RDD from your input data
    input_rdd = sc.parallelize(input_data)
    # Create an instance of the DGIM class
    dgim = DGIM(k)

    # Define a function to update the DGIM instance for each input value
    def update_dgim(value):
        dgim.update(value)
        return dgim.get_sum()

    # Use the map transformation to update DGIM for each value in the input RDD
    result_rdd = input_rdd.map(update_dgim)

    # Collect the results into a list
    result_list = result_rdd.collect()

    # Return the final results
    return result_list
if __name__ == "__main__":
    # Define your input data as a list of values
    input_data = [0, 1, 1, 0, 1, 0, 0, 1, 0, 0]

    # Define the parameter 'k' for DGIM
    k = 4
    # Run the DGIM algorithm on the input data
    result = run_dgim_algorithm(input_data, k)
    # Print the results
    print("DGIM Results:", result)
    


DGIM Results: [0, 1, 1, 0, 1, 0, 0, 1, 0, 0]


In [0]:
   # Stop the Spark context
sc.stop()

    # Stop the Spark session
spark.stop()