# core

> Main(/all) routines for `ipyfernel` 

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
#| hide
from jupyter_client.manager import KernelManager
from jupyter_client.kernelspec import KernelSpecManager
import subprocess 
from IPython.display import display, Image
import base64
from pathlib import Path
from IPython.core.magic import register_line_magic, register_line_cell_magic
import json

# Setup Routines

The remote kernel needs to be speified by the user. The SSH config will typically be called by other routines since the port is usually changing each time.

In [None]:
#| export
def register_remote_kernel(
    remote_python="/path/to/python",   # Full path of Python executable to run on remote system.
    kernel_name="ipyf_remote_kernel",  # Any old name will do. This is fine.
    display_name="Remote Python",      # This is just what you'll see when you look at a list.
    ssh_host_alias="remote_server_sshpyk", # Same alias as was used in writing to ssh config file.
    remote_kernel_name="python3",      # Typical for Jupiter.
    language="python",                 # Probably want to leave this unless you want to try R.
    verbose=True                       # Print extra info.
    ):
    "registers which python kernel will be used on remote machine"
    ksm = KernelSpecManager()
    registered_names = list(ksm.get_all_specs().keys())
    if kernel_name in registered_names: 
        if verbose: print(f"{kernel_name} is already a registered kernel") 
    else: 
        if verbose: print(f"{kernel_name} is not a registered kernel. We need to add it") 
        subprocess.run(["sshpyk", "add", "--kernel-name", kernel_name,
            "--display-name", display_name, "--remote-python", remote_python, "--ssh-host-alias", ssh_host_alias,
            "--remote-kernel-name", remote_kernel_name, "--language", language
        ])
        if verbose: print("Success.")


In [None]:
register_remote_kernel(remote_python='/Users/shawley/exercises/solveit/.venv/bin/python')

ipyf_remote_kernel is already a registered kernel


In [None]:
#| export
def set_ssh_config(
    port:int,                       # Port number on proxy server (e.g. bore.pub)
    user:str="",                    # Username on remote system.
    alias="remote_server_sshpyk",   # Default alias for `sshpyk`, leave it alone
    proxyname="bore.pub",           # Have tested this with bore
    config_path="~/.ssh/config",    # Shouldn't need to change this.
    ):
    "(called by ipf_startup) sets up user's ssh .config file with info used later"
    config_path = Path(config_path).expanduser()
    if not config_path.exists(): config_path.touch()
    text = config_path.read_text()
    if f"Host {alias}" not in text: 
        assert user != "", "Must specify username when creating ~/.ssh/config info"
        block = f"""
Host {alias}
    HostName {proxyname}
    Port {port}
    User {user}
    BatchMode yes
    ControlMaster auto
    ControlPath ~/.ssh/sshpyk_%r@%h_%p
    ControlPersist 10m
    StrictHostKeyChecking no
    UserKnownHostsFile /dev/null
"""
        config_path.write_text(text + block)
    else:
        lines = text.splitlines()
        in_target_block = False
        for i, line in enumerate(lines):
            if line.startswith("Host "):
                in_target_block = (line == f"Host {alias}")
            elif in_target_block and line.strip().startswith("Port "):
                lines[i] = f"    Port {port}"
            elif proxyname and in_target_block and line.strip().startswith("HostName "):
                lines[i] = f"    HostName {proxyname}"
            elif user and in_target_block and line.strip().startswith("User "):
                lines[i] = f"    User {user}"
        config_path.write_text("\n".join(lines) + "\n")
    print(f'{config_path} file updated.') 

In [None]:
# Demo that:
# On remote machine run:  bore local 22 --to bore.pub
# Take that port and put it here
port = 3365
set_ssh_config(port) 

/app/data/.ssh/config file updated.


# Lower-Level Routines

These back-end routines typically don't need to be called directly by the user; rather they're called by  the user interface routines.

In [None]:
#| export
_ipf_km, _ipf_kc = None, None            # "ipf" = "ipyfernel" ;-) 
def ipf_startup(kernel_name="ipyf_remote_kernel"):  
    "Start up the remote kernel"
    global _ipf_km, _ipf_kc 
    if _ipf_km is None and _ipf_kc is None: #only do this at startup
        # Create kernels paremt folder if it doesn't exist (like in solveit)
        Path("~/.local/share/jupyter/runtime").expanduser().mkdir(parents=True, exist_ok=True)
        _ipf_km = KernelManager(kernel_name=kernel_name)
        _ipf_km.start_kernel()
        _ipf_kc = _ipf_km.client()
        _ipf_kc.start_channels()
        _ipf_kc.wait_for_ready(timeout=30)
        print("Success: remote kernel started")
    else: 
        print("ipf_startup: already running")

