### turn on autoreloading

In [1]:
%load_ext autoreload
%autoreload 2

### install ray library
Uncomment this cell, run it once on your machine, then comment it out again.

In [2]:
#!pip install -U "ray[default]"

### start ray server

You only need to run this once per Jupyter session.
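
Before starting the head node, it can help to check whether something is already listening on the port. The sketch below is only an illustration: `ray_head_reachable` is a hypothetical helper (not part of Ray or cbgt), it uses port 6379 from the command in this section, and it only tests that a TCP connection succeeds, not that the listener is actually Ray.

```python
import socket


def ray_head_reachable(host="127.0.0.1", port=6379, timeout=1.0):
    """Return True if something accepts TCP connections at host:port.

    Hypothetical helper: a successful connection does not prove the
    listener is a Ray head node, only that the port is in use.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # connection refused, timed out, or host unreachable
        return False


if __name__ == "__main__":
    # The result depends on whether a server is running on this machine.
    print(ray_head_reachable())
```

If this returns `True`, a head node from an earlier run may still be up, and `ray stop` would terminate it before a fresh start.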

In [3]:
!ray start --head --port=6379 --redis-password="cbgt2"  #command line

Local node IP: 10.162.14.77
2021-06-21 12:05:01,700	INFO services.py:1272 -- View the Ray dashboard at http://127.0.0.1:8265

--------------------
Ray runtime started.
--------------------

Next steps
  To connect to this Ray runtime from another node, run
    ray start --address='10.162.14.77:6379' --redis-password='cbgt2'
  
  Alternatively, use the following Python code:
    import ray
    ray.init(address='auto', _redis_password='cbgt2')
  
  If connection fails, check your firewall settings and network configuration.
  
  To terminate the Ray runtime, run
    ray stop

In [16]:
!ray start --address='10.162.14.77:6379' --redis-password='cbgt2' #command line

Local node IP: 10.162.14.77
[2021-06-21 13:25:38,728 C 12356 163288] service_based_gcs_client.cc:248: Couldn't reconnect to GCS server. The last attempted GCS server address was 10.162.14.77:52068
*** StackTrace Information ***
    @        0x10c044900  ray::SpdLogMessage::Flush()
    @        0x10c01bb29  ray::RayLog::~RayLog()
    @        0x10bcdc5ac  ray::gcs::ServiceBasedGcsClient::ReconnectGcsServer()
    @        0x10bc83aff  _ZZN3ray3rpc12GcsRpcClient14GetAllNodeInfoERKNS0_21GetAllNodeInfoRequestERKNSt3__18functionIFvRKNS_6StatusERKNS0_19GetAllNodeInfoReplyEEEEENKUlS9_SC_E_clES9_SC_
    @        0x10bc8346b  ray::rpc::ClientCallImpl<>::OnReplyReceived()
    @        0x10bb42ac6  _ZNSt3__110__function6__funcIZN3ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_NS_9allocatorIS5_EEFvvEEclEv
    @        0x10bfda8a8  boost::asio::detail::completion_handler<>::do_complete()
    @        0x10c119ce3  boost::asio::detail::scheduler::do_run_one()

### install autopep8

In [None]:
# !pip install --upgrade autopep8

### how to run autopep8

Uncomment and run this on any code files you create before pushing them to GitHub.

In [None]:
# !autopep8 --in-place --aggressive --aggressive <filename>.py

### import the core cbgt code and any of your files

In [2]:
import cbgt as cbgt
# add import statement here for any files you make

2021-06-22 09:48:02,259	INFO worker.py:726 -- Connecting to existing Ray cluster at address: 10.162.14.77:6379


### create an empty pipeline

In [3]:
pl = cbgt.Pipeline()

### basic variable assignment

Use `pipeline.variablename` to write to a specific variable.

Currently, two kinds of basic assignment are permitted:

- assigning a "constant" (this can be any Python expression, but it is constant in that its value is calculated immediately, not during pipeline execution)
- copying a value from another pipeline variable

More kinds will probably be added.

In [4]:
pl.hello = "Hello World!"
pl.world = pl.hello

In [5]:
# these are special objects, not ordinary object properties
pl.hello

<backend.VariablePlaceholder at 0x7fa110ad7d90>

In [6]:
# two steps so far
pl.modulelist

[<backend.BasicAssignmentModule at 0x7fa10004f910>,
 <backend.BasicCopyModule at 0x7fa10004f880>]
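
To make the idea of "assignment records a step" concrete, here is a minimal sketch of how attribute access could be intercepted to build a list of deferred steps. `RecordingPipeline` and `Placeholder` are hypothetical names invented for this illustration; this is not how cbgt is implemented, only one way the behaviour shown above could be achieved.

```python
class Placeholder:
    """Stands in for a pipeline variable; holds only the variable's name."""

    def __init__(self, name):
        self.name = name


class RecordingPipeline:
    """Hypothetical toy class for illustration; not the cbgt implementation."""

    def __init__(self):
        # bypass our own __setattr__ while creating the step list
        object.__setattr__(self, "steps", [])

    def __setattr__(self, name, value):
        # nothing is stored on the object; each assignment becomes a step
        if isinstance(value, Placeholder):
            self.steps.append(("copy", name, value.name))
        else:
            self.steps.append(("assign", name, value))

    def __getattr__(self, name):
        # only called when normal lookup fails, i.e. for pipeline variables
        return Placeholder(name)

    def run(self, environment=None):
        # replay the recorded steps against a fresh state dictionary
        state = dict(environment or {})
        for kind, target, payload in self.steps:
            state[target] = state[payload] if kind == "copy" else payload
        return state


toy = RecordingPipeline()
toy.hello = "Hello World!"   # recorded as an "assign" step
toy.world = toy.hello        # recorded as a "copy" step
print(toy.steps)
print(toy.run())  # {'hello': 'Hello World!', 'world': 'Hello World!'}
```

The constant is evaluated when the assignment is written, while the copy is resolved only when the steps are replayed, which mirrors the distinction between the two kinds of assignment.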

### "code block" modules

A "code block" can be created in the form of a function that takes "self" as its only argument. (Technically, the argument can have any name.)

Unlike some other languages, Python doesn't support true code blocks, so a function stands in for one.

Use `pipeline.add(yourfunction)` to add a code block.

In [7]:
def helper1(number):
    # This function is defined in the normal way, outside the context of the pipeline.
    # It can be basically anything; it just demonstrates that outside functions
    # can be used in code block modules.
    return (number+1, number+2)

In [8]:
def codeblock(self):
    # you can do pretty much anything you want in here
    self.y, self.z = helper1(self.x)
    self.x += 1234567
    

In [9]:
pl.add(codeblock)

<backend.Pipeline at 0x7fa10004f160>

In [10]:
pl.modulelist

[<backend.BasicAssignmentModule at 0x7fa10004f910>,
 <backend.BasicCopyModule at 0x7fa10004f880>,
 <backend.CodeTaskFunctionModule at 0x7fa111eb8400>]
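
Outside a pipeline, the same code block can be tried on any object that allows attribute reads and writes. The sketch below uses `types.SimpleNamespace` from the standard library as a stand-in for the pipeline state; whether cbgt passes a similar object as `self` is an assumption.

```python
from types import SimpleNamespace


def helper1(number):
    # ordinary function, defined outside any pipeline
    return (number + 1, number + 2)


def codeblock(self):
    # reads x, writes y and z, then updates x in place
    self.y, self.z = helper1(self.x)
    self.x += 1234567


# SimpleNamespace is only a stand-in for whatever cbgt passes as `self`
state = SimpleNamespace(x=100)
codeblock(state)
print(vars(state))  # {'x': 1234667, 'y': 101, 'z': 102}
```

Testing a code block this way is a quick check that it reads and writes the variables you expect before adding it to a pipeline.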

### function modules

Functions can be transformed into modules by specifying their inputs and outputs, using a special syntax:

`pipeline.output = pipeline[function](arguments=whatever)`

If a function returns multiple outputs (a tuple), call `.shape()` on the result so it can be unpacked into the needed number of variables. `.shape()` takes a length (or a list of lengths for nested tuples) as input.

In [11]:
def helper2(first,second):
    # function with 1 output and 2 inputs
    return "".join([first,second])

In [12]:
# in plain Python, this is what you'd write
# joined = helper2(hello, second="!!!!")

In [13]:
# arguments can be passed either by position or name, just like when you call the function normally
pl.joined = pl[helper2](pl.hello, second="!!!!")

In [14]:
pl.modulelist

[<backend.BasicAssignmentModule at 0x7fa10004f910>,
 <backend.BasicCopyModule at 0x7fa10004f880>,
 <backend.CodeTaskFunctionModule at 0x7fa111eb8400>,
 <backend.FunctionModule at 0x7fa111eb8160>]

In [15]:
# you can pass in external values no problem

pl.a,pl.b = pl[helper1](10000).shape(2)

In [16]:
pl.modulelist

[<backend.BasicAssignmentModule at 0x7fa10004f910>,
 <backend.BasicCopyModule at 0x7fa10004f880>,
 <backend.CodeTaskFunctionModule at 0x7fa111eb8400>,
 <backend.FunctionModule at 0x7fa111eb8160>,
 <backend.FunctionModule at 0x7fa111eb8eb0>]
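
As a rough mental model, a function module can be thought of as a deferred call: it remembers the function, where its arguments come from, and which variables receive the outputs. The sketch below is a plain-Python illustration under that assumption; `Ref` and `make_step` are hypothetical names, not cbgt API.

```python
class Ref:
    """Hypothetical marker: refers to a state variable by name."""

    def __init__(self, name):
        self.name = name


def make_step(func, outputs, *args, **kwargs):
    """Build a step that looks up Ref arguments, calls func, stores outputs."""

    def resolve(value, state):
        # Ref arguments are read from the state; anything else is passed as-is
        return state[value.name] if isinstance(value, Ref) else value

    def step(state):
        result = func(
            *[resolve(a, state) for a in args],
            **{k: resolve(v, state) for k, v in kwargs.items()},
        )
        if len(outputs) == 1:
            state[outputs[0]] = result
        else:
            # multiple outputs: unpack the returned tuple name by name
            for name, value in zip(outputs, result):
                state[name] = value

    return step


def helper1(number):
    return (number + 1, number + 2)


def helper2(first, second):
    return "".join([first, second])


state = {"hello": "Hello World!"}
make_step(helper2, ["joined"], Ref("hello"), second="!!!!")(state)
make_step(helper1, ["a", "b"], 10000)(state)
print(state)
```

Nothing is called when the step is built; the function runs only when the step is applied to a state, which is the point of declaring inputs and outputs up front.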

### pipeline composition

You can build up a pipeline and then `.add(...)` it to another pipeline.

`.add()` returns the pipeline, so multiple `.add()` calls can be chained on one line.

In [17]:
def countingblock(self):
    try:
        self.counter += 1
    except Exception:
        # counter does not exist yet on the first call, so initialize it
        self.counter = 1

In [18]:
anotherpipeline = cbgt.Pipeline().add(countingblock).add(countingblock).add(countingblock)

In [19]:
anotherpipeline.modulelist

[<backend.CodeTaskFunctionModule at 0x7fa111eb8a00>,
 <backend.CodeTaskFunctionModule at 0x7fa111eb8e50>,
 <backend.CodeTaskFunctionModule at 0x7fa111eb8910>]

In [20]:
pl.add(anotherpipeline)

<backend.Pipeline at 0x7fa10004f160>

In [21]:
pl.modulelist

[<backend.BasicAssignmentModule at 0x7fa10004f910>,
 <backend.BasicCopyModule at 0x7fa10004f880>,
 <backend.CodeTaskFunctionModule at 0x7fa111eb8400>,
 <backend.FunctionModule at 0x7fa111eb8160>,
 <backend.FunctionModule at 0x7fa111eb8eb0>,
 <backend.PipelineModule at 0x7fa111eb8970>]
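
The chaining and nesting behaviour can be sketched with a small toy class. `NestedPipeline` is a hypothetical name for this illustration, and `types.SimpleNamespace` stands in for the pipeline state; neither reflects cbgt's actual internals.

```python
from types import SimpleNamespace


class NestedPipeline:
    """Hypothetical toy class for illustration; not the cbgt implementation."""

    def __init__(self):
        self.modulelist = []

    def add(self, item):
        self.modulelist.append(item)
        return self  # returning self is what makes chained .add() calls work

    def run(self, state):
        for item in self.modulelist:
            if isinstance(item, NestedPipeline):
                item.run(state)  # a nested pipeline runs as a single module
            else:
                item(state)
        return state


def countingblock(self):
    try:
        self.counter += 1
    except AttributeError:
        # SimpleNamespace raises AttributeError for a missing attribute
        self.counter = 1


inner = NestedPipeline().add(countingblock).add(countingblock).add(countingblock)
outer = NestedPipeline().add(inner)

state = outer.run(SimpleNamespace())
print(state.counter)          # 3
print(len(outer.modulelist))  # 1
```

The outer pipeline holds the inner one as a single entry, in the same way that the module list above ends with one `PipelineModule` rather than three separate code blocks.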

### running a pipeline

1. Optionally, create a dictionary of variable values to serve as the initial state of the pipeline.

2. Then call `executionmanager.run(pipeline,...)`, which returns a new dictionary containing the results.

In [22]:
environment = {
    'x': 100,
}

In [23]:
results = cbgt.ExecutionManager(cores=7).run(pl,environment)

In [24]:
results

{'x': 1234667,
 'hello': 'Hello World!',
 'world': 'Hello World!',
 'y': 101,
 'z': 102,
 'joined': 'Hello World!!!!!',
 'a': 10001,
 'b': 10002,
 'counter': 3}

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/ray/autoscaler/_private/monitor.py", line 317, in run
    self._run()
  File "/opt/anaconda3/lib/python3.8/site-packages/ray/autoscaler/_private/monitor.py", line 207, in _run
    self.update_load_metrics()
  File "/opt/anaconda3/lib/python3.8/site-packages/ray/autoscaler/_private/monitor.py", line 169, in update_load_metrics
    response = self.gcs_node_resources_stub.GetAllResourceUsage(
  File "/opt/anaconda3/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/opt/anaconda3/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "{"created":"@1624352628.101928000","descri

### examples of importing other code snippets

In [None]:
from phaseAsegment1 import *

In [None]:
popdata

In [None]:
from phaseAsegment2 import *

In [None]:
t1_epochs

# new code

In [None]:
pl = cbgt.Pipeline()

# testing

In [None]:
# fill in with whatever you need as the starting variables
environment = {
}

In [None]:
results = cbgt.ExecutionManager(cores=7).run(pl,environment)
results

In [None]:
# compare results to desired values 
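
One possible way to do the comparison is a small helper that reports every variable whose value differs from what you expect. `compare_results` is a hypothetical helper, not part of cbgt, and the `results` dictionary here is copied from the tutorial run earlier in this notebook so that the sketch runs on its own.

```python
def compare_results(results, expected):
    """Return {name: (expected, actual)} for variables that differ or are missing."""
    mismatches = {}
    for name, want in expected.items():
        if name not in results:
            mismatches[name] = (want, "<missing>")
        elif results[name] != want:
            mismatches[name] = (want, results[name])
    return mismatches


# values copied from the tutorial run earlier in this notebook
results = {
    'x': 1234667,
    'hello': 'Hello World!',
    'world': 'Hello World!',
    'y': 101,
    'z': 102,
    'joined': 'Hello World!!!!!',
    'a': 10001,
    'b': 10002,
    'counter': 3,
}

# only the variables listed in `expected` are checked
expected = {'x': 1234667, 'counter': 3, 'joined': 'Hello World!!!!!'}

print(compare_results(results, expected))                  # {}
print(compare_results(results, {'counter': 4, 'w': 0}))
```

An empty dictionary means every checked variable matched. For floating-point results, exact equality is usually too strict, and a tolerance check such as `math.isclose` would be a reasonable substitute.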