In [2]:
import os

# Switch the working directory to the pydbsp checkout before anything else runs
# (presumably so the `pydbsp` imports in later cells resolve - confirm).
# The location can be overridden with the PYDBSP_DIR environment variable; when
# it is unset the original hardcoded path is used, so existing setups keep working.
os.chdir(os.environ.get("PYDBSP_DIR", "/home/ralph/kafka/pydbsp"))

In [30]:
def regular_join(left, right):
    """Nested-loop equi-join of two iterables of ``(key, value)`` pairs.

    Every pair from ``left`` is compared with every pair from ``right``; each
    time the keys are equal a ``(key, left_value, right_value)`` triple is
    produced. Results follow the order of ``left`` and, for one left pair,
    the order of ``right``.

    Returns:
        A list of the matching triples (empty when nothing matches).
    """
    return [
        (l_key, l_val, r_val)
        for l_key, l_val in left
        for r_key, r_val in right
        if l_key == r_key
    ]

# Two small relations; the first element of every row is the employee id.
employees = [
    (0, "kristjan"),
    (1, "mark"),
    (2, "mike"),
]
salaries = [
    (2, "40000"),
    (0, "38750"),
    (1, "50000"),
]

# Join the two relations on the id using the plain Python join defined above.
employees_salaries = regular_join(employees, salaries)
print(f"Regular join: {employees_salaries}")

Regular join: [(0, 'kristjan', '38750'), (1, 'mark', '50000'), (2, 'mike', '40000')]


In [31]:
from pydbsp.zset import ZSet
from pydbsp.zset.functions.bilinear import join

# Wrap both relations in Z-sets, giving every row a weight of 1.
employees_zset = ZSet(dict.fromkeys(employees, 1))
salaries_zset = ZSet(dict.fromkeys(salaries, 1))

# The same join on the id, expressed with the Z-set `join`: the first callable
# compares a left row with a right row, the second builds the output row.
employees_salaries_zset = join(
    employees_zset,
    salaries_zset,
    lambda left, right: left[0] == right[0],
    lambda left, right: (left[0], left[1], right[1]),
)

print(f"ZSet join: {employees_salaries_zset}")

ZSet join: {(0, 'kristjan', '38750'): 1, (1, 'mark', '50000'): 1, (2, 'mike', '40000'): 1}


In [32]:
from pydbsp.zset import ZSetAddition
from pydbsp.stream import Stream, StreamHandle
from pydbsp.stream.operators.linear import Integrate
from pydbsp.zset.operators.bilinear import LiftedJoin

group = ZSetAddition()

# One input stream per relation; each handle wraps a thunk returning its stream.
employees_stream = Stream(group)
employees_stream_handle = StreamHandle(lambda: employees_stream)
employees_stream.send(employees_zset)

salaries_stream = Stream(group)
salaries_stream_handle = StreamHandle(lambda: salaries_stream)
salaries_stream.send(salaries_zset)


def join_cmp(left, right):
    """Two rows match when their first elements (the ids) are equal."""
    return left[0] == right[0]


def join_projection(left, right):
    """Build the joined row: id, second field of the left row, second field of the right row."""
    return (left[0], left[1], right[1])


# Feed each input through Integrate and join the two integrated outputs.
integrated_employees = Integrate(employees_stream_handle)
integrated_salaries = Integrate(salaries_stream_handle)
stream_join = LiftedJoin(
    integrated_employees.output_handle(),
    integrated_salaries.output_handle(),
    join_cmp,
    join_projection,
)

# Advance one step: both integrators first, then the join that reads from them.
for node in (integrated_employees, integrated_salaries, stream_join):
    node.step()
print(f"ZSet stream join: {stream_join.output().latest()}")


ZSet stream join: {(0, 'kristjan', '38750'): 1, (1, 'mark', '50000'): 1, (2, 'mike', '40000'): 1}


In [33]:
from pydbsp.stream.operators.bilinear import Incrementalize2

def _join_on_id(left, right):
    """Plain Z-set join of two inputs using the shared comparison and projection."""
    return join(left, right, join_cmp, join_projection)


# Wrap the plain join in Incrementalize2, reading directly from the raw input
# streams (no explicit Integrate operators in front of it).
incremental_stream_join = Incrementalize2(
    employees_stream_handle,
    salaries_stream_handle,
    _join_on_id,
    group,
)
incremental_stream_join.step()
print(f"Incremental ZSet stream join: {incremental_stream_join.output().latest()}")


Incremental ZSet stream join: {(0, 'kristjan', '38750'): 1, (1, 'mark', '50000'): 1, (2, 'mike', '40000'): 1}


In [34]:
# Send (2, "mike") with weight -1 on the employees stream, then advance the
# incremental join by one step and print what it emits for that step.
mike_retraction = ZSet({(2, "mike"): -1})
employees_stream.send(mike_retraction)
incremental_stream_join.step()
print(f"Incremental ZSet stream join update: {incremental_stream_join.output().latest()}")