In [None]:
ipf_startup()

Success: remote kernel started


In [None]:
#| export
def _output_hook(
    msg,   #  Message obtained from remote execution
    ):
    "How to handle output from the remote kernel."
    mt = msg["msg_type"]
    content = msg.get("content", {})
    if mt == "stream":
        print(content["text"], end="", flush=True)
    elif mt == "error":
        print('\n'.join(content.get("traceback", [])))
    elif mt in ("display_data", "update_display_data"):
        data = content.get("data", {})
        if "image/png" in data:
            display(Image(base64.b64decode(data["image/png"])))
        elif "text/plain" in data:
            print(data["text/plain"])

In [None]:
#| export
def ipf_exec(
    code:str,           # Code to be executed
    verbose=False,      # Return details about remote execution.
    ):
    "Execute code on the remote kernel." 
    assert _ipf_kc is not None, "ipf_exec: need to run ipf_startup() first"
    result = _ipf_kc.execute_interactive(code=code, output_hook=_output_hook)
    _ipf_kc.last_result = result  # stash it for optional inspection later
    if verbose: return result

In [None]:
code = """
import platform 
print(platform.system())
"""
ipf_exec(code)

Darwin


In [None]:
#| export
def ipf_shutdown(verbose=True):
    "Terminates the remote kernel"
    global _ipf_km, _ipf_kc
    if verbose: print("Shutting down remote kernel") # Note: Could make say if remote kernel is not even running.
    try:
        if _ipf_kc is not None: _ipf_kc.stop_channels()
        if _ipf_km is not None: _ipf_km.shutdown_kernel(now=True)  # 'now=True' forces immediate shutdown
    except: pass  # Don't hang on errors
    _ipf_km, _ipf_kc = None, None

In [None]:
ipf_shutdown()

Shutting down remote kernel


In [None]:
#| export
def _setup_tunnels(remote_ports, ssh_host="remote_server_sshpyk"):
    """Create SSH tunnels for each port"""
    tunnels = []
    for local_port, remote_port in remote_ports.items():
        cmd = ['ssh', '-N', '-L', f'{local_port}:127.0.0.1:{remote_port}', ssh_host]
        proc = subprocess.Popen(cmd)
        tunnels.append(proc)
    return tunnels

