<a href="https://colab.research.google.com/github/jchen6727/pubtk/blob/development/examples/colab_driveless/pubtk1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Tutorial 1**

**Note 0** This tutorial builds on the `pubtk` setup from the previous tutorial (`pubtk0.ipynb`) and adds socket communication features.


In [1]:
#jupyter 0
!git clone --depth 1 --single-branch --branch development https://github.com/jchen6727/pubtk.git /content/pubtk # clone from the development branch
!pip install -e /content/pubtk
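import site
site.addsitedir('/usr/local/lib/python3.10/dist-packages') # re-scan site-packages (including .pth files) so the editable install is importable without restarting the runtime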

fatal: destination path '/content/pubtk' already exists and is not an empty directory.
Obtaining file:///content/pubtk
  Installing build dependencies ... done
  Checking if build backend supports build_editable ... done
  Getting requirements to build editable ... done
  Installing backend dependencies ... done
  Preparing editable metadata (pyproject.toml) ... done
Building wheels for collected packages: pubtk
  Building editable for pubtk (pyproject.toml) ... done
  Created wheel for pubtk: filename=pubtk-0.0.1-0.editable-py3-none-any.whl size=3392 sha256=6add1c8092b93f8d6ab734e3118e84c2c4ab9411fb4ff49e9e0c20466143d66a
  Stored in directory: /tmp/pip-ephem-wheel-cache-tsd_sh0c/wheels/b0/4b/05/910e04ee783de27d02d88594f925ec7b25c0afa676c3282e44
Successfully built pubtk
Installing collected packages: pubtk
  Attempting uninstall: pubtk
    Found existing installation: pubtk 0.0.1
    Uninstalling pubtk-0.0.1:

**Note 1** This time we'll focus on the **INET_Dispatcher**, which uses `INET` (TCP) sockets to communicate with the host. Let's import the `INET_Dispatcher` along with `Submit`, `Template`, and an example `Submit` subclass, `ZSHSubmitSOCK`, which handles sockets in a `zsh` environment.

In [2]:
#jupyter 1
from pubtk.runtk import INET_Dispatcher, Submit, ZSHSubmitSOCK, Template
print(ZSHSubmitSOCK())



submit:
zsh {output_path}/{label}.sh

script:
#!/bin/zsh
cd {project_path}
export SOCNAME="{sockname}"
export JOBID=$$
{env}
nohup {command} > {output_path}/{label}.run 2>&1 &
pid=$!
echo $pid >&1


path:
{output_path}/{label}.sh

handles:
{
	runtk.SUBMIT: "{output_path}/{label}.sh",
	runtk.STDOUT: "{output_path}/{label}.run",
	runtk.SOCKET: "{sockname}",
}

kwargs:
{'output_path': '{output_path}', 'project_path': '{project_path}', 'label': '{label}', 'sockname': '{sockname}', 'command': '{command}', 'env': '{env}'}



**Note 2** Notice the important line:
```
export SOCNAME="{sockname}"
```
This line exports the socket address (as the `SOCNAME` environment variable) through which the `Dispatcher` and `Runner` communicate.
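On the runner side, the exported value has to be turned back into an address. The job script printed after `create_job()` further down shows `SOCNAME` holding the string form of a `(host, port)` tuple, e.g. `('172.28.0.12', 53103)`. A minimal sketch of parsing it with the standard library, assuming that format; `read_socket_address` is a hypothetical helper, not part of `pubtk`:

```python
import ast
import os


def read_socket_address(var="SOCNAME"):
    """Hypothetical helper: parse a "(host, port)" string stored in an environment variable."""
    # ast.literal_eval evaluates the tuple literal safely, without running arbitrary code
    host, port = ast.literal_eval(os.environ[var])
    return host, int(port)


# Simulate the value exported by the job script
os.environ["SOCNAME"] = "('172.28.0.12', 53103)"
print(read_socket_address())  # ('172.28.0.12', 53103)
```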

Let's subclass `Submit` with a new class that adds `socket` communication on Google Colab.

For reference, here is our old code:
```
class GCSubmit(Submit):
  def __init__(self):
    # creates a Submit with the templates we define
    super().__init__(
        submit_template = Template("sh {output_path}/{label}.sh"),
        script_template = Template("""\
#!/bin/bash
cd {project_path}
export FOO={foovalue}
export BAR={barvalue}
export BAZ={bazvalue}
{env}
nohup python /content/runner.py > {output_path}/{label}.run 2>&1 &
pid=$!
echo $pid >&1
"""
        )
    )
  def submit_job(self):
    # using this submit_job, we can add some handling of stdout, job failure (i.e. if stdout does not return an integer value as expected),
    # extending the functionality of Submit with this exception handling.
    proc = super().submit_job()
    try:
      self.job_id = int(proc.stdout)
    except Exception as e:
      raise(Exception("{}\nJob submission failed:\n{}\n{}\n{}\n{}".format(e, self.submit, self.script, proc.stdout, proc.stderr)))
    return self.job_id

gcs = GCSubmit()
print(gcs)
```

In [3]:
#jupyter 2
class GCSubmitSOCK(Submit):
  def __init__(self):
    # creates a Submit with the templates we define
    super().__init__(
        submit_template = Template("sh {output_path}/{label}.sh"),
        script_template = Template("""\
#!/bin/bash
cd {project_path}
export SOCNAME="{sockname}"
{env}
nohup python /content/runner.py > {output_path}/{label}.run 2>&1 &
pid=$!
echo $pid >&1
"""
        )
    )
  def submit_job(self):
    # extend Submit.submit_job with error handling: the job script echoes the runner's pid,
    # so a stdout that is not an integer means the job submission failed.
    proc = super().submit_job()
    try:
      self.job_id = int(proc.stdout)
    except Exception as e:
      raise Exception("{}\nJob submission failed:\n{}\n{}\n{}\n{}".format(e, self.submit, self.script, proc.stdout, proc.stderr)) from e
    return self.job_id

gcs = GCSubmitSOCK()

**Note 3.0** N.B. the `{sockname}` placeholder in `export SOCNAME="{sockname}"` is a field that the `INET_Dispatcher` fills in when it requests a TCP port for communication. This happens at job creation.
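One common way to request a free TCP port is to bind a socket to port `0` and let the operating system choose. The sketch below does that with the standard `socket` module and fills the `{sockname}` field with the resulting address. It illustrates the general technique under that assumption and is not `pubtk`'s actual `INET_Dispatcher` code; it also binds to the loopback address rather than the Colab VM's IP.

```python
import socket

SCRIPT_LINE = 'export SOCNAME="{sockname}"'

# Bind to port 0 so the OS assigns an unused ephemeral port, then listen for the runner
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)

# getsockname() returns a (host, port) tuple; its str() has the same shape as SOCNAME above
sockname = str(server.getsockname())
line = SCRIPT_LINE.format(sockname=sockname)
print(line)  # the port number varies from run to run

server.close()  # release the port once it is no longer needed
```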

**Note 3.1** Now we can pass the custom submission to our **INET_Dispatcher**, which extends the base **SH_Dispatcher** with support for socket communication.

In [4]:
#jupyter 3
dispatcher = INET_Dispatcher(project_path='/content', output_path='./batch', submit=gcs, gid='sock_example')
print(dispatcher.submit) # prints the dispatcher.submit


submit:
sh {output_path}/{label}.sh

script:
#!/bin/bash
cd {project_path}
export SOCNAME="{sockname}"
{env}
nohup python /content/runner.py > {output_path}/{label}.run 2>&1 &
pid=$!
echo $pid >&1


path:
{output_path}/{label}.sh

handles:
{
	runtk.SUBMIT: "{output_path}/{label}.sh",
	runtk.STDOUT: "{output_path}/{label}.run",
	runtk.SOCKET: "{sockname}",
}

kwargs:
{'output_path': '{output_path}', 'label': '{label}', 'project_path': '{project_path}', 'sockname': '{sockname}', 'env': '{env}'}



**Note 6** To pass arguments to the **Runner** script, we call `update_env` on the dispatcher. Its argument is a dictionary of `key: value` pairs. Unlike the previous tutorial's `GCSubmit`, the `GCSubmitSOCK` template no longer has `FOO`, `BAR` and `BAZ` fields, so the runner's arguments are passed only through `update_env`.

In [5]:
#jupyter 6
dispatcher.update_env({'strvalue': '1',
                       'intvalue': 2,
                       'fltvalue': 3.0})
print(dispatcher.submit)


submit:
sh {output_path}/{label}.sh

script:
#!/bin/bash
cd {project_path}
export SOCNAME="{sockname}"
{env}
nohup python /content/runner.py > {output_path}/{label}.run 2>&1 &
pid=$!
echo $pid >&1


path:
{output_path}/{label}.sh

handles:
{
	runtk.SUBMIT: "{output_path}/{label}.sh",
	runtk.STDOUT: "{output_path}/{label}.run",
	runtk.SOCKET: "{sockname}",
}

kwargs:
{'output_path': '{output_path}', 'label': '{label}', 'project_path': '{project_path}', 'sockname': '{sockname}', 'env': '{env}'}



**Note 7** Upon job creation, the `{env}`, `{project_path}`, `{output_path}`, `{label}` and `{sockname}` fields are filled.
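The `kwargs` printed above map every placeholder to itself (e.g. `'label': '{label}'`), which suggests that fields stay as literal placeholders until they are filled. A minimal sketch of such partial filling with `str.format_map`; `KeepMissing` and `fill` are hypothetical names, and this is not `pubtk`'s `Template` implementation:

```python
class KeepMissing(dict):
    """Mapping that returns "{name}" for unknown keys instead of raising KeyError."""

    def __missing__(self, key):
        return "{" + key + "}"


def fill(template, **fields):
    # Substitute only the fields provided; every other field stays a placeholder
    return template.format_map(KeepMissing(fields))


script = "cd {project_path}\nnohup python /content/runner.py > {output_path}/{label}.run 2>&1 &"

partial = fill(script, project_path="/content")
print(partial)  # {output_path} and {label} are still placeholders

full = fill(partial, output_path="/content/batch", label="sock_example")
print(full)
```

Filling in stages like this lets a template be built once and completed later, when values such as the socket address are only known at job creation.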

The `{env}` field is replaced with a custom serialization (in this case, exported string values such as `export INTRUNTK1="intvalue=2"`) that the **runner** in `runner.py` can then deserialize.
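Judging from the job script printed below, each entry passed to `update_env` becomes an `export` whose variable name encodes the value's type and position (`STRRUNTK0`, `INTRUNTK1`, `FLOATRUNTK2`) and whose value is `key=value`. The sketch below re-creates that scheme and its inverse for `str`, `int` and `float` values; `serialize_env` and `deserialize_env` are hypothetical names, and the exact rules `pubtk` uses may differ.

```python
# Type tag used in the variable name, and the cast used to restore the value
TYPE_TAGS = {str: "STR", int: "INT", float: "FLOAT"}
CASTS = {"STR": str, "INT": int, "FLOAT": float}


def serialize_env(params):
    """Turn {key: value} into export lines like: export INTRUNTK1="intvalue=2"."""
    lines = []
    for index, (key, value) in enumerate(params.items()):
        tag = TYPE_TAGS[type(value)]  # only str, int and float are handled here
        lines.append('export {}RUNTK{}="{}={}"'.format(tag, index, key, value))
    return "\n".join(lines)


def deserialize_env(environ):
    """Rebuild {key: value} from variables named <TAG>RUNTK<index>."""
    mappings = {}
    for name, entry in environ.items():
        tag, marker, _ = name.partition("RUNTK")
        if not marker or tag not in CASTS:
            continue  # unrelated variable such as PATH
        key, _, raw = entry.partition("=")
        mappings[key] = CASTS[tag](raw)
    return mappings


params = {"strvalue": "1", "intvalue": 2, "fltvalue": 3.0}
print(serialize_env(params))

# What the runner would see in os.environ after the job script runs the exports
environ = {
    "PATH": "/usr/bin",
    "STRRUNTK0": "strvalue=1",
    "INTRUNTK1": "intvalue=2",
    "FLOATRUNTK2": "fltvalue=3.0",
}
print(deserialize_env(environ))  # {'strvalue': '1', 'intvalue': 2, 'fltvalue': 3.0}
```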

In [6]:
#jupyter 7
dispatcher.create_job()
print(dispatcher.submit) # see the new submit


submit:
sh /content/batch/sock_example.sh

script:
#!/bin/bash
cd /content
export SOCNAME="('172.28.0.12', 53103)"

export STRRUNTK0="strvalue=1"
export INTRUNTK1="intvalue=2"
export FLOATRUNTK2="fltvalue=3.0"
nohup python /content/runner.py > /content/batch/sock_example.run 2>&1 &
pid=$!
echo $pid >&1


path:
/content/batch/sock_example.sh

handles:
{
	runtk.SUBMIT: "/content/batch/sock_example.sh",
	runtk.STDOUT: "/content/batch/sock_example.run",
	runtk.SOCKET: "('172.28.0.12', 53103)",
}

kwargs:
{'output_path': '{output_path}', 'label': '{label}', 'project_path': '{project_path}', 'sockname': '{sockname}', 'env': '{env}'}



**Note 8** Let's download `socket_runner.py`, a more advanced runner built on the `SocketRunner` class, save it as `/content/runner.py`, and inspect it.

In [7]:
#jupyter 8
!curl https://raw.githubusercontent.com/jchen6727/pubtk/development/examples/colab/socket_runner.py > /content/runner.py
!cat /content/runner.py

from pubtk.runtk import SocketRunner
import os, json

print(os.getpid())

runner = SocketRunner()

mappings = json.dumps(runner.mappings)

runner.connect()
runner.send(mappings)
print(mappings)
runner.close()

**Note 9** This new runner is similar to the one in `pubtk0.ipynb`. Of note, it also calls `runner.connect()` to establish communication with the TCP address given in `SOCNAME`, sends its `mappings` as a JSON string with `runner.send()`, and finally calls `runner.close()` to deallocate the opened socket.

On the other end, the `dispatcher` calls `dispatcher.clean()` to deallocate its socket.

N.B. sockets are generally released when the process that opened them exits. We can use `lsof` to check this; however, it is good programming practice to explicitly close and free resources as soon as they are no longer in use.
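The connect / send / close sequence on the runner side and the accept / recv / clean sequence on the dispatcher side follow the usual TCP client/server pattern. A self-contained sketch using only the standard library, with a thread standing in for the runner process; it mirrors the steps described here but is not `pubtk`'s `SocketRunner` or `INET_Dispatcher` code, and `runner` and `dispatch` are hypothetical names:

```python
import json
import socket
import threading


def runner(address, mappings):
    # Runner side: connect to the dispatcher's address, send the JSON payload, then close
    with socket.create_connection(address) as conn:
        conn.sendall(json.dumps(mappings).encode())


def dispatch(mappings):
    # Dispatcher side: allocate a port and wait for the runner to connect
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    worker = threading.Thread(target=runner, args=(server.getsockname(), mappings))
    worker.start()

    connection, runner_address = server.accept()
    chunks = []
    while True:
        chunk = connection.recv(4096)
        if not chunk:  # empty bytes: the runner closed its end
            break
        chunks.append(chunk)

    connection.close()
    server.close()  # analogous to dispatcher.clean()
    worker.join()
    return json.loads(b"".join(chunks).decode())


received = dispatch({"strvalue": "1", "intvalue": 2, "fltvalue": 3.0})
print(received)
```

Reading until `recv` returns empty bytes relies on the runner closing its socket, which is one reason the explicit `close()` on the runner side matters.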

In [8]:
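#jupyter 9
dispatcher.submit_job()
connection, runner_address = dispatcher.accept() # accept the runner's connection on the SOCNAME address
recv_message = dispatcher.recv() # the JSON-encoded mappings sent by the runner
print(dispatcher.job_id) # should match the pid printed by the runner.py script
print(connection, runner_address)
print(recv_message)
dispatcher.clean() # deallocate the dispatcher's socket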
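Besides scanning the full `lsof` listing, one can probe whether anything is still listening on an address. The sketch below uses a hypothetical helper, `port_is_listening` (not part of `pubtk`), that tries to connect and treats any `OSError`, such as a refused connection, as "not listening". It is demonstrated on a local socket rather than on the dispatcher's address.

```python
import socket


def port_is_listening(address, timeout=1.0):
    # A successful connection means a socket is still bound and listening at address
    try:
        with socket.create_connection(address, timeout=timeout):
            return True
    except OSError:
        return False


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
address = server.getsockname()

before = port_is_listening(address)
server.close()  # release the socket, as dispatcher.clean() does for the dispatcher
after = port_is_listening(address)
print(before, after)
```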


In [9]:
!lsof


COMMAND    PID  TID TASKCMD   USER   FD      TYPE             DEVICE SIZE/OFF    NODE NAME
docker-in    1                root  cwd       DIR               0,51     4096 4456460 /
docker-in    1                root  rtd       DIR               0,51     4096 4456460 /
docker-in    1                root  txt       REG              253,0   765792    5077 /usr/sbin/docker-init
docker-in    1                root    0u      CHR                1,3      0t0       5 /dev/null
docker-in    1                root    1w     FIFO               0,13      0t0   17244 pipe
docker-in    1                root    2w     FIFO               0,13      0t0   17245 pipe
node         6                root  cwd       DIR               0,51     4096 4456460 /
node         6                root  rtd       DIR               0,51     4096 4456460 /
node         6                root  txt       REG               0,51 73873984 4070595 /tools/node/bin/node
node         6                root  mem       REG               