<a href="https://colab.research.google.com/github/wiemila/ml-in-pl-23/blob/main/1_pathway_static_to_streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
%%capture --no-display
# install Pathway (cell magics such as %%capture must be the first line of a cell)
!pip install pathway

In [3]:
# generic imports
import httpimport
import numpy as np
import pandas as pd

from IPython.display import display

In [4]:
# import Pathway and tutorial helpers
url = "https://gist.githubusercontent.com/janchorowski/3819b9ef1e0dd441030f7f3b80b93cc6/raw"
with httpimport.remote_repo(url):
    import mlinpl

import pathway as pw
from pathway.internals.parse_graph import G
from pathway.internals import dtype as dt

In [5]:
# A static Pathway computation - just load a table and fetch it whole as output
t_name = pw.debug.table_from_markdown(
    """
    | name
 1  | Alice
 2  | Bob
 3  | Carole
 """
)
pw.debug.compute_and_print(t_name)

            | name
^YYY4HAB... | Alice
^Z3QWT29... | Bob
^3CZ78B4... | Carole


In [6]:
# Now we go from batch to streaming. First, we must tell Pathway the exact
# types of the data: during streaming, the data has not arrived yet, so it
# cannot be inspected to infer them.

mlinpl.generate_schema_from_table(t_name, "InSchema")

class InSchema(pw.Schema):
    name: dt.STR
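`mlinpl.generate_schema_from_table` is a tutorial helper whose code is not shown here. As a rough, hypothetical illustration of the idea, a static sample's column dtypes can be inspected and turned into the source of a schema class. `schema_source_from_frame` is an invented name, and the mapping to `dt.INT`, `dt.FLOAT` and `dt.BOOL` is an assumption (only `dt.STR` appears in this notebook).

```python
import pandas as pd


def dtype_name(dtype) -> str:
    """Map a pandas dtype to a dt.* name (hypothetical mapping)."""
    if pd.api.types.is_bool_dtype(dtype):
        return "dt.BOOL"
    if pd.api.types.is_integer_dtype(dtype):
        return "dt.INT"
    if pd.api.types.is_float_dtype(dtype):
        return "dt.FLOAT"
    if pd.api.types.is_string_dtype(dtype) or pd.api.types.is_object_dtype(dtype):
        return "dt.STR"
    raise ValueError(f"no schema type mapped for dtype {dtype}")


def schema_source_from_frame(df: pd.DataFrame, class_name: str) -> str:
    """Return Python source for a schema class listing df's columns and types."""
    lines = [f"class {class_name}(pw.Schema):"]
    for column, dtype in df.dtypes.items():
        lines.append(f"    {column}: {dtype_name(dtype)}")
    return "\n".join(lines)


names = pd.DataFrame({"name": ["Alice", "Bob", "Carole"]})
print(schema_source_from_frame(names, "InSchema"))
```

The generated text is only a starting point: a sample may not reveal the full range of values a stream will carry, which is why the next cell suggests adjusting the schema by hand.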


In [7]:
# Copy the schema generated above and adjust it if needed
class InSchema(pw.Schema):
    name: dt.STR

# This clears the Pathway graph and helps with restarts
G.clear()

# Now let's build the graph using connectors for Jupyter
ep = mlinpl.EmbeddedPathway()

# This creates a pair of:
# - a Pathway table, which we can use to build computations
# - a callback, allowing us to insert data into a running computation
in_table, in_writer = ep.input_table("input", schema=InSchema)

# A simple computation on the input table: select its name column
out_table = in_table.select(name=pw.this.name)

# A monitor reads changes to a table and allows inspection of data
# during a live computation
ep.monitor("in", in_table)
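The input-table/writer pair, `ep.start()` and the monitor follow a familiar push pattern: the writer hands rows to a computation running in a background worker, and the monitor collects the resulting changes. The sketch below models that pattern in plain Python with a queue and a single thread, assuming toy sequential ids; `ToyEmbeddedPipeline` and its methods are hypothetical and do not describe how `mlinpl.EmbeddedPathway` is actually implemented.

```python
import itertools
import queue
import threading


class ToyEmbeddedPipeline:
    """Hypothetical stand-in for the input table, writer and monitor (not Pathway code)."""

    def __init__(self):
        self._inbox = queue.Queue()      # rows written but not yet processed
        self._ids = itertools.count()    # toy ids; Pathway assigns its own ids
        self.monitor = []                # observed changes as (diff, id, row)
        self._worker = threading.Thread(target=self._run, daemon=True)

    def start(self):
        # Counterpart of ep.start(): begin processing input in the background.
        self._worker.start()

    def writer(self, **row):
        # Counterpart of in_writer(...): hand a row over and return immediately.
        self._inbox.put(row)

    def _run(self):
        while True:
            row = self._inbox.get()
            # Each inserted row is recorded as a change with diff +1.
            self.monitor.append((1, f"^ROW{next(self._ids)}", row))
            self._inbox.task_done()

    def wait_idle(self):
        # Block until every row written so far has been processed.
        self._inbox.join()


pipeline = ToyEmbeddedPipeline()
pipeline.start()
pipeline.writer(name="MlinPL attendant")
pipeline.writer(name="MlinPL attendant x2")
pipeline.wait_idle()
print(pipeline.monitor)
```

`wait_idle()` is there only so the example reads the monitor after the worker has caught up; since writing and processing happen on different threads, reading immediately after a write could miss the newest row.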

In [8]:
# Now start a Pathway worker in the background
ep.start()

In [9]:
# Send some data into the input connector
in_writer(name="MlinPL attendant")

In [10]:
# Now print the data in the monitor
ep.outs['in'].to_pandas()

_diff  id                           name
1      ^MBW4V691VG4BRK96V8QCWJY9QM  MlinPL attendant


In [11]:
# Send some more data into the input connector
in_writer(name="MlinPL attendant x2")

In [12]:
# and print
ep.outs['in'].to_pandas()

_diff  id                           name
1      ^5YWMMXYGPB26GV9S12CW38PNFW  MlinPL attendant x2
1      ^MBW4V691VG4BRK96V8QCWJY9QM  MlinPL attendant


In [13]:
# A row was added to the table. The monitor can also return only the changes
# since the previous call. Since this is the first call of that function,
# it returns the whole table.
ep.outs['in'].to_pandas_from_last_seen()

_diff  id                           name
1      ^5YWMMXYGPB26GV9S12CW38PNFW  MlinPL attendant x2
1      ^MBW4V691VG4BRK96V8QCWJY9QM  MlinPL attendant


In [14]:
# Send some more data into the input connector
in_writer(name="MlinPL attendant x3")

In [15]:
# Now we see only the addition of the last row
ep.outs['in'].to_pandas_from_last_seen()

_diff  id                           name
1      ^YJGA9S7ASMCYDDHJ5AV4JGBFQC  MlinPL attendant x3
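The difference between `to_pandas()` and `to_pandas_from_last_seen()` comes down to a read cursor. A minimal plain-Python sketch, assuming the monitor keeps a log of changes plus a position marking what the previous "from last seen" call returned. `ToyMonitorView` and its methods are hypothetical, and whether the real `to_pandas()` returns the raw change log or a consolidated snapshot cannot be told from these insert-only outputs.

```python
class ToyMonitorView:
    """Hypothetical model of a monitor's two read modes (not Pathway's implementation)."""

    def __init__(self):
        self.log = []         # every change seen so far, as (diff, id, row) tuples
        self._last_seen = 0   # log position reached by the previous from_last_seen() call

    def record(self, diff, row_id, row):
        self.log.append((diff, row_id, row))

    def everything(self):
        # Counterpart of to_pandas(): all changes seen so far; the cursor is untouched.
        return list(self.log)

    def from_last_seen(self):
        # Counterpart of to_pandas_from_last_seen(): only changes newer than the cursor.
        new = self.log[self._last_seen:]
        self._last_seen = len(self.log)
        return new


view = ToyMonitorView()
view.record(1, "^A", {"name": "MlinPL attendant"})
view.record(1, "^B", {"name": "MlinPL attendant x2"})
first = view.from_last_seen()   # first call: both rows
view.record(1, "^C", {"name": "MlinPL attendant x3"})
second = view.from_last_seen()  # only the x3 row
third = view.from_last_seen()   # nothing new: empty list
```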


# Exercise:

Perhaps you have wondered what the numbers in the `_diff` column mean?

Let's find out.

1. Build a word-counting program.

   Hint: See the [docs for simple table operations](https://pathway.com/developers/user-guide/introduction/survival-guide/#manipulating-the-table) and use the `pw.reducers.count` reducer.

2. Try adding words and see how the output changes.

Do you see how the `_diff`s reflect value retractions and insertions?
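To see why `_diff` can be negative, here is a plain-Python model of a running word count written as a stream of changes. It assumes each output row is a `(word, count)` pair: when a word's count changes, the old row is retracted with diff -1 and the new row is inserted with diff +1, and summing the diffs per row recovers the current table. The function names are hypothetical, and Pathway's own output will carry its own ids and column names.

```python
from collections import Counter


def count_updates(words):
    """Yield (diff, word, count) changes for a running count of each word."""
    counts = Counter()
    for word in words:
        old = counts[word]
        counts[word] += 1
        if old:
            # The previous row for this word is no longer valid: retract it.
            yield (-1, word, old)
        # Insert the row with the updated count.
        yield (1, word, old + 1)


def current_table(changes):
    """Sum the diffs per row and keep rows whose total is positive."""
    multiplicity = Counter()
    for diff, word, count in changes:
        multiplicity[(word, count)] += diff
    return sorted(row for row, total in multiplicity.items() if total > 0)


changes = list(count_updates(["pathway", "stream", "pathway"]))
for change in changes:
    print(change)
# (1, 'pathway', 1)
# (1, 'stream', 1)
# (-1, 'pathway', 1)
# (1, 'pathway', 2)
print(current_table(changes))
# [('pathway', 2), ('stream', 1)]
```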

In [16]:
in_writer(name="MlinPL attendant x4")

In [17]:
# Now we see only the addition of the last row
ep.outs['in'].to_pandas_from_last_seen()

_diff  id                           name
1      ^TAB8G9DFWW37PP54SXR2P79NHG  MlinPL attendant x4


In [19]:
# No rows were written since the previous call, so there are no new changes to show
ep.outs['in'].to_pandas_from_last_seen()

In [18]:
ep.outs['in'].to_pandas()

_diff  id                           name
1      ^5YWMMXYGPB26GV9S12CW38PNFW  MlinPL attendant x2
1      ^MBW4V691VG4BRK96V8QCWJY9QM  MlinPL attendant
1      ^TAB8G9DFWW37PP54SXR2P79NHG  MlinPL attendant x4
1      ^YJGA9S7ASMCYDDHJ5AV4JGBFQC  MlinPL attendant x3