In [None]:
#| export
def _prepare_kaggle_connection(ngrok_host, ngrok_port, ssh_alias="remote_server_sshpyk"):
    """Prepare connection to existing Kaggle kernel"""
    set_ssh_config(port=ngrok_port, user='root', alias=ssh_alias, proxyname=ngrok_host)
    result = subprocess.run(['ssh', ssh_alias, 'cat /root/.local/share/jupyter/runtime/kernel-*.json'], 
                       capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Failed to fetch kernel info: {result.stderr}")
    kernel_dict = json.loads(result.stdout)
    remote_ports = {v:v for k,v in kernel_dict.items() if '_port' in k}
    _setup_tunnels(remote_ports=remote_ports)
    local_conn = kernel_dict.copy()
    local_conn['ip'] = '127.0.0.1'  # Connect locally through tunnels
    conn_file = Path.home() / '.local/share/jupyter/runtime/kaggle_kernel.json'
    conn_file.parent.mkdir(parents=True, exist_ok=True)
    conn_file.write_text(json.dumps(local_conn, indent=2))
    print('Successfully created connection file and forwarded ports!')
    return conn_file

In [None]:
#| export
def connect_existing_kernel(tunnel, kernel_name="ipyf_remote_kernel"):  
    "Connect to existing remote kernel"
    global _ipf_km, _ipf_kc 
    if _ipf_km is None and _ipf_kc is None: #only do this at startup
        try:
            host, port = tunnel.split(':')
            conn_file = _prepare_kaggle_connection(host, int(port))
        except ValueError:
            raise ValueError(f"Failed to extract host and port from tunnel (expects 'host:port' e.g. '2.tcp.ngrok.io:13103'): {tunnel}")
        _ipf_km = KernelManager(connection_file=str(conn_file))
        _ipf_km.load_connection_file()
        _ipf_kc = _ipf_km.client()
        _ipf_kc.start_channels()
        _ipf_kc.wait_for_ready(timeout=10)
        print(f"Success: connected to remote kernel via {tunnel}")
    else: 
        print("connect_existing_kernel: already connected")
    

# User Interface Routines

These are the main routines you'll typically use: 

1. Start and stop remote the remote kernel
2. iPython magics `%%remote` and `%%local`
3. (optional) "Sticky" mode, to redirect cell execution. 

## Start and Stop Remote Kernel 

In [None]:
#| export
def start_remote(port, user=""):
    "Configure ssh connection to remote server and start remote server"
    set_ssh_config(port, user=user) 
    try: 
        ipf_startup()
    except Exception as e: 
        print(f"Error starting up remote kernel: {e}") 
        return 

In [None]:
start_remote(port)

/app/data/.ssh/config file updated.


Success: remote kernel started


In [None]:
#| export
def stop_remote():
    "shutdown remote server"
    unset_sticky()  # get rid of any input transformers (see below) 
    ipf_shutdown()

In [None]:
#| echo: false
# these were the old names of the routines
import warnings

def set_remote(port, user=""):
    "Deprecated: use start_remote instead"
    warnings.warn("set_remote is deprecated, use start_remote instead", DeprecationWarning)
    start_remote(port, user=user)

def unset_remote():
    "Deprecated: use stop_remote instead"
    warnings.warn("unset_remote is deprecated, use stop_remote instead", DeprecationWarning)
    stop_remote()

## iPython Magics

These line or cell magics, invoked via '%' or '%%' respectively, will direct execution of a line or cell to occur on the appropriate system -- "remote" or "local", where local means in the current notebook. 

In [None]:
#| export
_skip_next = False  # This is used in conjunction with %%local, below

def _execute_remotely(lines:list[str]):
    "Take commands from magics and send to ipf_exec"
    global _skip_next
    if _skip_next:
        _skip_next = False
        return lines
    code = ''.join(lines)
    if 'get_ipython()' in code: return lines  # let solveit internals pass through
    # Make sure our controls execute locally
    if code.strip().startswith(('%local', '%%local', 'start_remote(', 'stop_remote(', 'set_remote(', 'unset_remote(', 'set_sticky(','unset_sticky(')):
        return lines
    return [f"ipf_exec({repr(code)})\n"]

In [None]:
#| export
@register_line_cell_magic
def remote(line, cell=None):
    "remote exeuction: works as %remote and as %%remote" 
    ipf_exec(cell if cell else line)

In [None]:
%%remote 
#%%remote    <-- docs are filtering magics but that's what's used here
import socket 
hostname = socket.gethostname()   # let's make sure we're running remotely
print("Hello from",hostname) 

Hello from Chonk


^ "Chonk" is the name of my home laptop 

In [None]:
#| export
@register_line_cell_magic
def local(line, cell=None):
    "local execution: works as %local and as %%local"
    global _skip_next
    _skip_next = True
    get_ipython().run_cell(cell if cell else line) 

In [None]:
%%local 
#%%local <-- docs are filtering magics but that's what's used here
import socket 
hostname = socket.gethostname()   # let's make sure we're running remotely
print("Hello from",hostname) 

Hello from 549d6fca895f


^"549d6fca895f" happens to be the name of the solvent instance currently running.

## 'Sticky'/'Seamless' Remote Excution 

via Input Transformers.  These can make cells set execute remotely by default.

**WARNINGS**: 
1. Solve it is not intended to work with people modifying input transformers So be wary. Nevertheless, this seems to work.
2. If they're commands that you definitely want to execute locally, maybe run `%unset_sticky` just to be sure first.


In [None]:
#| export
gip = get_ipython()

def set_sticky():
    "Makes code cells execute remotely, via input transformer"
    assert _ipf_kc is not None, "Need an active remote kernel connection" 
    for f in gip.input_transformers_cleanup[:]:   # gaurd against appending twice
        if getattr(f, '__name__', '') == '_execute_remotely':
            print("Already executing remotely") 
            return 
    gip.input_transformers_cleanup.append(_execute_remotely)
    print('Code cells will now execute remotely.')

In [None]:
#| export
def unset_sticky():
    "Un-sticks remote execution for code cells" 
    for f in gip.input_transformers_cleanup[:]:  
        if getattr(f, '__name__', '') == '_execute_remotely':
            gip.input_transformers_cleanup.remove(f)
    print("Code cells will now run locally.") 



In [None]:
stop_remote()

print()
start_remote(port)
set_sticky()


Code cells will now run locally.
Shutting down remote kernel



/app/data/.ssh/config file updated.


Success: remote kernel started
Code cells will now execute remotely.


In [None]:
#| eval: false
# remote execution
import socket 
hostname = socket.gethostname()
print("Hello from",hostname) 

Hello from Chonk


In [None]:
unset_sticky()

Code cells will now run locally.


In [None]:
#| eval: false
# local execution
import socket 
hostname = socket.gethostname()
print("Hello from",hostname) 

Hello from 549d6fca895f


In [None]:
#| hide

#import nbdev; nbdev.nbdev_export()
!nbdev_export --procs scrub_magics