<a href="https://colab.research.google.com/github/wiemila/MLinPL/blob/main/1_pathway_static_to_streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
%%capture --no-display
# Install Pathway and httpimport (httpimport is used below to fetch the
# tutorial helpers). A cell magic such as %%capture must be the very first
# line of a cell, so this comment comes after it. %pip (rather than !pip)
# installs into the environment of the running kernel.
# TODO: pin versions (e.g. pathway==X.Y.Z) so the tutorial stays reproducible.
%pip install pathway httpimport

In [64]:
# generic imports
import httpimport
import numpy as np
import pandas as pd

from IPython.display import display

In [65]:
# Import Pathway and the tutorial helpers.
# `mlinpl` (source of EmbeddedPathway and generate_schema_from_table used
# below) is fetched by httpimport from a GitHub gist and executed at import
# time, so this cell needs network access.
# NOTE(review): the URL carries no revision hash and presumably serves the
# gist's latest version, so the helpers may change between runs; consider
# pinning it to a specific revision.
url = "https://gist.githubusercontent.com/janchorowski/3819b9ef1e0dd441030f7f3b80b93cc6/raw"
with httpimport.remote_repo(url):
    import mlinpl

import pathway as pw
# The Pathway graph object; later cleared with G.clear() before the graph is rebuilt.
from pathway.internals.parse_graph import G
# dtype module, referenced by generated schemas (e.g. `dt.STR`).
from pathway.internals import dtype as dt

In [66]:
# Static (batch) mode: the whole input is known up front, so Pathway can
# compute the table in one go and print every row of the result.
t_name = pw.debug.table_from_markdown("""
    | name
 1  | Alice
 2  | Bob
 3  | Carole
 """)
pw.debug.compute_and_print(t_name)

            | name
^YYY4HAB... | Alice
^Z3QWT29... | Bob
^3CZ78B4... | Carole


In [67]:
# From batch to streaming: in streaming mode the data has not arrived yet,
# so Pathway cannot inspect it to work out column types. The types must be
# declared explicitly in a schema; this helper prints one derived from the
# static table above (see the output below), ready to copy-paste.
mlinpl.generate_schema_from_table(t_name, "InSchema")

class InSchema(pw.Schema):
    name: dt.STR


In [104]:
# Input schema, copy-pasted from the generated one printed above and then
# adjusted: `name` was changed from `dt.STR` to `int` so that the cells
# below can push integer values.
# NOTE(review): for the word-counting exercise at the end, switch `name`
# back to `dt.STR` and send strings through `in_writer`.
class InSchema(pw.Schema):
    name: int

# Clear the Pathway graph; this helps when the cell is re-run.
G.clear()

# Now let's build the graph using the Jupyter-friendly connectors.
ep = mlinpl.EmbeddedPathway()

# `input_table` returns a pair of:
# - a Pathway table, used to build the computation,
# - a callback that inserts data into the running computation.
in_table, in_writer = ep.input_table("input", schema=InSchema)

# Count how many times each distinct `name` value has been inserted.
# (A simpler, element-wise alternative to try:
#  in_table.select(ret=in_table.name + 1).)
out_table = in_table.groupby(pw.this.name).reduce(
    pw.this.name,
    cnt=pw.reducers.count(),
)

# A monitor reads the changes to a table, so its data can be inspected
# while the computation is live.
ep.monitor("in", in_table)
ep.monitor("out", out_table)

In [105]:
# Launch the Pathway computation in a background worker; the following
# cells feed it data and inspect its monitors while it keeps running.
ep.start()

In [106]:
# Push a first row into the running computation via the input callback.
in_writer(name=0)

In [107]:
# Two more rows; the keyword argument matches the `name` column of InSchema.
in_writer(name=2)
in_writer(name=4)

In [108]:
# Display what the "in" monitor has recorded so far as a pandas table.
ep.outs['in'].to_pandas()

Unnamed: 0_level_0,Unnamed: 1_level_0,name
_diff,id,Unnamed: 2_level_1
1,^5YWMMXYGPB26GV9S12CW38PNFW,2
1,^MBW4V691VG4BRK96V8QCWJY9QM,0
1,^YJGA9S7ASMCYDDHJ5AV4JGBFQC,4


In [109]:
# Raw change log of the "out" monitor: Event records carrying a time, key, diff and value.
ep.outs['out'].events

[Event(time=1e+20, key=None, diff=None, value=None),
 Event(time=1698591367968, key='^X1MXHYYG4YM0DB900V28XN5T4W', diff=1, value={'name': 0, 'cnt': 1}),
 Event(time=1698591369668, key='^Z3QWT294JQSHPSR8KTPG9ECE4W', diff=1, value={'name': 2, 'cnt': 1}),
 Event(time=1698591369668, key='^3HN31E1PBT7YHH5PWVKTZCPRJ8', diff=1, value={'name': 4, 'cnt': 1})]

In [91]:
# Insert another row, repeating the value 4 that was already sent.
in_writer(name=4)

In [92]:
# Display what the "out" monitor has recorded as a pandas table.
# NOTE: the saved output below comes from an earlier version of the graph
# (it has a `ret` column rather than `name`/`cnt`); re-run the notebook
# from the top to refresh it.
ep.outs['out'].to_pandas()

Unnamed: 0_level_0,Unnamed: 1_level_0,ret
_diff,id,Unnamed: 2_level_1
1,^5YWMMXYGPB26GV9S12CW38PNFW,3
1,^66TAC1KYJMKMM0W9FF9TAD9GQ0,5
1,^6HNVX3XJR6TP92XKT2NP2XEEQR,3
1,^ER9FDCM2TZ0EHMYB972J9MNPVR,5
1,^MBW4V691VG4BRK96V8QCWJY9QM,1
1,^TAB8G9DFWW37PP54SXR2P79NHG,5
1,^YJGA9S7ASMCYDDHJ5AV4JGBFQC,3


In [80]:
# A row was added to the table. Instead of a full snapshot, the monitor can
# also report only what changed since the previous call. On the very first
# call there is no earlier point of reference, so the whole table comes back.
ep.outs['in'].to_pandas_from_last_seen()

Unnamed: 0_level_0,Unnamed: 1_level_0,name
_diff,id,Unnamed: 2_level_1
1,^5YWMMXYGPB26GV9S12CW38PNFW,2
1,^66TAC1KYJMKMM0W9FF9TAD9GQ0,4
1,^6HNVX3XJR6TP92XKT2NP2XEEQR,2
1,^ER9FDCM2TZ0EHMYB972J9MNPVR,4
1,^MBW4V691VG4BRK96V8QCWJY9QM,0
1,^TAB8G9DFWW37PP54SXR2P79NHG,4
1,^YJGA9S7ASMCYDDHJ5AV4JGBFQC,2


In [100]:
# Feed one more row (the value 4 once again) into the live computation.
in_writer(name=4)

In [102]:
# This call reports only the row inserted since the previous call.
ep.outs['in'].to_pandas_from_last_seen()

# Exercise

Perhaps you have wondered what the numbers in the `_diff` column mean?

Let's find out.

1. Build a word-counting program.

   Hint: see the [docs for simple table operations](https://pathway.com/developers/user-guide/introduction/survival-guide/#manipulating-the-table) and use the `pw.reducers.count` reducer.

2. Try adding words and see how the output changes.

Do you see how the `_diff` values reflect value retractions and insertions?