Incremental ZSet stream join update: {(2, 'mike', '40000'): -1}


In [35]:
from pydbsp.indexed_zset.functions.bilinear import join_with_index
from pydbsp.indexed_zset.operators.linear import LiftedIndex

def indexer(x):
    """Index a row by its first element (the id)."""
    return x[0]


def join_with_indexing_projection(key, left_value, right_value):
    """Build the joined row from the shared key and the second field of each row."""
    return (key, left_value[1], right_value[1])


# Index both raw input streams by id.
index_employees = LiftedIndex(employees_stream_handle, indexer)
index_salaries = LiftedIndex(salaries_stream_handle, indexer)

# Incremental join over the two indexed streams.
incremental_sort_merge_join = Incrementalize2(
    index_employees.output_handle(),
    index_salaries.output_handle(),
    lambda left, right: join_with_index(left, right, join_with_indexing_projection),
    group,
)

# Advance one step: both indexers first, then the join that reads from them.
for node in (index_employees, index_salaries, incremental_sort_merge_join):
    node.step()
print(f"Incremental indexed ZSet stream join: {incremental_sort_merge_join.output().latest()}")

Incremental indexed ZSet stream join: {(0, 'kristjan', '38750'): 1, (1, 'mark', '50000'): 1, (2, 'mike', '40000'): 1}


In [36]:
# Second step of the indexed pipeline. Only the employees indexer is advanced
# here; the salaries indexer is not stepped in this cell.
# NOTE(review): the printed label is the same as the non-indexed update cell's;
# consider making it mention "indexed" when the notebook is next re-run.
for node in (index_employees, incremental_sort_merge_join):
    node.step()
print(f"Incremental ZSet stream join update: {incremental_sort_merge_join.output().latest()}")

Incremental ZSet stream join update: {(2, 'mike', '40000'): -1}


In [37]:
from random import randrange

names = ("kristjan", "mark", "mike")
max_pay = 100000
# Each entry is an (employee_row, salary_row) pair sharing the same id:
#   employee_row = (id, "<random name><id>"), salary_row = (id, random int below max_pay).
# Ids run from 3 to 10002.
fake_data = [((i, names[randrange(len(names))] + str(i)), (i, randrange(max_pay))) for i in range(3, 10003)]
batch_size = 500
fake_data_batches = [fake_data[i: i + batch_size] for i in range(0, len(fake_data), batch_size)]

# Queue one Z-set per batch on each input stream (every row with weight 1).
for batch in fake_data_batches:
    employees_stream.send(ZSet({employee: 1 for employee, _ in batch}))
    # The salary row is the SECOND element of each pair. Unpacking it as the
    # first element (as before) sent employee rows down the salaries stream.
    salaries_stream.send(ZSet({salary: 1 for _, salary in batch}))

# One circuit step is needed per queued batch.
steps_to_take = len(fake_data_batches)


In [38]:
from tqdm.notebook import tqdm
# `time` stays imported in case other cells use it; the measurements below use
# perf_counter, a monotonic high-resolution clock intended for timing intervals.
from time import perf_counter, time

# Benchmark the Integrate + LiftedJoin pipeline: one step per queued batch.
# `measurements` collects the duration of every individual step in seconds.
time_start = perf_counter()
measurements = []
for _ in tqdm(range(steps_to_take)):
    local_time = perf_counter()
    # Both integrators must be stepped before the join that reads from them.
    integrated_employees.step()
    integrated_salaries.step()
    stream_join.step()
    measurements.append(perf_counter() - local_time)
print(f"Time taken - on demand: {perf_counter() - time_start}s")

  0%|          | 0/20 [00:00<?, ?it/s]

Time taken - on demand: 126.65267968177795s


In [84]:
from pydbsp.indexed_zset.functions.bilinear import join_with_index
from pydbsp.indexed_zset.operators.linear import LiftedIndex
from pydbsp.stream import Stream, StreamHandle
from pydbsp.stream.functions.linear import stream_elimination
from pydbsp.stream.operators.bilinear import Incrementalize2
from pydbsp.zset import ZSet, ZSetAddition
from pydbsp.zset.operators.linear import LiftedSelect, LiftedProject
from pydbsp.zset.operators.unary import DeltaLiftedDeltaLiftedDistinct

# Fresh input relations for this cell; the first element of every row is the id.
employees = [
    (0, "kristjan"),
    (1, "mark"),
    (2, "mike"),
    (3, "mike"),
]
salaries = [
    (2, "40000"),
    (0, "38750"),
    (1, "50000"),
    (3, "45000"),
]

