# Jupyter operator

In this notebook you can find some basic examples on how to use the operator's predefined functions to interact with incoming data and to output data. 
You can also check the documentation for all examples at the [end of this notebook](#Predefined-Functions-Documentation).

Tip: You can access the documentation of a function at any time inside a Jupyter notebook by pressing `Shift-Tab`on top of the function name (inside a code cell) for a short description or `Shift-Tab-Tab` for a complete description.

# Writing data to an output port

You can send data to an output port by calling the function `api.send`.

By executing the below code the operator will write the `data` to the output port `out`. If the port is connected to a terminal, you could see the data on its interface.

In [18]:
data='test'
api.send('out', "data")

Note that the type of output data should be compatible with the output port type, in this case, the type `string`.

# Dealing with input data

Data processing is done with callbacks due to the Modeler's asynchronous nature. For instance, you can set a callback 
to be called when new data is received in the registered port. See below some examples of how to do that.

To try out the use of callbacks, execute the cell below to define a simple function that prints the received data to the cell output and then writes the data to the output port `out`.

In [None]:
import io
import pandas as pd
def on_data_in(value):
    print(value)
    df = pd.read_csv(value, sep=';')
    api.send('out', str(df.shape))
api.try_port_callback('input1', on_data_in)

## Trying out a callback
The function `api.try_port_callback` executes a callback once with the data from the specified port. Note that this function is available only in interactive mode.

By executing the cell below one input data will be consumed from the input port `in` and fed to the `on_data_in` function. You should see the data received printed to the cell output and the operator should write the data to the output port `out`. 

Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/home/jupyter/.ipython/profile_default/startup/jupyter_api.py", line 258, in _perform_try_port_callback
    raise e
  File "/home/jupyter/.ipython/profile_default/startup/jupyter_api.py", line 253, in _perform_try_port_callback
    raise Exception(
Exception: Timeout: No data available at the input                             queue of port input1
Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/home/jupyter/.ipython/profile_default/startup/jupyter_api.py", line 258, in _perform_try_port_ca

In [7]:
# def on_input(data):
#     # to send metrics to the Submit Metrics operator, create a Python dictionary of key-value pairs
#     df = pd.read_csv(io.StringIO(data), sep=';')
    
    
# api.set_port_callback("input1", on_input)


In [8]:
!pip install hdfs
!pip install openpyxl
!pip install pandas
# !pip install hdfs [dataframe, kerberos]


Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/local/bin/python3.9 -m pip install --upgrade pip' command.[0m


If an exception happens during the execution of `api.try_port_callback`, the exception is thrown in the notebook kernel and the graph state won't be affected. The exception and the stack trace are printed to the last active cell output.

In [3]:
# from hdfs import InsecureClient
# client = InsecureClient('http://datalake:50070')
# import pandas as pd
# client.status("/")
# def on_input(data):
#     # to send metrics to the Submit Metrics operator, create a Python dictionary of key-value pairs
#     df = pd.read_csv(data)
#     print(df)
    
# api.set_port_callback("input1", on_input)



In [1]:
from datetime import date,timedelta
from dateutil.relativedelta import relativedelta
def get_last18_months_date():
    dates=[]
    current_date = date.today()
    for d in range(0,18):
        month_date = current_date- relativedelta(months=d)
        month=str(month_date.year)+"-"+str(month_date.month)
        dates.append(month)
    return dates

In [2]:
import pandas as pd
def filter_file(file1):
    dates=get_last18_months_date()
    file1['transdatemonth'] = file1['transdatemonth'].astype("string")
    file1['tran_years'] = file1['tran_years'].astype("string")
    file1["month_year"]=file1["tran_years"]+"-"+file1["transdatemonth"]
    df=file1.loc[file1["month_year"].isin(dates)]
    df=df.drop("month_year",axis=1)
#     print("scdsdc",df)
    return df

In [3]:
from hdfs import InsecureClient
client = InsecureClient('http://datalake:50070')

client.status("/")
import os
import io

def on_input(data):
    with client.read(data, encoding='utf-8') as reader:    
        file1 = pd.read_csv(reader)
    df=filter_file(file1)
    out=os.path.dirname(os.path.abspath(data))
    out=os.path.join(out,"result.csv")
    client.write(out,df,overwrite=True,encoding = 'utf-8')
api.set_port_callback("in", on_input)

<class 'encodings.utf_8.StreamReader'> <encodings.utf_8.StreamReader object at 0x7fbd73fa2d00>


In [12]:
"""from hdfs import InsecureClient
client = InsecureClient('http://datalake:50070')

client.status("/")
import os
import io

def on_input(data):
    with client.read(data, encoding='utf-8') as reader:    
        file1 = pd.read_csv(reader)
    df=filter_file(file1)
    out=os.path.dirname(os.path.abspath(data))
    out=os.path.join(out,"result.csv")
    client.write(out,df,overwrite=True,encoding = 'utf-8')
api.set_port_callback("in", on_input)"""


from hdfs import InsecureClient
client = InsecureClient('http://datalake:50070')

client.status("/")
import os
import pandas as pd
data="/shared/ZDEMO/past_date.csv"
with client.read(data, encoding='utf-8') as reader:
    file1 = pd.read_csv(reader)
    print(type(reader),reader)
    df=filter_file(file1)
    out=os.path.dirname(os.path.abspath(data))
    # out=os.path.join(out,"result1.csv")
    df.to_csv("result12.csv",sep = ',' , encoding='utf-8', index=False , header = True)
    client.upload(out, "result12.csv")
# writer=client.write(out,str(df.columns),overwrite=True,encoding = 'utf-8')
# with client.write(out, encoding = 'utf-8', overwrite=True) as writer:
#     df.to_csv(writer, sep=",", index=False, encoding = 'utf-8', header=False)
# with client.write(out) as writer:
#     print(type(writer),writer)
    
#     writer.write(df)
# df.to_csv(out)
# with client.write(out,overwrite=True) as writer: 
#     df.to_csv(writer)
#     df.to_csv(writer,index = False, sep=',', encoding='utf-8') 

print(df.shape)
print(file1.shape)

<class 'encodings.utf_8.StreamReader'> <encodings.utf_8.StreamReader object at 0x7f5d4cbf4340>


TypeError: an integer is required (got type bytes)

In [None]:
import pandas as pd
import datetime
from io import StringIO
from hdfs import InsecureClient , HdfsError
client = InsecureClient('http://datalake:50070') 
def read_file(data):
    attr = dict() #convert String data input to a CSV Readable format 
    data = StringIO(data) 
    df = pd.read_csv(data, sep=",") 
    df.fillna('' , inplace = True) 
    df = df[df.CURRENCY != ''] 
    sdl_path = '/shared/ZDEMO/' fname = '/data_refined.csv' 
    df.to_csv(fname, sep = ',' , encoding='utf-8', index=False , header = True) 
    try: 
        client.upload(sdl_path, fname) 
        res_out = 'File uploaded to SDL with file name' + fname 
        api.send("out2",res_out ) 
    except HdfsError as er :
        api.send("out2", 'Error in uploading file to SDL') 
        api.send("out2", str(df)) 

api.set_port_callback("input1", read_file)

In [None]:
# out=None
# import io
# data="/shared/ZDEMO/past_date.csv"
# with client.read(data, encoding='utf-8') as reader:
#     file1 = pd.read_csv(reader)
# df=filter_file(file1)
# out=os.path.dirname(os.path.abspath(data))
# out=os.path.join(out,"result123.csv")
# print("out",out)
# try:
# #     with client.upload(out , 'result12.csv', n_threads=1, temp_dir=None) as writer : 
# #         df.to_csv(writer) 
# #         with client.write(out,overwrite=True,encoding = 'utf-8') as writer:
# #             df.to_csv(writer)
    

#     client.write(out,file1,overwrite=True,encoding = 'utf-8')

            
# except Exception as ex:
#     print("ex",ex)
# print("out",out)
#     try:
#         with client.upload(out , 'result12.csv', n_threads=1, temp_dir=None) as writer : 
#             df.to_csv(writer) 
# #         with client.write(out ,encoding = 'utf-8') as writer:
# #             df.to_csv(writer)
#     except Exception as ex:
#         print("ex",ex)

None


## Reading data indefinitely with api.set_port_callback
Callbacks can be registered by using the function `api.set_port_callback`. As a result, the function is called when new data is received in the specified port(s).

By executing the code below, the `on_data_in` callback will be registered for the `in` input port. Thus, `on_data_in` will be executed for new data available.

In [13]:
api.set_port_callback('in', on_data_in)

If an exception happens during the execution of the callback registered with the `api.set_port_callback` function, the exception is logged into the Python subengine and the graph fails.

## Unregistering a callback
To unregister a callback so that it is no longer executed, you can use the function `api.remove_port_callback` with the function as a parameter.

In [14]:
api.remove_port_callback(on_data_in)

## Registering callbacks to multiple ports

A callback can be registered to multiple ports. In this case, the callback is executed when data is available in all specified ports.

In [15]:
def on_multiple_inputs(input1, input2):
    pass

# assuming that there are two input ports: 'in1', 'in2'
api.set_port_callback(['in1', 'in2'], on_multiple_inputs)

ErrorNonexistentPort: Input port in1 does not exist

# Running in Productive mode

Jupyter operators can run in 'productive' mode so that no user interaction is required to run the cells. This mode can be selected in the operator settings in the Modeler UI, toggling the configuration parameter 'Productive' from `False` to `True`.

When in productive mode, cells tagged as productive are executed with no user interaction. In the Jupyter notebook UI, you can find a tab on top of each cell. To tag a cell, you can type the desired tag in the tab's textbox, in this case, `productive`, and click on 'Add tag' button.

In [None]:
def my_productive_code(value):
    api.send('out', value)
    
api.set_port_callback('in', my_productive_code)

Notes:
- If the productive code references some other code in a cell that was not marked as productive with the tag `productive`, a runtime error happens because the operator ignores non-productive code in this mode, making the graph fail. Therefore you should mark all relevant code cells as `productive`.
- When running in productive mode, it is not possible to access the Jupyter notebook UI to interact with the cells.
- It is not possible to change the execution mode at runtime. To alter it, you must stop the graph, set the 'Productive' configuration to the desired value, and then start the graph again.


# Accessing the configuration object

You can get the configuration object set in the operator configuration's UI in the Modeler by calling `api.get_config`. For example, to get the location of the current notebook, you can access the "notebookPath" key from the config object.

In [None]:
config = api.get_config()
config["notebookFilePath"]

# Installing Python modules
Python modules can be installed by using the api.add_dependency function.

In the example below, we use `api.add_dependency` to install the `h5py` module.

In [None]:
api.add_dependency('h5py')

# Working with messages

You can access the Message type by calling `Message`. The body and attributes of a message object `msg` can be accessed as `msg.body` and `msg.attributes`, respectively.

#### Message(body, attributes)    
    Args:
        *body (object): body of the message
        attributes(dict(str, object) | None): attributes of the message

In [None]:
new_message = Message(None, {"debug": True, "config": {}})
print(new_message.body)
print(new_message.attributes)

# Predefined Functions Documentation

- [api.send(port, data)](#api.send(port,-data))
- [api.get_config()](#api.get_config())
- [api.add_dependency(package_name)](#api.add_dependency(package_name)
)
- [api.try_port_callback(ports, callback)](#api.try_port_callback(ports,-callback))
- [api.set_port_callback(ports, callback)](#api.set_port_callback(ports,-callback))
- [api.remove_port_callback(callback)](#api.remove_port_callback(callback))

#### api.send(port, data)

    Writes the data to the specified operator output port. Be careful with the correspondence between the Python data object and the Modeler port type.
    Args:
        port (str): operator's output port name
        data: data object to send

#### api.get_config()

    Returns the Jupyter operator's configuration object
    Args:
        None
    Returns:
        config(dict): configuration object

#### api.add_dependency(package_name)

    This method tries to install a Python package using pip. If the installation fails, an exception is raised.

    Args:
        package_name (str): Name of the package to be installed with pip.

#### api.try_port_callback(ports, callback)

    This method executes the callback once with the data retrieved from the specified input port. The callback is called only when there are messages available in all input ports.
    * Note that, this function is available only in interactive mode.
    
    Args:
        ports (str|list[str]): input ports to be tested with the callback. `ports` can be a list of strings with the name of each port to be associated, or a string if you want to associate the callback with a single port.
        callback (func[...]): a callback function with the same number of arguments as elements in `ports`. The arguments are passed to `callback` in the same order as their corresponding ports in the `ports` argument. 

#### api.set_port_callback(ports, callback)
    This method associates the input ports to the callback. The callback is called only when there are messages available in all input ports. If this method is called multiple times for the same group of ports, then the previous callback is overwritten by the provided one.
    Different ports group cannot overlap. For example, a port can be only associated with one callback at a time.
    
    Args:
        ports (str|list[str]): input ports to be associated with the callback. `ports` can be a list of strings with the name of each port to be associated, or a string if you want to associate the callback with a single port.
        callback (func[...]): a callback function with the same number of arguments as elements in `ports` or a variable-length argument. Also the arguments are passed to `callback` in the same order of their corresponding ports in the `ports` argument.   

#### api.remove_port_callback(callback)
    Unregister the callback function. If the function is not registered, the method exits quietly.

    Args:
        callback (func[...]): callback function to be removed.