Skip to content

Commit

Permalink
Merge e19644d into 8741761
Browse files Browse the repository at this point in the history
  • Loading branch information
MainRo committed Nov 9, 2022
2 parents 8741761 + e19644d commit 5daa703
Show file tree
Hide file tree
Showing 19 changed files with 176 additions and 247 deletions.
2 changes: 1 addition & 1 deletion bench/bench_group_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ def dataset(count):
'''

print("done!")
time.sleep(5.0)
#time.sleep(5.0)
16 changes: 8 additions & 8 deletions rxsci/data/lag.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ def on_subscribe(observer, scheduler):
def on_next(i):
nonlocal state
if type(i) is rs.OnNextMux:
iprev = i.store.get_state(state, i.key)
iprev = i.store.get_state(state, i.key[0])
if iprev is rs.state.markers.STATE_NOTSET:
iprev = i.item

ii = (iprev, i.item)
i.store.set_state(state, i.key, i.item)
i.store.set_state(state, i.key[0], i.item)
observer.on_next(i._replace(item=ii))
elif type(i) is rs.OnCreateMux:
i.store.add_key(state, i.key)
i.store.add_key(state, i.key[0])
observer.on_next(i)
elif type(i) is rs.OnCompletedMux or type(i) is rs.OnErrorMux:
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])
observer.on_next(i)
elif type(i) is rs.state.ProbeStateTopology:
state = i.topology.create_state(name='lag1', data_type='obj')
Expand Down Expand Up @@ -76,20 +76,20 @@ def on_next(i):
nonlocal state

if isinstance(i, rs.OnNextMux):
q = i.store.get_state(state, i.key)
q = i.store.get_state(state, i.key[0])
q.append(i.item)
observer.on_next(i._replace(item=(q[0], i.item)))
if len(q) > size:
q.popleft()

elif isinstance(i, rs.OnCreateMux):
i.store.add_key(state, i.key)
i.store.set_state(state, i.key, deque())
i.store.add_key(state, i.key[0])
i.store.set_state(state, i.key[0], deque())
observer.on_next(i)

elif isinstance(i, rs.OnCompletedMux) \
or isinstance(i, rs.OnErrorMux):
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])
observer.on_next(i)

elif type(i) is rs.state.ProbeStateTopology:
Expand Down
18 changes: 9 additions & 9 deletions rxsci/data/pad.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ def on_next(i):
nonlocal state

if type(i) is rs.OnNextMux:
v = i.store.get_state(state, i.key)
v = i.store.get_state(state, i.key[0])
if v is rs.state.markers.STATE_NOTSET:
i.store.set_state(state, i.key, True)
i.store.set_state(state, i.key[0], True)
v = value if value is not None else i.item
for _ in range(size):
observer.on_next(i._replace(item=v))
observer.on_next(i)

elif type(i) is rs.OnCreateMux:
i.store.add_key(state, i.key)
i.store.add_key(state, i.key[0])
observer.on_next(i)

elif type(i) is rs.OnCompletedMux or type(i) is rs.OnErrorMux:
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])
observer.on_next(i)

elif type(i) is rs.state.ProbeStateTopology:
Expand Down Expand Up @@ -85,25 +85,25 @@ def on_next(i):
nonlocal state

if type(i) is rs.OnNextMux:
i.store.set_state(state, i.key, i.item)
i.store.set_state(state, i.key[0], i.item)
observer.on_next(i)

elif type(i) is rs.OnCreateMux:
i.store.add_key(state, i.key)
i.store.add_key(state, i.key[0])
observer.on_next(i)

elif type(i) is rs.OnCompletedMux:
v = i.store.get_state(state, i.key)
v = i.store.get_state(state, i.key[0])
if v is not rs.state.markers.STATE_NOTSET:
if value is not None:
v = value
for _ in range(size):
observer.on_next(rs.OnNextMux(i.key, v, i.store))
observer.on_next(i)
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])

elif type(i) is rs.OnErrorMux:
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])
observer.on_next(i)

elif type(i) is rs.state.ProbeStateTopology:
Expand Down
40 changes: 20 additions & 20 deletions rxsci/data/roll.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,49 +23,49 @@ def on_next(i):
nonlocal state_n
nonlocal state_w
if isinstance(i, rs.OnNextMux):
n = i.store.get_state(state_n, i.key)
n = i.store.get_state(state_n, i.key[0])

if (n % stride) == 0:
offset = (n // stride) % density
index = i.key[0] * density + offset
i.store.set_state(state_w, (index, i.key), n)
i.store.set_state(state_w, index, n)
observer.on_next(rs.OnCreateMux((index, i.key), i.store))

for offset in range(density):
index = i.key[0] * density + offset
w_value = i.store.get_state(state_w, (index, i.key))
w_value = i.store.get_state(state_w, index)
if w_value != -1:
observer.on_next(i._replace(key=(index, i.key)))
count = n - w_value + 1
if count == window:
i.store.set_state(state_w, (index, i.key), -1)
i.store.set_state(state_w, index, -1)
observer.on_next(rs.OnCompletedMux((index, i.key), i.store))

n_value = i.store.get_state(state_n, i.key)
i.store.set_state(state_n, i.key, n_value+1)
n_value = i.store.get_state(state_n, i.key[0])
i.store.set_state(state_n, i.key[0], n_value+1)

elif isinstance(i, rs.OnCreateMux):
i.store.add_key(state_n, (i.key[0], i.key))
i.store.add_key(state_n, i.key[0])
for offset in range(density):
i.store.add_key(state_w, (i.key[0]*density+offset, i.key))
i.store.add_key(state_w, i.key[0]*density+offset)
outer_observer.on_next(i)
elif isinstance(i, rs.OnCompletedMux):
kindex = i.key[0]
i.store.set_state(state_n, (kindex, i.key), 0)
i.store.set_state(state_n, kindex, 0)
for offset in range(density):
index = i.key[0] * density + offset
if i.store.get_state(state_w, (index, i.key)) != -1:
if i.store.get_state(state_w, index) != -1:
observer.on_next(i._replace(key=(index, i.key)))
i.store.set_state(state_w, (index, i.key), -1)
i.store.set_state(state_w, index, -1)
outer_observer.on_next(i)
elif isinstance(i, rs.OnErrorMux):
kindex = i.key[0]
i.store.set_state(state_n, (kindex, i.key), 0)
i.store.set_state(state_n, kindex, 0)
for offset in range(density):
index = i.key[0] * density + offset
if i.store.get_state(state_w, (index, i.key)) != -1:
if i.store.get_state(state_w, index) != -1:
observer.on_next(i._replace(key=(index, i.key)))
i.store.set_state(state_w, (index, i.key), -1)
i.store.set_state(state_w, index, -1)
outer_observer.on_next(i)
elif type(i) is rs.state.ProbeStateTopology:
state_n = i.topology.create_state(name="roll", data_type='uint', default_value=0)
Expand Down Expand Up @@ -94,28 +94,28 @@ def on_next(i):
nonlocal state

if type(i) is rs.OnNextMux:
count = i.store.get_state(state, i.key)
count = i.store.get_state(state, i.key[0])
if count == 0:
observer.on_next(rs.OnCreateMux((i.key[0], i.key), i.store))

count += 1
observer.on_next(i._replace(key=(i.key[0], i.key)))

if count == window:
i.store.set_state(state, i.key, 0)
i.store.set_state(state, i.key[0], 0)
observer.on_next(rs.OnCompletedMux((i.key[0], i.key), i.store))
else:
i.store.set_state(state, i.key, count)
i.store.set_state(state, i.key[0], count)

elif type(i) is rs.OnCreateMux:
i.store.add_key(state, i.key)
i.store.add_key(state, i.key[0])
outer_observer.on_next(i)

elif type(i) in [rs.OnCompletedMux, rs.OnErrorMux]:
count = i.store.get_state(state, i.key)
count = i.store.get_state(state, i.key[0])
if count > 0:
observer.on_next(i._replace(key=(i.key[0], i.key)))
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])
outer_observer.on_next(i)

elif type(i) is rs.state.ProbeStateTopology:
Expand Down
12 changes: 6 additions & 6 deletions rxsci/data/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,31 @@ def on_next(i):

if type(i) is rs.OnNextMux:
new_predicate = predicate(i.item)
current_predicate = i.store.get_state(state, i.key)
current_predicate = i.store.get_state(state, i.key[0])
if current_predicate is rs.state.markers.STATE_NOTSET:
current_predicate = new_predicate
i.store.set_state(state, i.key, current_predicate)
i.store.set_state(state, i.key[0], current_predicate)
observer.on_next(rs.OnCreateMux((i.key[0], i.key), i.store))

if new_predicate != current_predicate:
i.store.set_state(state, i.key, new_predicate)
i.store.set_state(state, i.key[0], new_predicate)
observer.on_next(rs.OnCompletedMux((i.key[0], i.key), i.store))
observer.on_next(rs.OnCreateMux((i.key[0], i.key), i.store))

observer.on_next(i._replace(key=(i.key[0], i.key)))

elif type(i) is rs.OnCreateMux:
i.store.add_key(state, i.key)
i.store.add_key(state, i.key[0])
outer_observer.on_next(i)

elif isinstance(i, rs.OnCompletedMux):
current_predicate = i.store.get_state(state, i.key)
current_predicate = i.store.get_state(state, i.key[0])
if current_predicate is not rs.state.markers.STATE_NOTSET:
observer.on_next(i._replace(key=(i.key[0], i.key)))
outer_observer.on_next(i)

elif type(i) is rs.OnErrorMux:
current_predicate = i.store.get_state(state, i.key)
current_predicate = i.store.get_state(state, i.key[0])
if current_predicate is not rs.state.markers.STATE_NOTSET:
observer.on_next(i._replace(key=(i.key[0], i.key)))
outer_observer.on_next(i)
Expand Down
28 changes: 14 additions & 14 deletions rxsci/data/time_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,45 @@ def on_next(i):

if type(i) is rs.OnNextMux:
new_timestamp = time_mapper(i.item)
start_timestamp = i.store.get_state(state_start, i.key)
last_timestamp = i.store.get_state(state_last, i.key)
start_timestamp = i.store.get_state(state_start, i.key[0])
last_timestamp = i.store.get_state(state_last, i.key[0])
if start_timestamp is rs.state.markers.STATE_NOTSET:
start_timestamp = new_timestamp
last_timestamp = new_timestamp
i.store.set_state(state_start, i.key, start_timestamp)
i.store.set_state(state_last, i.key, last_timestamp)
i.store.set_state(state_start, i.key[0], start_timestamp)
i.store.set_state(state_last, i.key[0], last_timestamp)
observer.on_next(rs.OnCreateMux((i.key[0], i.key), i.store))

if _session_has_expired(start_timestamp, last_timestamp, new_timestamp):
i.store.set_state(state_start, i.key, new_timestamp)
i.store.set_state(state_last, i.key, new_timestamp)
i.store.set_state(state_start, i.key[0], new_timestamp)
i.store.set_state(state_last, i.key[0], new_timestamp)
observer.on_next(rs.OnCompletedMux((i.key[0], i.key), i.store))
observer.on_next(rs.OnCreateMux((i.key[0], i.key), i.store))
elif closing_mapper is not None and closing_mapper(i.item) is True:
i.store.set_state(state_start, i.key, new_timestamp)
i.store.set_state(state_last, i.key, new_timestamp)
i.store.set_state(state_start, i.key[0], new_timestamp)
i.store.set_state(state_last, i.key[0], new_timestamp)
if include_closing_item is True:
observer.on_next(i._replace(key=(i.key[0], i.key)))
observer.on_next(rs.OnCompletedMux((i.key[0], i.key), i.store))
observer.on_next(rs.OnCreateMux((i.key[0], i.key), i.store))
if include_closing_item is True:
return
else:
i.store.set_state(state_last, i.key, new_timestamp)
i.store.set_state(state_last, i.key[0], new_timestamp)

observer.on_next(i._replace(key=(i.key[0], i.key)))

elif type(i) is rs.OnCreateMux:
i.store.add_key(state_start, i.key)
i.store.add_key(state_last, i.key)
i.store.add_key(state_start, i.key[0])
i.store.add_key(state_last, i.key[0])
outer_observer.on_next(i)

elif type(i) in [rs.OnCompletedMux, rs.OnErrorMux]:
start_timestamp = i.store.get_state(state_start, i.key)
start_timestamp = i.store.get_state(state_start, i.key[0])
if start_timestamp is not rs.state.markers.STATE_NOTSET:
observer.on_next(i._replace(key=(i.key[0], i.key)))
i.store.del_key(state_start, i.key)
i.store.del_key(state_last, i.key)
i.store.del_key(state_start, i.key[0])
i.store.del_key(state_last, i.key[0])
outer_observer.on_next(i)

elif type(i) is rs.state.ProbeStateTopology:
Expand Down
8 changes: 4 additions & 4 deletions rxsci/operators/assert_.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def on_subscribe_mux(observer, scheduler):
def on_next(i):
nonlocal state
if isinstance(i, rs.OnNextMux):
value = i.store.get_state(state, i.key)
value = i.store.get_state(state, i.key[0])
if value is not rs.state.markers.STATE_NOTSET:
if predicate(value, i.item) is True:
observer.on_next(i)
Expand All @@ -133,15 +133,15 @@ def on_next(i):
else:
observer.on_next(i)

i.store.set_state(state, i.key, i.item)
i.store.set_state(state, i.key[0], i.item)

elif isinstance(i, rs.OnCreateMux):
i.store.add_key(state, i.key)
i.store.add_key(state, i.key[0])
observer.on_next(i)

elif isinstance(i, rs.OnCompletedMux) \
or isinstance(i, rs.OnErrorMux):
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])
observer.on_next(i)

elif type(i) is rs.state.ProbeStateTopology:
Expand Down
8 changes: 4 additions & 4 deletions rxsci/operators/distinct.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ def on_next(x):
observer.on_error(ex)
return

_state = x.store.get_state(state, x.key)
_state = x.store.get_state(state, x.key[0])
if key not in _state:
_state.add(key)
observer.on_next(x)
elif type(x) is rs.OnCreateMux:
x.store.add_key(state, x.key)
x.store.set_state(state, x.key, set())
x.store.add_key(state, x.key[0])
x.store.set_state(state, x.key[0], set())
observer.on_next(x)

elif type(x) in [rs.OnCompletedMux, rs.OnErrorMux]:
x.store.del_key(state, x.key)
x.store.del_key(state, x.key[0])
observer.on_next(x)

elif type(x) is rs.state.ProbeStateTopology:
Expand Down
8 changes: 4 additions & 4 deletions rxsci/operators/first.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ def on_next(i):
nonlocal state

if type(i) is rs.OnNextMux:
value = i.store.get_state(state, i.key)
value = i.store.get_state(state, i.key[0])
if value is False:
observer.on_next(i)
i.store.set_state(state, i.key, True)
i.store.set_state(state, i.key[0], True)

elif type(i) is rs.OnCreateMux:
i.store.add_key(state, i.key)
i.store.add_key(state, i.key[0])
observer.on_next(i)

elif type(i) is rs.OnCompletedMux:
observer.on_next(i)
i.store.del_key(state, i.key)
i.store.del_key(state, i.key[0])

elif type(i) is rs.state.ProbeStateTopology:
state = i.topology.create_state(name='first', data_type=bool, default_value=False)
Expand Down
Loading

0 comments on commit 5daa703

Please sign in to comment.