# Every row enters its Z-set with weight 1.
employees_zset = ZSet(dict.fromkeys(employees, 1))
salaries_zset = ZSet(dict.fromkeys(salaries, 1))

# New streams (replacing the ones built in earlier cells), each seeded with
# its relation; the handles wrap thunks returning the streams.
group = ZSetAddition()
employees_stream = Stream(group)
employees_stream_handle = StreamHandle(lambda: employees_stream)
employees_stream.send(employees_zset)

salaries_stream = Stream(group)
salaries_stream_handle = StreamHandle(lambda: salaries_stream)
salaries_stream.send(salaries_zset)

# select
#
# select * from employees where name <> 'mark'


def select_cmp(x):
    """Keep a row unless its second field (the name) is "mark"."""
    return x[1] != "mark"


employees_selection = LiftedSelect(employees_stream_handle, select_cmp)
employees_selection_handle = employees_selection.output_handle()

# join


def join_projection(left, right):
    """Build a joined row: id, second field of the left row, second field of the right row."""
    return (left[0], left[1], right[1])


def indexer(x):
    """Index a row by its first element (the id)."""
    return x[0]


def join_with_indexing_projection(key, left_value, right_value):
    """Build the joined row from the shared key and the second field of each row."""
    return (key, left_value[1], right_value[1])


# The employees side is indexed AFTER the selection; salaries are indexed raw.
index_employees = LiftedIndex(employees_selection_handle, indexer)
index_salaries = LiftedIndex(salaries_stream_handle, indexer)
incremental_sort_merge_join = Incrementalize2(
    index_employees.output_handle(),
    index_salaries.output_handle(),
    lambda left, right: join_with_index(left, right, join_with_indexing_projection),
    group,
)
incremental_sort_merge_join_handle = incremental_sort_merge_join.output_handle()

# project


def project(x):
    """Append "_abc" to the second field (the name), leaving the other fields as they are."""
    return (x[0], x[1] + "_abc", x[2])


y = LiftedProject(incremental_sort_merge_join_handle, project)
y_handle = y.output_handle()

# # distinct

# distinct = DeltaLiftedDeltaLiftedDistinct(y_handle)

# Execute
#
# Step each operator once, upstream before downstream, printing the latest
# value on its output stream right after it has been stepped.
for stage in (
    employees_selection,
    index_employees,
    index_salaries,
    incremental_sort_merge_join,
    y,
):
    stage.step()
    print(f"{stage.output().latest()}")

# distinct.step()
# print(f"{distinct.output().latest()}")

# new input

# Blank line separating the output of the two steps.
print()

new_employees = [(4, "peter"), (2, "bea"), (6, "tim")]
new_salaries = [(6, "42000"), (4, "30000")]

# Every new row enters its Z-set with weight 1.
new_employees_zset = ZSet(dict.fromkeys(new_employees, 1))
new_salaries_zset = ZSet(dict.fromkeys(new_salaries, 1))

employees_stream.send(new_employees_zset)
salaries_stream.send(new_salaries_zset)

# Second step: same order as before (upstream before downstream), printing the
# latest value on each operator's output stream right after stepping it.
for stage in (
    employees_selection,
    index_employees,
    index_salaries,
    incremental_sort_merge_join,
    y,
):
    stage.step()
    print(f"{stage.output().latest()}")

# Finally print stream_elimination applied to the projection's whole output
# stream (in the recorded run this shows the rows of both steps together).
print(f"{stream_elimination(y.output_handle().get())}")


{(0, 'kristjan'): 1, (2, 'mike'): 1, (3, 'mike'): 1}
{(0, 'kristjan'): 1, (2, 'mike'): 1, (3, 'mike'): 1}
{(2, '40000'): 1, (0, '38750'): 1, (1, '50000'): 1, (3, '45000'): 1}
{(0, 'kristjan', '38750'): 1, (2, 'mike', '40000'): 1, (3, 'mike', '45000'): 1}
{(0, 'kristjan_abc', '38750'): 1, (2, 'mike_abc', '40000'): 1, (3, 'mike_abc', '45000'): 1}

{(4, 'peter'): 1, (2, 'bea'): 1, (6, 'tim'): 1}
{(4, 'peter'): 1, (2, 'bea'): 1, (6, 'tim'): 1}
{(6, '42000'): 1, (4, '30000'): 1}
{(4, 'peter', '30000'): 1, (6, 'tim', '42000'): 1, (2, 'bea', '40000'): 1}
{(4, 'peter_abc', '30000'): 1, (6, 'tim_abc', '42000'): 1, (2, 'bea_abc', '40000'): 1}
{(0, 'kristjan_abc', '38750'): 1, (2, 'mike_abc', '40000'): 1, (3, 'mike_abc', '45000'): 1, (4, 'peter_abc', '30000'): 1, (6, 'tim_abc', '42000'): 1, (2, 'bea_abc', '40000'): 1}
