In [19]:
#hide
# Automatically reload edited modules (e.g. nifi_api) before each cell runs.
# Re-running this cell only prints an "already loaded" notice, which is harmless.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [20]:
#default_exp dataflow

In [21]:
#export

import time
from typing import Optional

from nifi_api.rest import Flowfiles, Processor

# Dataflow

> Monitors and controls an Apache NiFi dataflow.

In [22]:
#export


class DataFlow:
    """
    Monitors and controls a NiFi dataflow made of an input, a middle and an
    output processor. Nothing is started until the **run** method is called.

    Parameters
    ----------
    dataflow_ids : DataFlowIds
        Data structure exposing the IDs of the in/middle/out processors
        (``in_processor``, ``middle_processor``, ``out_processor``) and of
        the in/out connections (``in_connection``, ``out_connection``).
    delay_seconds_after_start : int, default 14
        Seconds the input processor is left running before the flowfiles on
        the input connection are recorded.
    delay_seconds_between_checks : int, default 15
        Seconds to wait between two comparisons of the input and output
        connection flowfiles.
    """

    def __init__(
        self,
        dataflow_ids: object,
        delay_seconds_after_start: int = 14,
        delay_seconds_between_checks: int = 15,
    ) -> None:
        # REST wrappers around the processors / connections of the flow.
        self.in_processor = Processor(dataflow_ids.in_processor)
        self.in_flowfiles = Flowfiles(dataflow_ids.in_connection)
        self.middle_processor = Processor(dataflow_ids.middle_processor)
        self.out_processor = Processor(dataflow_ids.out_processor)
        self.out_flowfiles = Flowfiles(dataflow_ids.out_connection)

        self.seconds_after_start = delay_seconds_after_start
        self.seconds_between_checks = delay_seconds_between_checks

    def run(self, timeout_seconds: Optional[float] = None) -> None:
        """
        Drive the dataflow through one pass and block until it completes.

        Sequence:
        1. Stop the output processor.
        2. Start the input processor, wait ``delay_seconds_after_start``
           seconds, then stop it.
        3. Snapshot the input connection with ``Flowfiles.get_ids()``.
        4. Start the middle processor and poll the output connection every
           ``delay_seconds_between_checks`` seconds until
           ``in_flowfiles.equals(out_flowfiles)`` is true.
        5. Stop the middle processor and start the output processor.

        Parameters
        ----------
        timeout_seconds : float, optional
            Maximum number of seconds to keep polling in step 4. ``None``
            (the default) polls indefinitely.

        Raises
        ------
        TimeoutError
            If ``timeout_seconds`` elapses before the output matches the
            input. The middle processor is then left running and the output
            processor stopped, so the flow can be inspected.
        """
        print('pipeline watching has started..')

        self.out_processor.update_run_status("STOPPED")
        self.in_processor.update_run_status("RUNNING")
        time.sleep(self.seconds_after_start)
        # Stop the producer *before* snapshotting its output connection.
        # Otherwise flowfiles emitted after get_ids() would travel downstream
        # without being part of the snapshot the output is compared against.
        self.in_processor.update_run_status("STOPPED")
        self.in_flowfiles.get_ids()
        self.middle_processor.update_run_status("RUNNING")

        deadline = (
            None if timeout_seconds is None
            else time.monotonic() + timeout_seconds
        )
        while True:
            self.out_flowfiles.get_ids()

            if self.in_flowfiles.equals(self.out_flowfiles):
                self.middle_processor.update_run_status("STOPPED")
                self.out_processor.update_run_status("RUNNING")
                print("Pipeline watching has finished ...")
                return

            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Dataflow did not finish within {timeout_seconds} seconds"
                )
            time.sleep(self.seconds_between_checks)

In [23]:
# Test
# Uses the process group *Test API* in the Cloudera session.

# 1. Manually turn on the "Initial" and "Middle" processors and turn off the
#    "Body" and "Final" processors.
#    NOTE(review): these names differ from the ones in the IDs below
#    ("First", "Middle", "Final") — confirm against the Test API group.

# 2. Build the data structure holding the connection and processor IDs.
from nifi_api.environment import DataFlowIds
ids = {
    "in_connection": {
        "Id": "cc549c6e-0177-1000-ffff-ffffb5d2aba2",
        "name": "First"
    },
    "out_connection": {
        "Id": "51ab3b24-084f-1309-0000-00001946f2c7",
        "name": "Final"
    },
    "in_processor": {
        "Id": "36c62ad6-d606-3b04-9743-d77b6249608c",
        "name": "First"
    },
    "middle_processor": {
        "Id": "cc54862f-0177-1000-ffff-ffffe7325a20",
        "name": "Middle"
    },
    "out_processor": {
        "Id": "51ab3b1e-084f-1309-a135-aa0100d7186b",
        "name": "Final"
    },
}
data_ids = DataFlowIds(ids)

