<a href="https://colab.research.google.com/github/jchen6727/colab/blob/master/pubtk1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Tutorial 1**

**Note 0** This tutorial uses the `pubtk` environment set up in the previous tutorial, now with its socket communication features.


In [1]:
#jupyter 0
from google.colab import drive # use drive.mount() to link Google Drive to
drive.mount('/content/drive')  # the google colab session

Mounted at /content/drive


**Note 1** Since we created the virtual environment in the first tutorial, we can now simply add its `site-packages` directory to access our custom Python environment.

In [2]:
#jupyter 1
import site
site.addsitedir('/content/drive/MyDrive/venv/lib/python3.10/site-packages')

**Note 2** This time we'll focus on the **INET_Dispatcher**, which uses `INET` (TCP) to communicate with the host. Let's import the `INET_Dispatcher` and an example `Submit`, the one that handles sockets in the `ZSH` environment.

In [6]:
#jupyter 2
from pubtk.runtk import INET_Dispatcher, Submit, ZSHSubmitSOCK, Template
print(ZSHSubmitSOCK())



submit:
zsh {cwd}/{label}.sh

path:
#!/bin/zsh
cd {cwd}
export SOCNAME="{sockname}"
export JOBID=$$
{env}
nohup {command} > {cwd}/{label}.run &
pid=$!
echo $pid >&1


script:
{cwd}/{label}.sh



**Note 3** Notice the important line:
```
export SOCNAME="{sockname}"
```
This line exports the socket address through which the `Dispatcher` and the `Runner` communicate.
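As a rough illustration of how a process could pick up that address, here is a minimal sketch using only the standard library. It assumes the exported value is the string form of a `(host, port)` tuple, as in the job script printed later in this tutorial; `read_socket_address` is a hypothetical helper and not part of `pubtk`, whose runner classes may read the variable differently.

```python
import ast
import os

def read_socket_address(variable="SOCNAME"):
    """Parse an address such as "('172.28.0.12', 55501)" from the environment.

    Hypothetical helper for illustration only; not a pubtk function.
    """
    raw = os.environ[variable]
    # literal_eval only evaluates Python literals, so it is safe on untrusted text
    host, port = ast.literal_eval(raw)
    return str(host), int(port)

# Simulate what the `export SOCNAME=...` line does for the submitted job.
os.environ["SOCNAME"] = "('127.0.0.1', 55501)"
print(read_socket_address())
```

Using `ast.literal_eval` rather than `eval` keeps the parsing restricted to plain literals.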

Let's subclass `Submit` with a new `class` that adds `socket` communication on `Google Colab`.

For reference, here is our old code:
```
class GCSubmit(Submit):
  def __init__(self):
    # creates a Submit with the templates we define
    super().__init__(
        submit_template = Template("sh {cwd}/{label}.sh"),
        script_template = Template("""\
#!/bin/bash
export FOO={foovalue}
export BAR={barvalue}
export BAZ={bazvalue}
{env}
nohup /content/drive/MyDrive/venv/bin/python /content/drive/MyDrive/dev/runner.py > {cwd}/{label}.run &
pid=$!
echo $pid >&1
"""
        )
    )
  def submit_job(self):
    # using this submit_job, we can add some handling of stdout, job failure (i.e. if stdout does not return an integer value as expected),
    # extending the functionality of Submit with this exception handling.
    proc = super().submit_job()
    try:
      self.job_id = int(proc.stdout)
    except Exception as e:
      raise(Exception("{}\nJob submission failed:\n{}\n{}\n{}\n{}".format(e, self.submit, self.script, proc.stdout, proc.stderr)))
    return self.job_id
```
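The `submit_job` override above depends on the submitted script echoing a process ID on stdout. The pattern can be tried in isolation with the sketch below, which assumes nothing about `pubtk`: `parse_job_id` is a hypothetical helper, and the child process simply prints a fixed number in place of a real PID.

```python
import subprocess
import sys

def parse_job_id(stdout, stderr=""):
    """Convert captured stdout to an integer job id, or raise a descriptive error."""
    try:
        return int(stdout.strip())
    except ValueError as e:
        raise RuntimeError(
            "Job submission failed: {}\nstdout: {!r}\nstderr: {!r}".format(e, stdout, stderr)
        ) from e

# Stand-in for the submitted shell script: a child process that prints a number.
proc = subprocess.run(
    [sys.executable, "-c", "print(4242)"],
    capture_output=True,
    text=True,
)
job_id = parse_job_id(proc.stdout, proc.stderr)
print(job_id)
```

Raising with `from e` keeps the original conversion error attached to the traceback, which helps when the script prints something unexpected.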

In [13]:
#jupyter 3
class GCSubmitSOCK(Submit):
  def __init__(self):
    # creates a Submit with the templates we define
    super().__init__(
        submit_template = Template("sh {cwd}/{label}.sh"),
        script_template = Template("""\
#!/bin/bash
export SOCNAME="{sockname}"
{env}
nohup /content/drive/MyDrive/venv/bin/python /content/drive/MyDrive/dev/runner.py > {cwd}/{label}.run &
pid=$!
echo $pid >&1
"""
        )
    )
  def submit_job(self):
    # Wrap the parent submit_job to handle stdout and job failure: the script is
    # expected to echo an integer pid, so anything else is treated as a failed submission.
    proc = super().submit_job()
    try:
      self.job_id = int(proc.stdout)
    except Exception as e:
      raise Exception("{}\nJob submission failed:\n{}\n{}\n{}\n{}".format(e, self.submit, self.script, proc.stdout, proc.stderr)) from e
    return self.job_id

gcs = GCSubmitSOCK()

**Note 4** N.B. the `export SOCNAME="{sockname}"` line provides a field that the `INET_Dispatcher` fills when it requests a TCP port for communication. This happens at job creation.
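For intuition about what requesting a TCP port can look like, the sketch below asks the operating system for a free port by binding to port `0`. This is a generic standard-library technique and only an assumption about the general idea; it is not taken from the `INET_Dispatcher` source, and `reserve_listening_socket` is a hypothetical name.

```python
import socket

def reserve_listening_socket(host="127.0.0.1"):
    """Bind to port 0 so the OS chooses a free port, then start listening."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, 0))   # port 0 means "any free ephemeral port"
    sock.listen(1)
    return sock, sock.getsockname()

listener, address = reserve_listening_socket()
sockname = str(address)    # string form that could fill a {sockname} field
print(sockname)
listener.close()           # release the port once it is no longer needed
```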

**Note 5** Now we can pass the custom submission to our **INET_Dispatcher** which extends the base **SH_Dispatcher** with support for communication.

In [14]:
#jupyter 5
dispatcher = INET_Dispatcher(cwd='/content', submit=gcs, gid='sock_example')
print(dispatcher.submit) # prints the dispatcher.submit


submit:
sh {cwd}/{label}.sh

path:
#!/bin/bash
export SOCNAME="{sockname}"
{env}
nohup /content/drive/MyDrive/venv/bin/python /content/drive/MyDrive/dev/runner.py > {cwd}/{label}.run &
pid=$!
echo $pid >&1


script:
{cwd}/{label}.sh



**Note 6** To pass arguments to the **Runner** script, we call `update_env` on the dispatcher with a dictionary of `key:value` pairs. Unlike the old `GCSubmit`, the `GCSubmitSOCK` template has no `FOO`, `BAR` or `BAZ` fields to fill. The printed template is unchanged after this call because `{env}` is only filled at job creation.

In [15]:
#jupyter 6
dispatcher.update_env({'strvalue': '1',
                       'intvalue': 2,
                       'fltvalue': 3.0})
print(dispatcher.submit)


submit:
sh {cwd}/{label}.sh

path:
#!/bin/bash
export SOCNAME="{sockname}"
{env}
nohup /content/drive/MyDrive/venv/bin/python /content/drive/MyDrive/dev/runner.py > {cwd}/{label}.run &
pid=$!
echo $pid >&1


script:
{cwd}/{label}.sh



**Note 7** Upon job creation, the `{sockname}`, `{env}`, `{cwd}` and `{label}` fields are filled.

The `{env}` field is replaced with a custom `serialization` (in this case, exported string values) that can then be deserialized by the **runner** in the `runner.py` script.
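To make the serialization concrete, here is a minimal sketch that turns a dictionary into `export` lines of the shape that appears in the job script printed by the next cell (for example `export STRRUNTK3="strvalue=1"`). The naming rule (a type tag, `RUNTK`, then a counter) is inferred from that output, and `serialize_env` with its `start` parameter is a hypothetical stand-in, not the `pubtk` implementation.

```python
def serialize_env(values, start=0):
    """Render key/value pairs as shell export lines tagged with their type.

    Hypothetical sketch: the tag and counter scheme is inferred from the
    tutorial output, not taken from pubtk itself.
    """
    type_tags = {str: "STR", int: "INT", float: "FLOAT"}
    lines = []
    for index, (key, value) in enumerate(values.items(), start=start):
        tag = type_tags[type(value)]   # raises KeyError for unsupported types
        lines.append('export {}RUNTK{}="{}={}"'.format(tag, index, key, value))
    return "\n".join(lines)

print(serialize_env({'strvalue': '1', 'intvalue': 2, 'fltvalue': 3.0}, start=3))
```

Encoding the type in the variable name lets the receiving side restore `int` and `float` values even though environment variables are always strings.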

In [16]:
#jupyter 7
dispatcher.create_job()
print(dispatcher.submit) # see the new submit


submit:
sh /content/example.sh

path:
#!/bin/bash
export SOCNAME="('172.28.0.12', 55501)"

export STRRUNTK3="strvalue=1"
export INTRUNTK4="intvalue=2"
export FLOATRUNTK5="fltvalue=3.0"
nohup /content/drive/MyDrive/venv/bin/python /content/drive/MyDrive/dev/runner.py > /content/example.run &
pid=$!
echo $pid >&1


script:
/content/example.sh
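The exported values above can be read back into typed Python values. The following sketch shows one way this might be done, assuming the `STR`/`INT`/`FLOAT` prefix encodes the type and the value has the form `key=value`; `deserialize_env` is a hypothetical helper, and the `Runner` classes in `pubtk` may implement this step differently.

```python
import re

# Variable names of interest, e.g. STRRUNTK3, INTRUNTK4, FLOATRUNTK5
PATTERN = re.compile(r"^(STR|INT|FLOAT)RUNTK\d+$")
CASTS = {"STR": str, "INT": int, "FLOAT": float}

def deserialize_env(environ):
    """Collect tagged variables from a mapping and cast each value by its tag."""
    mappings = {}
    for name, raw in environ.items():
        match = PATTERN.match(name)
        if match is None:
            continue                      # unrelated variable, skip it
        key, _, value = raw.partition("=")
        mappings[key] = CASTS[match.group(1)](value)
    return mappings

# A plain dict stands in for os.environ so the example is reproducible.
example_environ = {
    "STRRUNTK3": "strvalue=1",
    "INTRUNTK4": "intvalue=2",
    "FLOATRUNTK5": "fltvalue=3.0",
    "HOME": "/root",
}
print(deserialize_env(example_environ))
```

In a real job the same function could be passed `os.environ` instead of the example dictionary.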



**Note 8** Let's download and check a basic `runner.py` using the `Runner` class.

In [17]:
#jupyter 8
!rm /content/drive/MyDrive/dev/runner.py

!cat /content/drive/MyDrive/dev/runner.py

from pubtk.runtk import SocketRunner
import os, sys, json

print(os.getpid())

runner = SocketRunner()

mappings = json.dumps(runner.mappings)

runner.connect()
runner.send(mappings)
print(mappings)

**Note 9** This new runner is similar to the one in `pubtk0.ipynb`. Of note, it calls `runner.connect()` to establish communication with the `SOCNAME` TCP port. A runner should also finish by calling `runner.close()` to deallocate the opened socket, although the listing above does not show that call.

Likewise, the `dispatcher` calls `dispatcher.clean` to deallocate its own socket.

N.B. Sockets are generally released automatically once the code that owns them finishes, which we can check with `lsof`. It is nonetheless good programming practice to explicitly release resources when they are no longer in use.
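The accept, send, receive and clean-up sequence can be tried without `pubtk` at all. The sketch below uses only the standard `socket` module on the loopback interface, with a thread playing the role of the runner. It is an analogy for the dispatcher and runner exchange under those assumptions, not their actual implementation, and `runner_side` is a hypothetical name.

```python
import json
import socket
import threading

def runner_side(address, payload):
    """Connect to the listener, send one message, then close the socket."""
    with socket.create_connection(address) as sock:   # closed on leaving the block
        sock.sendall(payload.encode())

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    listener.bind(("127.0.0.1", 0))        # let the OS pick a free port
    listener.listen(1)
    address = listener.getsockname()

    payload = json.dumps({"strvalue": "1", "intvalue": 2, "fltvalue": 3.0})
    worker = threading.Thread(target=runner_side, args=(address, payload))
    worker.start()

    connection, runner_address = listener.accept()
    with connection:
        chunks = []
        while True:
            chunk = connection.recv(1024)
            if not chunk:                  # empty bytes: the sender closed its end
                break
            chunks.append(chunk)
        recv_message = b"".join(chunks).decode()
    worker.join()
finally:
    listener.close()                       # explicit clean-up, even on error

print(recv_message)
```

The `with` blocks and the `finally` clause release each socket deterministically instead of relying on the interpreter to reclaim it later.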

In [19]:
#jupyter 9
dispatcher.submit_job()
connection, runner_address = dispatcher.accept()
recv_message = dispatcher.recv()
dispatcher.job_id # the job_id; should match the pid printed by the runner.py script
connection, runner_address
recv_message
dispatcher.clean([])


'{"fltvalue": 3.0, "strvalue": "1", "intvalue": 2}'

In [20]:
!lsof


Streaming output truncated to the last 5000 lines.
drive     1011 1028 drive     root    3w      REG               0,51     4780 1310800 /tmp/drive.e8229860d1c1.root.log.INFO.20240220-140447.1011
drive     1011 1028 drive     root    4w      REG               0,51     1253 1310799 /root/.config/Google/DriveFS/Logs/parent.txt
drive     1011 1028 drive     root    5w      REG               0,51   109122 1310801 /root/.config/Google/DriveFS/Logs/drive_fs.txt
drive     1011 1028 drive     root    6r     FIFO               0,13      0t0   41181 pipe
drive     1011 1028 drive     root    7w     FIFO               0,13      0t0   41181 pipe
drive     1011 1028 drive     root    8ur     REG               0,51    12288 1310795 /root/.config/Google/DriveFS/metrics_store_sqlite.db
drive     1011 1028 drive     root    9u      REG               0,51    20632 1310816 /root/.config/Google/DriveFS/metrics_store_sqlite.db-wal
drive     1011 1028 drive     root   10ur     REG             