![DLI Header](../images/DLI_Header.png)

# Building Morpheus Pipelines with the Morpheus CLI

In this notebook you are going to learn how to build a very basic Morpheus pipeline using the `morpheus` command-line interface (CLI).

## Objectives

By the end of this notebook you will:

- Know the 3 different kinds of Morpheus pipelines and their high-level differences.
- Know the most basic constraints for building a pipeline.
- Know how to use the `--help` option to get more information from Morpheus.
- Be able to build an "identity" pipeline.
- Be able to customize pipeline output fields.
- Be able to use `DEBUG` log output to better understand pipeline stage inputs and outputs.
- Be able to add performance monitoring.

---

## Basic Pipeline Structure

When using the Morpheus CLI to construct pipelines, all pipelines follow this basic structure:

- Call `morpheus run`.
- Specify the type of pipeline to run: either `pipeline-fil`, `pipeline-ae`, or `pipeline-nlp`.
- Specify a source object for the pipeline, typically either a [Kafka](https://kafka.apache.org/) topic, or files from disk.
- Create a sequential list of stages along with their options. The output for one stage becomes the input for the next.
- Specify the pipeline's output, typically either a Kafka topic, or files.
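
The structure above maps naturally onto an ordered list of stages. As a minimal sketch, the following Python assembles a `morpheus run` command line from a pipeline type and a list of stages. The helper `build_morpheus_command` and the file names are hypothetical and not part of Morpheus; the code only builds a string and does not run anything.

```python
import shlex

# Pipeline types named in this notebook.
PIPELINE_TYPES = ("pipeline-fil", "pipeline-ae", "pipeline-nlp")


def build_morpheus_command(pipeline_type, stages):
    """Return a `morpheus run ...` command line as a single string.

    `stages` is an ordered list of (stage_name, [args...]) tuples: the
    source first, the output last, each stage feeding the next.
    This helper is hypothetical; it only formats text.
    """
    if pipeline_type not in PIPELINE_TYPES:
        raise ValueError(f"unknown pipeline type: {pipeline_type}")
    argv = ["morpheus", "run", pipeline_type]
    for name, args in stages:
        argv.append(name)
        argv.extend(args)
    # shlex.join quotes any argument that needs it for a POSIX shell.
    return shlex.join(argv)


command = build_morpheus_command(
    "pipeline-fil",
    [
        ("from-file", ["--filename", "input.jsonlines"]),  # source
        ("deserialize", []),                               # intermediate stage
        ("serialize", []),                                 # intermediate stage
        ("to-file", ["--filename", "output.jsonlines"]),   # output
    ],
)
print(command)
```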

---

## Calling `morpheus run` in a Jupyter Environment

If you are unfamiliar with the Jupyter environment, you need to know that a notebook (like the one you are viewing) consists of two kinds of cells. The first kind of cell, where this text is written, consists of [Markdown](https://daringfireball.net/projects/markdown/) and allows us to display text and images.

The second kind of cell is an executable code cell. Code cells always have brackets (`[ ]`) on their left. While a code cell is executing, an asterisk appears inside the brackets (`[*]`). When the cell finishes running, a number appears in its place, for example `[1]`, indicating the order in which the cell was executed within the environment.

There are several ways to execute a code cell. Perhaps the easiest is to press `SHIFT + RETURN`, which executes the cell and moves your cursor to the next cell, or `CTRL + RETURN`, which executes the cell and leaves your focus on it. You can also use your mouse to click the ▶️ icon in the menu near the top of the notebook pane.

Please note that Jupyter code cells execute Python code by default. We can, however, prepend an exclamation point `!` to the cell to instead issue a command line command. Execute the following cell, which will issue the `morpheus run` command from a command line, resulting in the printing of the help message for `morpheus run`:

In [1]:
!morpheus run

Usage: morpheus run [OPTIONS] COMMAND [ARGS]...

Options:
  --num_threads INTEGER RANGE     Number of internal pipeline threads to use
                                  [default: 4; x>=1]
  --pipeline_batch_size INTEGER RANGE
                                  Internal batch size for the pipeline. Can be
                                  much larger than the model batch size. Also
                                  used for Kafka consumers  [default: 256;
                                  x>=1]
  --model_max_batch_size INTEGER RANGE
                                  Max batch size to use for the model
                                  [default: 8; x>=1]
  --edge_buffer_size INTEGER RANGE
                                  The size of buffered channels to use between
                                  nodes in a pipeline. Larger values reduce
                                  backpressure at the cost of memory. Smaller
                                  values will push messages through the


## Calling `morpheus run` in a Jupyter Terminal

For use cases where the output is minimal, using a code cell prefixed with `!` to issue command line commands makes a lot of sense. Often, however, we would like to issue command line commands that are long running, produce a lot of output, or require interactivity. In these cases it makes more sense to use an actual terminal.

There are several ways to open a terminal in Jupyter. For now, use Jupyter's _File_ menu, click _New Launcher_ and from the launcher screen that appears, click the _Terminal_ button. Doing so will open a new tab in the Jupyter environment running a terminal with access to the same file system that this notebook is running in. You can navigate between terminals and notebooks at any time.

Open a Jupyter terminal now, and from the command line prompt that appears, issue the `morpheus run` command.
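
For reference, the `!` prefix used in code cells hands the rest of the line to a system shell. A rough Python equivalent, sketched here with the standard `subprocess` module, captures a command's output so that it can be filtered in Python. The helper name `run_and_filter` is hypothetical, and the demonstration runs the Python interpreter itself rather than `morpheus` so that it works in any environment.

```python
import subprocess
import sys


def run_and_filter(argv, needle):
    """Run `argv` and return the output lines that contain `needle`."""
    result = subprocess.run(argv, capture_output=True, text=True, check=False)
    # Help text may go to stdout or stderr, so search both.
    combined = result.stdout + result.stderr
    return [line for line in combined.splitlines() if needle in line]


# Stand-in command: a child Python process that prints two lines.
demo = [sys.executable, "-c", "print('Usage: demo'); print('Options: none')"]
print(run_and_filter(demo, "Usage"))
```

With Morpheus installed, passing `["morpheus", "run", "--help"]` as `argv` would be the analogous call.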

---

## Three Kinds of Pipelines

At present, Morpheus ships with the ability to run three different kinds of pipelines. Each pipeline is centered around the kind of inference that it can perform on data in the pipeline, and provides additional configurations to that end.

We will be looking at each of these pipeline types in detail during the workshop, but for now, here is a very high-level summary of each.

### Forest Inference Library (FIL) Pipeline

`pipeline-fil` can perform inference on data using the [RAPIDS Forest Inference Library](https://medium.com/rapids-ai/rapids-forest-inference-library-prediction-at-100-million-rows-per-second-19558890bc35), or FIL. FIL provides accelerated inference for tree-based models, including gradient-boosted decision tree models (like those from XGBoost and LightGBM) and random forests. Using `pipeline-fil` we can construct pipelines that perform lightning-fast classification inference using tree-based machine learning models we provide.

In this workshop you will use `pipeline-fil` to classify whether or not users are using system compute resources in malicious ways.

### Natural Language Processing (NLP) Pipeline

`pipeline-nlp` can perform inference on data using Natural Language Processing (NLP) models such as those based on the [BERT](https://huggingface.co/docs/transformers/model_doc/bert) architecture.

NLP can be a powerful tool in the cybersecurity toolkit, as many cybersecurity problems can be viewed as natural language processing problems. The ubiquitous use of regex for log parsing in cybersecurity workflows is a sign that analysts need linguistic information about the data they oversee. NLP models can generalize their understanding of data and therefore identify data in need of action, even when the specific characters of the data have yet to be seen by analysts.

In this workshop you will use `pipeline-nlp` to detect the presence of sensitive information in PCAP logs.

### Auto-Encoder (AE) Pipeline

`pipeline-ae` can perform two kinds of unsupervised inference on data: anomaly detection through the use of [autoencoders](https://en.wikipedia.org/wiki/Autoencoder), and time series anomaly detection through the use of [Fast Fourier Transforms](https://en.wikipedia.org/wiki/Fast_Fourier_transform) (FFT).

In this workshop you will use `pipeline-ae` to create "digital fingerprints" of system users and services, and to identify when a user or service deviates from its fingerprint, which may indicate that it has been compromised and is being controlled by an attacker.

---

## Using `--help` to Explore Pipeline Commands

As you begin to learn how to build Morpheus pipelines with the CLI, you will often want information about the options available to you. A quick way to get this information is to use the `--help` flag after a CLI command or subcommand.

We are going to use this method to help us construct our first very simple Morpheus pipeline.

### Basic Pipeline Rules

As an exercise in preparation for our first pipeline, execute the following command to see the help text for `pipeline-fil`. Spend a few minutes reading the output and be prepared to discuss the following questions:

- What are the 4 rules all FIL pipelines must follow?
- What kind of data sources are currently supported?
- What does the `deserialize` stage do? Where in a pipeline must it be used?
- Does the FIL pipeline have to perform inference on the data passing through it?

In [2]:
!morpheus run pipeline-fil --help

Usage: morpheus run pipeline-fil [OPTIONS] COMMAND1 [ARGS]... [COMMAND2
                                 [ARGS]...]...

  Configure and run the pipeline. To configure the pipeline, list the stages
  in the order that data should flow. The output of each stage will become the
  input for the next stage. For example, to read, classify and write to a
  file, the following stages could be used

  pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model
  --server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json

  Pipelines must follow a few rules:
  1. Data must originate in a source stage. Current options are `from-file` or `from-kafka`
  2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline
  3. Only one inference stage can be used. Zero is also fine
  4. The following stages must come after an inference stage: `add-class`, `filter`, `gen-viz`

Options:
  --model_fea_le
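
The four rules in the help text above can be expressed as a small checker. This is an illustrative sketch only: the function `check_pipeline_rules` is hypothetical, and treating every stage whose name starts with `inf-` as an inference stage is an assumption based on the `inf-triton` example in the help text.

```python
SOURCE_STAGES = {"from-file", "from-kafka"}
POST_INFERENCE_STAGES = {"add-class", "filter", "gen-viz"}


def is_inference_stage(name):
    # Assumption: inference stages share the "inf-" prefix, as in `inf-triton`.
    return name.startswith("inf-")


def check_pipeline_rules(stages):
    """Return a list of rule violations for an ordered list of stage names."""
    problems = []

    # Rule 1: data must originate in a source stage.
    if not stages or stages[0] not in SOURCE_STAGES:
        problems.append("rule 1: pipeline must start with a source stage")

    # Rule 2: `deserialize` sits between the source stages and everything else.
    non_source = [s for s in stages if s not in SOURCE_STAGES]
    if not non_source or non_source[0] != "deserialize":
        problems.append("rule 2: `deserialize` must follow the source stages")

    # Rule 3: at most one inference stage (zero is fine).
    inference_positions = [i for i, s in enumerate(stages) if is_inference_stage(s)]
    if len(inference_positions) > 1:
        problems.append("rule 3: only one inference stage can be used")

    # Rule 4: some stages are only valid after an inference stage.
    first_inference = inference_positions[0] if inference_positions else len(stages)
    for i, s in enumerate(stages):
        if s in POST_INFERENCE_STAGES and i < first_inference:
            problems.append(f"rule 4: `{s}` must come after an inference stage")

    return problems


# Stage order taken from the example in the help text.
example = ["from-file", "deserialize", "preprocess", "inf-triton", "filter", "to-file"]
print(check_pipeline_rules(example))
print(check_pipeline_rules(["from-file", "filter", "to-file"]))
```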

---

## Identity Pipeline

Let's try to create the most basic pipeline possible, an identity pipeline that simply reads data from a file and writes it back without changes to another file.

### Data Source Stage

The help text above provides us with some helpful information to get started. First:
```
1. Data must originate in a source stage. Current options are `from-file` or `from-kafka`
```

If we look at the help text for the `from-file` stage we see that we can provide it a `--filename` flag with an input filename:

In [3]:
!morpheus run pipeline-fil from-file --help

Configuring Pipeline via CLI
Usage: morpheus run pipeline-fil from-file [OPTIONS]

Options:
  --filename FILE              Input filename
  --iterative                  Iterative mode will emit dataframes one at a
                               time. Otherwise a list of dataframes is
                               emitted. Iterative mode is good for
                               interleaving source stages.
  --file-type [auto|csv|json]  Indicates what type of file to read. Specifying
                               'auto' will determine the file type from the
                               extension.  [default: auto]
  --repeat INTEGER RANGE       Repeats the input dataset multiple times.
                               Useful to extend small datasets for debugging.
                               [default: 1; x>=1]
  --filter_null BOOLEAN        Whether or not to filter rows with null 'data'
                               column. Null values in the 'data' column can
           

For this exercise we have provided the file `nvsmi.jsonlines`. We will be looking at this data in more detail later in the workshop, but for now just know that it is available to us in the current directory:

In [5]:
!ls

Building-a-Pipeline.ipynb  data  images  nvsmi.jsonlines


Therefore we might begin forming our pipeline as such:

```sh
morpheus run pipeline-fil \
  from-file --filename nvsmi.jsonlines
```

### Deserialize Data

The help message above also indicated:
```
2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline
```

Therefore we might extend our pipeline as such:

```sh
morpheus run pipeline-fil \
  from-file --filename nvsmi.jsonlines \
  deserialize
```

### Write to File

Since our goal is simply to write the data back to file unchanged we can use the `to-file` stage which also takes a `--filename` flag:

In [6]:
!morpheus run pipeline-fil to-file --help

Configuring Pipeline via CLI
Usage: morpheus run pipeline-fil to-file [OPTIONS]

Options:
  --filename PATH  The file to write to  [required]
  --overwrite      Whether or not to overwrite the target file
  --help           Show this message and exit.


Therefore we will extend our pipeline as such:

```sh
morpheus run pipeline-fil \
  from-file --filename nvsmi.jsonlines \
  deserialize \
  to-file --filename output.jsonlines
```

### Execute the Pipeline

Let's execute the pipeline as we've constructed it and see what happens:

In [7]:
!morpheus run pipeline-fil \
  from-file --filename nvsmi.jsonlines \
  deserialize \
  to-file --filename output.jsonlines --overwrite

Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/bin/morpheus", line 11, in <module>
    sys.exit(run_cli())
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/cli.py", line 1395, in run_cli
    cli(obj={}, auto_envvar_prefix='MORPHEUS', show_default=True, prog_name="morpheus")
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/con

It looks like we got an error:
```
RuntimeError: The to-file stage cannot handle input of <class 'morpheus.pipeline.messages.MultiMessage'>. Accepted input types: (<class 'morpheus.pipeline.messages.MessageMeta'>,)
```

In summary this error states that the data entering the `to-file` stage is not of the correct type. Recall that in Morpheus pipelines, the output of one stage is the input to the next, so it would seem that the output of the `deserialize` stage is not of the type that the `to-file` stage expects.
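
The mismatch can be pictured as a type check along the chain of stages. The sketch below is a toy model, not Morpheus code: the `STAGE_TYPES` table and the `find_type_mismatch` function are hypothetical. The `to-file` input type and the `deserialize` output type are taken from the error message above; the remaining entries are assumptions about what each stage accepts and emits.

```python
# (accepted input type, emitted output type) per stage; None means "none".
STAGE_TYPES = {
    "from-file":   (None, "MessageMeta"),            # assumption
    "deserialize": ("MessageMeta", "MultiMessage"),  # output type per the error above
    "serialize":   ("MultiMessage", "MessageMeta"),  # assumption
    "to-file":     ("MessageMeta", None),            # input type per the error above
}


def find_type_mismatch(stages):
    """Return a message for the first stage whose accepted input type differs
    from the previous stage's output type, or None if the chain is consistent."""
    previous_output = None
    for name in stages:
        accepted, emitted = STAGE_TYPES[name]
        if accepted is not None and accepted != previous_output:
            return f"{name} cannot handle input of {previous_output}; accepts {accepted}"
        previous_output = emitted
    return None


print(find_type_mismatch(["from-file", "deserialize", "to-file"]))
print(find_type_mismatch(["from-file", "deserialize", "serialize", "to-file"]))
```

In this model, placing any stage that turns a `MultiMessage` back into a `MessageMeta` before `to-file` makes the chain consistent.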

### Serialize Data

What we need to fix this particular error is to add a `serialize` stage:

In [8]:
!morpheus run pipeline-fil --help | grep ' serialize'

  serialize     Include & exclude columns from messages


This makes our simple identity pipeline the following:

In [9]:
!morpheus run pipeline-fil \
  from-file --filename nvsmi.jsonlines \
  deserialize \
  serialize \
  to-file --filename output.jsonlines

Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit


We can now see that a new `output.jsonlines` file has been created:

In [10]:
!ls

Building-a-Pipeline.ipynb  data  images  nvsmi.jsonlines  output.jsonlines


### Compare Input and Output

Let's compare the input and output data since our intention was to write to file the same data we read. We will start by reading the JSON lines files into dataframes.

In [11]:
import pandas as pd

In [12]:
source = pd.read_json('nvsmi.jsonlines', lines=True)

In [13]:
output = pd.read_json('output.jsonlines', lines=True)

Next we compare their shapes:

In [14]:
source.shape

(1242, 175)

In [15]:
output.shape

(1242, 175)

Next let's compare the dataframes directly. `df.compare(df2)` will return any differences between 2 dataframes:

In [16]:
source.compare(output)

Unnamed: 0_level_0,timestamp,timestamp
Unnamed: 0_level_1,self,other
0,2021-03-12 09:46:00.956650240,2021-03-12 09:46:00.956650496
5,2021-03-12 09:48:32.237897984,2021-03-12 09:48:32.237897472
7,2021-03-12 09:49:34.100829952,2021-03-12 09:49:34.100829696
9,2021-03-12 09:50:35.820171776,2021-03-12 09:50:35.820171520
10,2021-03-12 09:51:06.891563008,2021-03-12 09:51:06.891562752
...,...,...
1227,2021-03-12 20:27:18.598337792,2021-03-12 20:27:18.598337536
1228,2021-03-12 20:27:49.341308928,2021-03-12 20:27:49.341308672
1231,2021-03-12 20:29:20.879171840,2021-03-12 20:29:20.879171584
1239,2021-03-12 20:33:26.666036736,2021-03-12 20:33:26.666036480


It would appear that the timestamp fields have changed slightly during the process of serialization and deserialization. Let's round these time values to the nearest millisecond and then compare again:

In [17]:
source['timestamp'] = source['timestamp'].dt.round('ms')

In [18]:
output['timestamp'] = output['timestamp'].dt.round('ms')

In [19]:
source.compare(output)

No output indicates the dataframes are identical, and we can reasonably state that we have succeeded in creating a very basic identity pipeline.
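
One plausible explanation for the small timestamp differences, offered here as an assumption rather than something the pipeline output confirms, is that the nanosecond epoch values pass through a 64-bit float at some point during the round trip. Near the magnitude of these timestamps, adjacent 64-bit floats are hundreds of nanoseconds apart. The sketch below uses only the standard library (`math.ulp` requires Python 3.9 or later) and the sub-second values printed in the comparison above.

```python
import math
from datetime import datetime, timezone

# Nanosecond epoch value for the first timestamp's whole second (UTC assumed).
whole_second = datetime(2021, 3, 12, 9, 46, 0, tzinfo=timezone.utc)
epoch_ns = int(whole_second.timestamp()) * 10**9

# Spacing between adjacent 64-bit floats near that magnitude, in nanoseconds.
spacing = math.ulp(float(epoch_ns))
print(spacing)

# Sub-second nanosecond parts (self, other) copied from the comparison above.
pairs = [
    (956650240, 956650496),
    (237897984, 237897472),
    (100829952, 100829696),
    (820171776, 820171520),
    (891563008, 891562752),
]

for source_ns, output_ns in pairs:
    # Each difference is a whole multiple of the float spacing...
    assert abs(source_ns - output_ns) % int(spacing) == 0
    # ...and both values land on the same millisecond after rounding.
    assert round(source_ns / 1_000_000) == round(output_ns / 1_000_000)
```

Differences of this size are far below a millisecond, which is consistent with the rounded comparison showing no differences.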

---

## Customizing Output

We may only be interested in certain parts of our data, and wish to write only some of it to our output. The `serialize` stage provides options to customize what will be passed on to later stages of the pipeline:

In [20]:
!morpheus run pipeline-fil serialize --help

Configuring Pipeline via CLI
Usage: morpheus run pipeline-fil serialize [OPTIONS]

Options:
  --include TEXT  Which columns to include from MultiMessage into JSON. Can be
                  specified multiple times. Resulting columns is the
                  intersection of all regex. Include applied before exclude
                  [default: (All Columns)]
  --exclude TEXT  Which columns to exclude from MultiMessage into JSON. Can be
                  specified multiple times. Resulting ignored columns is the
                  intersection of all regex. Include applied before exclude
                  [default: ^ID$, ^_ts_; required]
  --help          Show this message and exit.
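
The help text is terse about how multiple patterns combine. As a rough sketch of "include applied before exclude", the following assumes that a column is kept when it matches any `--include` pattern and is then dropped when it matches any `--exclude` pattern. The function `select_columns` and the `_ts_start` column name are hypothetical, and the use of `re.match` is also an assumption; this is not the Morpheus implementation.

```python
import re

# Defaults shown in the help text for --exclude.
DEFAULT_EXCLUDE = (r"^ID$", r"^_ts_")


def select_columns(columns, include=None, exclude=DEFAULT_EXCLUDE):
    """Apply include patterns first, then exclude patterns, to column names."""
    selected = list(columns)
    if include:
        # Assumption: a column is kept if it matches ANY include pattern.
        selected = [c for c in selected if any(re.match(p, c) for p in include)]
    # Assumption: a column is dropped if it matches ANY exclude pattern.
    return [c for c in selected if not any(re.match(p, c) for p in exclude)]


columns = ["ID", "_ts_start", "timestamp", "nvidia_smi_log.gpu.product_name"]
print(select_columns(columns))
print(select_columns(columns, include=["nvidia_smi_log.gpu.product_name"]))
```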


Here we run our simple pipeline again, but choose only to write 2 data fields to file:

In [21]:
!morpheus run pipeline-fil \
  from-file --filename nvsmi.jsonlines \
  deserialize \
  serialize --include 'nvidia_smi_log.gpu.processes.process_info.1.process_name' --include 'nvidia_smi_log.gpu.product_name' \
  to-file --filename small_output.jsonlines --overwrite

Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit


In [22]:
# Show the first entry in the output data, which only contains the 2 data fields we specified above.
!cat small_output.jsonlines | jq -s '.[0]'

{
  "nvidia_smi_log.gpu.processes.process_info.1.process_name": "tritonserver",
  "nvidia_smi_log.gpu.product_name": "Tesla V100-SXM2-16GB"
}


---

## Log Level Debug

Often when constructing pipelines it can be helpful to get more information about what the pipeline is doing. We can accomplish this by setting the Morpheus `--log_level` to `DEBUG`:

In [23]:
!morpheus --help | grep 'log_level'



We will do just that for our simple identity pipeline:

In [24]:
!morpheus --log_level=DEBUG run pipeline-fil \
  from-file --filename nvsmi.jsonlines \
  deserialize \
  serialize \
  to-file --filename output.jsonlines --overwrite

Configuring Pipeline via CLI
Loaded columns. Current columns: [['nvidia_smi_log.gpu.pci.tx_util', 'nvidia_smi_log.gpu.pci.rx_util', 'nvidia_smi_log.gpu.fb_memory_usage.used', 'nvidia_smi_log.gpu.fb_memory_usage.free', 'nvidia_smi_log.gpu.bar1_memory_usage.total', 'nvidia_smi_log.gpu.bar1_memory_usage.used', 'nvidia_smi_log.gpu.bar1_memory_usage.free', 'nvidia_smi_log.gpu.utilization.gpu_util', 'nvidia_smi_log.gpu.utilization.memory_util', 'nvidia_smi_log.gpu.temperature.gpu_temp', 'nvidia_smi_log.gpu.temperature.gpu_temp_max_threshold', 'nvidia_smi_log.gpu.temperature.gpu_temp_slow_threshold', 'nvidia_smi_log.gpu.temperature.gpu_temp_max_gpu_threshold', 'nvidia_smi_log.gpu.temperature.memory_temp', 'nvidia_smi_log.gpu.temperature.gpu_temp_max_mem_threshold', 'nvidia_smi_log.gpu.power_readings.power_draw', 'nvidia_smi_log.gpu.clocks.graphics_clock', 'nvidia_smi_log.gpu.clocks.sm_clock', 'nvidia_smi_log.gpu.clocks.mem_clock', 'nvidia_smi_log.gpu.clocks.video_clock', 'nvidia_

This output gives a lot of helpful information, including many of the default values we may not have explicitly set, for example:
```sh
Config: 
{
  "ae": null,
  "class_labels": [
    "mining"
  ],
  "debug": false,
  "edge_buffer_size": 128,
  "feature_length": 29,
  "fil": {
    "feature_columns": []
  },
  "log_config_file": null,
  "log_level": 10,
  "mode": "FIL",
  "model_max_batch_size": 8,
  "num_threads": 4,
  "pipeline_batch_size": 256
}
```

...the input and output types for each stage, for example:
```sh
└─ morpheus.MultiMessage -> morpheus.MessageMeta
```

...and also the actual pipeline function calls from the Morpheus source code, in case you need or want to [go digging deeper](https://github.com/nv-morpheus/Morpheus/blob/64564e1eb051f6820b13772074736e09bdc8941f/morpheus/stages/preprocess/deserialize_stage.py#L33), for example:
```sh
DeserializeStage()
```
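
The `"log_level": 10` entry in the config dump above is a numeric level. Assuming Morpheus uses the numeric levels of Python's standard `logging` module, which is consistent with passing `--log_level=DEBUG`, the value can be mapped back to its name:

```python
import logging

# The config dump shows "log_level": 10; map it back to a level name.
print(logging.getLevelName(10))  # DEBUG
print(logging.DEBUG == 10)       # True
```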

---

## Performance Monitoring

When you begin to construct complicated pipelines that work on massive amounts of data, you may well want information about the performance of your pipeline. This can be done by adding `monitor` stages:

In [25]:
!morpheus run pipeline-fil --help | grep ' monitor'

  monitor       Display throughput numbers at a specific point in the pipeline


In [26]:
!morpheus run pipeline-fil monitor --help

Configuring Pipeline via CLI
Usage: morpheus run pipeline-fil monitor [OPTIONS]

Options:
  --description TEXT  Header message to use for this monitor  [required]
  --smoothing FLOAT   How much to average throughput numbers. 0=full average,
                      1=instantaneous  [default: 0.05]
  --unit TEXT         Units to use for data rate
  --delayed_start     When delayed_start is enabled, the progress bar will not
                      be shown until the first message is received. Otherwise,
                      the progress bar is shown on pipeline startup and will
                      begin timing immediately. In large pipelines, this
                      option may be desired to give a more accurate timing.
  --help              Show this message and exit.


As shown above, `monitor` stages will report throughput numbers at a specific point in the pipeline. Here we rewrite our simple identity pipeline to print throughput information about the `deserialize` stage.

Because the output of this command is going to update as throughput changes over the life of the pipeline, we would do better to run it in an actual terminal, and not a code cell where output is static. To do this, copy the following cell and paste it into an open Jupyter terminal (remember you can open one by doing _File -> New Launcher -> Terminal_).

```sh
cd /dli/task/01-Building-a-Pipeline

morpheus --log_level=DEBUG run pipeline-fil \
  from-file --filename nvsmi.jsonlines \
  deserialize \
  monitor --description "Deserialize Rate" --unit msg \
  serialize \
  to-file --filename output.jsonlines --overwrite
```

You will have noticed the addition of throughput numbers to the output, for example:
```
Deserialize Rate[Complete]: 1242msg [00:00, 5057.80msg/s]
```

Given the simplicity of our current pipeline, this isn't yet of much use, but as your pipelines grow more sophisticated, you will likely wish to include `monitor` stages in a variety of locations in your pipelines.
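
To give some intuition for the `--smoothing` option described in the `monitor` help text (0 = full average, 1 = instantaneous), the sketch below combines successive throughput samples. Treating intermediate values as an exponential moving average is an assumption, the function `smoothed_rate` is hypothetical, and the sample rates are made up for illustration.

```python
def smoothed_rate(rates, smoothing=0.05):
    """Combine successive throughput samples (e.g. messages per second).

    smoothing=1 returns the most recent sample (instantaneous);
    smoothing=0 returns the plain average of all samples (full average);
    values in between use an exponential moving average (an assumption).
    """
    if not rates:
        raise ValueError("at least one sample is required")
    if smoothing == 0:
        return sum(rates) / len(rates)
    average = rates[0]
    for rate in rates[1:]:
        # New samples get weight `smoothing`; history gets the remainder.
        average = smoothing * rate + (1 - smoothing) * average
    return average


samples = [4000.0, 5000.0, 6000.0]  # made-up msg/s samples
print(smoothed_rate(samples, smoothing=1))
print(smoothed_rate(samples, smoothing=0))
print(smoothed_rate(samples, smoothing=0.5))
```

A small smoothing value such as the default of 0.05 reacts slowly to new samples, which keeps the displayed rate steady when individual batches vary.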

---

## Summary

In this notebook you learned how to construct very basic Morpheus pipelines, including how to use `--help` and `DEBUG` logging to get more information, and how to monitor throughput with the `monitor` stage. You learned at a very high level about the 3 different kinds of pipelines Morpheus offers, and also how to work with the Morpheus CLI in this interactive Jupyter environment.

One of the main goals of this workshop is for you to feel comfortable constructing Morpheus pipelines, and to that effect, in the next section, you are going to use what you have learned so far to construct your first Morpheus pipeline from scratch.

## Next

Each separate notebook in this JupyterLab environment runs its own Jupyter "kernel" which may utilize GPU memory and other system resources. In order to avoid unforeseen issues between separate notebooks sharing GPU resources, you should shut down each notebook's kernel before proceeding to the next notebook.

This can be done by right-clicking the notebook name in the left-hand side file viewer and selecting "Shut Down Kernel" as shown here:

![shut down kernel](images/stop_kernel.png)

After shutting down the kernel, please continue to the next notebook.