# 3. Instantiate the DataFlow class (short delays keep the test quick).
test_dataflow = DataFlow(
    dataflow_ids=data_ids,
    delay_seconds_after_start=10,
    delay_seconds_between_checks=10,
)

# 4. Call the run method. The following events must happen:
#  - "Final" processor turns off and "First" processor turns on.
#  - "First" processor turns off and "Middle" processor turns on.
#  - Once the flowfiles reach the "Final" connection, "Middle" turns off
#    and "Final" turns on.
test_dataflow.run()

pipeline watching has started..
Pipeline watching has finished ...


In [25]:
# Source To Raw
# Watches the flow that lists CSVs in an S3 bucket (ListS3), filters them
# in "Set Schemas" (RouteOnAttribute) and logs the result (LogJSON).
# Imported here too so this cell does not depend on the test cell above.
from nifi_api.environment import DataFlowIds

ids = {
    "in_connection": {
        "type": "SuccessConnection",
        "Id": "109133cb-0be0-1603-a259-369b84b4af5d",
        "description": "Set Schemas: Input Port"
    },
    "in_processor": {
        "type": "ListS3Processor",
        "Id": "61a23678-daa8-1e7d-a120-b42561af374d",
        "description": "Lists CSVs in a S3 bucket"
    },
    "middle_processor": {
        "type": "RouteOnAttributeProcessor",
        "Id": "013833bd-24f2-1445-b217-d2646ef11db9",
        "description": "Set Schemas: Filters a list of CSVs"
    },
    "out_connection": {
        "type": "SuccessConnection",
        "Id": "efe531d0-87b8-1e6b-9fe7-e5f950a477bd",
        "description": "Move CSVs ---> Log & Terminate"
    },
    "out_processor": {
        "type": "LogJSONProcessor",
        "Id": "b6223c79-7dfd-1a8c-94f8-6cc3aa5f43b4",
        "description": "Logs JSON"
    }
}

data_ids = DataFlowIds(ids)

# Instantiate the DataFlow class for this flow.
test_dataflow = DataFlow(
    dataflow_ids=data_ids,
    delay_seconds_after_start=10,
    delay_seconds_between_checks=10,
)
# Call the run method. The following events must happen:
#  - "Logs JSON" (out) turns off and "Lists CSVs in a S3 bucket" (in) turns on.
#  - The in processor turns off and "Set Schemas: Filters a list of CSVs"
#    (middle) turns on.
#  - Once the flowfiles reach "Move CSVs ---> Log & Terminate", the middle
#    processor turns off and "Logs JSON" turns on.
test_dataflow.run()

pipeline watching has started..
Pipeline watching has finished ...


In [14]:
# Raw to Discovery
# Watches the flow started by GenerateFlowfile ("Start Pipeline"), updated
# in "Update Tables" (UpdateAttribute) and logged via AttributeToJSON.
# Imported here too so this cell does not depend on earlier cells.
from nifi_api.environment import DataFlowIds

ids2 = {
    "in_connection": {
        "type": "SuccessConnection",
        "Id": "b3c83c57-f5ec-15c8-9ae8-934d6a9b01bb",
        "description": "Update Tables: Input Port"
    },
    "in_processor": {
        "type": "GenerateFlowfile",
        "Id": "5d1e3da9-d96a-113d-a78a-56aee3a0ffed",
        "description": "Start Pipeline"
    },
    "middle_processor": {
        "type": "UpdateAttribute",
        "Id": "52333fd3-b8d2-18aa-8d60-b92ac85f8170",
        "description": "Update Tables:Start"
    },
    "out_connection": {
        "type": "SuccessConnection",
        "Id": "46ce3ce0-5f95-1ef0-a749-ab9ca320e4b9",
        "description": "Log & Terminate: Funnel --> Log JSON"
    },
    "out_processor": {
        "type": "AttributeToJSON",
        "Id": "09733dd0-fbd1-1e46-b4d1-130bab4f9414",
        "description": "Logs JSON generates JSON for S3 bucket"
    }
}

data_ids2 = DataFlowIds(ids2)

# Instantiate the DataFlow class for this flow.
test_dataflow2 = DataFlow(
    dataflow_ids=data_ids2,
    delay_seconds_after_start=10,
    delay_seconds_between_checks=10,
)
# Call the run method. The following events must happen:
#  - The AttributeToJSON processor (out) turns off and "Start Pipeline" (in)
#    turns on.
#  - "Start Pipeline" turns off and "Update Tables:Start" (middle) turns on.
#  - Once the flowfiles reach "Log & Terminate: Funnel --> Log JSON", the
#    middle processor turns off and the out processor turns on.
test_dataflow2.run()

pipeline watching has started..
Pipeline watching has finished ...


In [7]:
#hide
# Convert the `#export` cells of every notebook into the library's modules.
import nbdev.export
nbdev.export.notebook2script()

Converted 01_environment.ipynb.
Converted 02_rest.ipynb.
Converted 03_dataflow.ipynb.
Converted 04_source_to_refined.ipynb.
Converted 09_tools.ipynb.
Converted index.ipynb.
