# Copy data from multiple sources and load it to multiple targets using IBM Watson Data APIs

## Introduction
Use the IBM Watson Data Flows service to create and run data flows in a runtime engine. A flow can read data from a large variety of sources, process that data using predefined operations or custom code, and then write the data to one or more targets. The runtime engine can handle large amounts of data so it's ideally suited for reading, processing, and writing data at volume.

The sources and targets that are supported include both Cloud and on-premises offerings as well as data assets in projects. Cloud offerings include IBM Cloud Object Storage, Amazon S3, and Microsoft Azure SQL Database, among others. On-premises offerings include IBM Db2, Microsoft SQL Server, and Oracle, among others.

In the [Create and run a data flow with Watson Data APIs](https://dataplatform.ibm.com/exchange/public/entry/view/76deece1cf6f6f4bc3789678be94616d) notebook, you learned how to construct a single-source data flow with Watson Data APIs. In this notebook, you'll build on that knowledge to create a data flow with multiple source and target connections.

For a list of the supported connection types and their properties, see [IBM Watson Data Flows Service - Data Asset and Connection Properties](https://api.dataplatform.ibm.com/v2/data_flows/doc/dataasset_and_connection_properties.html).

#### Language and Spark Versions
Python 3.5<br>
Spark 2.1

#### Prerequisites
The data flow created in this tutorial copies data from two tables stored in one [Db2 Warehouse](https://console.bluemix.net/catalog/services/db2-warehouse) instance:
* GOSALES.PRODUCT
* GOSALES.ORDER_DETAILS

You can add Db2 Warehouse to your account from [Data Services](https://dataplatform.ibm.com/data/services?target=data-services&context=data) by selecting "Add Service" and then clicking "Add" on the "Db2 Warehouse" tile.

Once it's added, you can create a connection to Db2 Warehouse from the Data Services page. Select "Create Connection" from the action menu for Db2 Warehouse. The required connection details will be automatically populated for you.

## Table of contents

1. [Setup](#setup)<br>
    1.1 [Environments](#setup1)<br>
    1.2 [Project Token](#setup2)<br>
    1.3 [Authorization](#setup3)<br>
2. [Creating a data flow with multiple sources and targets](#create)<br>
    2.1 [Retrieving a connection](#create1)<br>
    2.2 [Defining multiple sources in a data flow](#create2)<br>
    2.3 [Defining an operation in a multi-source data flow](#create3)<br>
    2.4 [Defining multiple targets in a data flow](#create4)<br>
    2.5 [Creating the data flow](#create5)<br>
    2.6 [Run the data flow](#run1)<br>
    2.7 [Retrieve data flow run metrics](#run2)<br>
3. [Resources](#resources)

## <a id="setup"></a>1. Setup

In [22]:
import requests
import json
import uuid

def pretty_print(json_content):
    parsed_json = json.loads(json_content)
    print(json.dumps(parsed_json, indent=4, sort_keys=True))

####  <a id="setup1"></a>1.1 Environments
The data flows service is currently available in the US South and UK regions of IBM Cloud. Use either environment URL in place of {service_URL} in the examples below:

    US South https://api.dataplatform.ibm.com
    UK https://api.eu-gb.dataplatform.ibm.com


In [23]:
service_URL = "https://api.dataplatform.ibm.com"

####  <a id="setup2"></a>1.2 Project Token
Insert a project token from the action bar (more > Insert project token). Project tokens are used to access project resources like data sources and connections.

In [24]:
# @hidden_cell
# The project token is an authorization token that is used to access project resources like data sources, connections, and used by platform APIs.


In [25]:
project_id = pc.projectID

####  <a id="setup3"></a>1.3 Authorization
An IAM bearer token is required to access IBM Watson Data APIs. For information on how to generate an IAM token, see the <a href="http://ibm.biz/wdp-api#getting" target="_blank" rel="noopener noreferrer">IBM Watson Data API documentation</a>.

In [26]:
# Replace <IAM Access Token> with your generated IAM Access Token
authorization = "Bearer <IAM Access Token>"
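
The header that every request in this notebook sends can be built in one place. The helper below is a minimal sketch, not part of the Watson Data APIs: `build_headers` is a hypothetical name, and the `Accept` header is an assumption added for illustration. It accepts a token with or without the `Bearer ` prefix so that the prefix is never doubled.

```python
def build_headers(token):
    """Return request headers carrying an IAM bearer token.

    `build_headers` is a hypothetical helper for this notebook.
    It accepts either a bare access token or a value that already
    starts with "Bearer ".
    """
    token = token.strip()
    if not token.lower().startswith("bearer "):
        token = "Bearer " + token
    # The Accept header is an assumption; the notebook itself only
    # sends the Authorization header.
    return {"Authorization": token, "Accept": "application/json"}


print(build_headers("<IAM Access Token>")["Authorization"])
```

The returned dictionary could be passed as the `headers` argument of the `requests` calls used in the rest of the notebook.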

##  <a id="create"></a>2. Creating a data flow with multiple sources and targets ##
The following example shows how to create a data flow that copies data from multiple source connections and loads data to multiple target connections.

####  <a id="create1"></a>2.1 Retrieving a connection ####
Begin by retrieving a list of all connections in the Watson Studio project and then choosing one or more connections to use as the sources for the data flow. In this example, only one connection is required because we're copying data from two tables in the same database.

For further information on the connections service, see the Getting started / Connections section of the <a href="https://developer.ibm.com/api/view/watsondata-prod:watson-data:title-Watson_Data_API#doc" target="_blank" rel="noopener noreferrer">IBM Watson Data API</a> documentation.

In [27]:
# GET https://api.dataplatform.ibm.com/v2/connections?project_id=<project_id>

request = requests.get(service_URL + "/v2/connections?project_id=" + project_id + "&limit=100", headers={'Authorization': authorization})
pretty_print(request.text)

{
    "first": {
        "href": "https://api.dataplatform.ibm.com/v2/connections?project_id=e5c41f96-85a0-4172-9232-b305e6240edb&limit=100"
    },
    "resources": [
        {
            "entity": {
                "datasource_type": "cfdcb449-1204-44ba-baa6-9a8a878e6aa7",
                "description": "IBM Db2 warehouse database on Cloud",
                "flags": [],
                "interaction_properties": {
                    "source": [
                        {
                            "description": "The name of the schema that contains the table to read from",
                            "hidden": false,
                            "label": "Schema Name",
                            "masked": false,
                            "multiline": false,
                            "name": "schema_name",
                            "readonly": false,
                            "required": false,
                            "tags": [],
                            "type": "strin

In the response you can see that a connection exists with the ID `633b9935-4066-43ff-9412-efd21d441f81`. This is our connection. You'll need to use this later in the data flow you create.
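
Rather than reading the ID out of the printed response by eye, you could filter the parsed response in code. The sketch below assumes that each entry in `resources` carries the connection name in `entity.name` and the connection ID in `metadata.asset_id`; the output above is truncated before those fields, so treat both field names as assumptions. The function name `find_connection_ids` and the sample data are hypothetical.

```python
def find_connection_ids(connections_json, name_contains=None):
    """Return (asset_id, name) pairs for connections whose name matches.

    Assumes each resource has entity.name and metadata.asset_id;
    missing fields are tolerated and yield None or an empty name.
    """
    matches = []
    for resource in connections_json.get("resources", []):
        name = resource.get("entity", {}).get("name", "")
        asset_id = resource.get("metadata", {}).get("asset_id")
        if name_contains is None or name_contains.lower() in name.lower():
            matches.append((asset_id, name))
    return matches


# Hypothetical, heavily trimmed response used only for illustration.
sample_connections = {
    "resources": [
        {
            "entity": {"name": "Db2 Warehouse example"},
            "metadata": {"asset_id": "633b9935-4066-43ff-9412-efd21d441f81"},
        }
    ]
}
print(find_connection_ids(sample_connections, "db2"))
```

With the real response, you would pass `json.loads(request.text)` instead of the sample dictionary.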

####  <a id="create2"></a>2.2 Defining multiple sources in a data flow ####
A data flow can contain one or more data sources. A data source is defined as a *binding node* in the data flow *pipeline*, which has one output and no inputs. The *binding node* must reference a data asset or connection. Depending on the type, additional *properties* might also need to be specified. Refer to [IBM Watson Data Flows Service - Data Asset and Connection Properties](https://api.dataplatform.ibm.com/v2/data_flows/doc/dataasset_and_connection_properties.html) to determine which properties are applicable for a given connection, and which of those are required. 

For the following example, reference the connection you created earlier. The *binding nodes* for the data flow's sources are:

In [28]:
source_binding_node_1 = {
    "output": {
        "id": "source1Output"
      },
      "connection": {
        "ref": "633b9935-4066-43ff-9412-efd21d441f81",
        "properties": {
          "schema_name": "GOSALES",
          "table_name": "PRODUCT"
        }
      },
      "id": "source1",
      "type": "binding"
}
source_binding_node_2 = {
    "output": {
        "id": "source2Output"
      },
      "connection": {
        "ref": "633b9935-4066-43ff-9412-efd21d441f81",
        "properties": {
          "schema_name": "GOSALES",
          "table_name": "ORDER_DETAILS"
        }
      },
      "id": "source2",
      "type": "binding"
}

The `output` attribute declares the ID of the *output port* of each source (`source1Output` and `source2Output`) so that other nodes can read from them. The connection with ID `633b9935-4066-43ff-9412-efd21d441f81` is referenced in both cases because we're using the same connection for both source tables. This ID would be different for each source if we were using connections to two distinct sources.
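
Because the two source nodes differ only in their IDs and table names, they could also be generated by a small helper. This is a sketch under the assumption that every source is a table-backed connection that takes `schema_name` and `table_name`; `make_source_node` and its convention of naming the output port `<node_id>Output` are hypothetical, chosen to match the IDs used above.

```python
def make_source_node(node_id, connection_ref, schema_name, table_name):
    """Build a source binding node for a table-backed connection.

    The output port is named "<node_id>Output", which is a naming
    convention of this sketch and not a requirement of the service.
    """
    return {
        "id": node_id,
        "type": "binding",
        "output": {"id": node_id + "Output"},
        "connection": {
            "ref": connection_ref,
            "properties": {
                "schema_name": schema_name,
                "table_name": table_name,
            },
        },
    }


connection_id = "633b9935-4066-43ff-9412-efd21d441f81"
example_sources = [
    make_source_node("source1", connection_id, "GOSALES", "PRODUCT"),
    make_source_node("source2", connection_id, "GOSALES", "ORDER_DETAILS"),
]
print([node["output"]["id"] for node in example_sources])
```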

####  <a id="create3"></a>2.3 Defining an operation in a multi-source data flow ####
A data flow can contain zero or more operations, with a typical operation having one or more inputs and one or more outputs. An operation input is linked to the output of a source or another operation. An operation can also have additional parameters which define how the operation performs its work. An operation is defined as an *execution node* in the data flow *pipeline*. 

The following example creates a sort operation for the column `PRODUCT_NUMBER` from `source1` only. A separate operation could also be added to `source2` or an operation could be applied that required both sources (for example, Join). The *execution node* for our sort operation is:

In [29]:
sort_operation = {  
  "id":"operation1",
  "type":"execution_node",
  "op":"com.ibm.wdp.transformer.FreeformCode",
  "parameters":{  
     "FREEFORM_CODE": "arrange(`PRODUCT_NUMBER`)"
  },
  "inputs":[  
     {  
        "id":"inputPort1",
        "links":[  
           {  
              "node_id_ref":"source1",
              "port_id_ref":"source1Output"
           }
        ]
     }
  ],
  "outputs":[  
     {  
        "id":"outputPort1"
     }
  ]
}

The `inputs` attribute declares an *input port* with ID `inputPort1` which references the *output port* of the source node (node ID `source1` and port ID `source1Output`). 

For this example, the operation is defined as a freeform operation, denoted by the `op` attribute value of `com.ibm.wdp.transformer.FreeformCode`. A freeform operation has a single parameter named `FREEFORM_CODE` whose value is a snippet of sparklyr code. In this snippet, the `arrange` function is called and its argument is the name of the column to sort on, `PRODUCT_NUMBER`.

The `outputs` attribute declares the ID of the output of this operation as `outputPort1` so that other nodes can read from it.
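
As noted earlier, a separate operation could also be applied to `source2`. The sketch below shows one way to build such an *execution node* with a helper. `make_freeform_node`, the node ID `operation2`, and the port names derived from it are hypothetical, and the code assumes that `ORDER_DETAILS` also has a `PRODUCT_NUMBER` column.

```python
def make_freeform_node(node_id, code, source_node_id, source_port_id):
    """Build a freeform execution node that reads from one upstream port.

    Port IDs are derived from node_id ("<node_id>Input" and
    "<node_id>Output"); that is a convention of this sketch only.
    """
    return {
        "id": node_id,
        "type": "execution_node",
        "op": "com.ibm.wdp.transformer.FreeformCode",
        "parameters": {"FREEFORM_CODE": code},
        "inputs": [
            {
                "id": node_id + "Input",
                "links": [
                    {
                        "node_id_ref": source_node_id,
                        "port_id_ref": source_port_id,
                    }
                ],
            }
        ],
        "outputs": [{"id": node_id + "Output"}],
    }


# Assumes ORDER_DETAILS contains a PRODUCT_NUMBER column.
sort_order_details = make_freeform_node(
    "operation2", "arrange(`PRODUCT_NUMBER`)", "source2", "source2Output"
)
print(sort_order_details["outputs"][0]["id"])
```

If a node like this were added to the pipeline, any target that should receive the sorted rows would have to link to `operation2` and its output port instead of linking directly to `source2`.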

####  <a id="create4"></a>2.4 Defining multiple targets in a data flow ####
A data flow can contain zero or more targets. A target is defined as a *binding node* in the data flow *pipeline* which has one input and no outputs. As with the source, the *binding node* must reference either a connection or a data asset. When using a connection as a target, specify the connection ID and the required properties for your connection type.

In the following example, we're writing to a different schema in the same connection that we created earlier. Connections are referenced by IDs. The *binding nodes* for the data flow's targets are:

In [30]:
target_binding_node_1 = {
  "input": {
    "link": {
      "node_id_ref": "operation1",
      "port_id_ref": "outputPort1"
    },
    "id": "targetInput1"
  },
  "connection": {
    "ref": "633b9935-4066-43ff-9412-efd21d441f81",
    "properties": {
      "write_mode": "insert",
      "schema_name": "DASH103285",
      "table_name": "PRODUCT_SORTED",
      "table_action": "truncate"
    }
  },
  "id": "target1",
  "type": "binding"
}
target_binding_node_2 = {
  "input": {
    "link": {
      "node_id_ref": "source2",
      "port_id_ref": "source2Output"
    },
    "id": "targetInput2"
  },
  "connection": {
    "ref": "633b9935-4066-43ff-9412-efd21d441f81",
    "properties": {
      "write_mode": "insert",
      "schema_name": "DASH103285",
      "table_name": "ORDER_DETAILS_LATEST",
      "table_action": "truncate"
    }
  },
  "id": "target2",
  "type": "binding"
}

The `input` attribute in each *binding node* declares an *input port* with its own ID. `targetInput1` references the *output port* of the operation node (node ID `operation1` and port ID `outputPort1`), while `targetInput2` references the *output port* of `source2` (port ID `source2Output`), because no operation was applied to that source. The ID of the connection is used in both *binding nodes*, along with the required properties for the connection type. Here, we're writing to the same connection as the source, but the data flow can target any connection that exists in the project.
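
Because nodes are wired together purely by ID strings, a mistyped `node_id_ref` or `port_id_ref` is easy to miss. The following sketch checks, before anything is sent to the service, that every link points at an output port that some node actually declares. `find_broken_links` is a hypothetical helper, and the two sample nodes are trimmed copies of `source2` and `target2` that keep only the fields relevant to linking.

```python
def find_broken_links(nodes):
    """Return (node_id, node_id_ref, port_id_ref) for each dangling link."""
    # Collect every declared output port as a (node ID, port ID) pair.
    output_ports = set()
    for node in nodes:
        if "output" in node:  # source binding nodes
            output_ports.add((node["id"], node["output"]["id"]))
        for port in node.get("outputs", []):  # execution nodes
            output_ports.add((node["id"], port["id"]))

    broken = []
    for node in nodes:
        links = []
        if "input" in node:  # target binding nodes
            links.append(node["input"]["link"])
        for port in node.get("inputs", []):  # execution nodes
            links.extend(port.get("links", []))
        for link in links:
            key = (link["node_id_ref"], link["port_id_ref"])
            if key not in output_ports:
                broken.append((node["id"],) + key)
    return broken


# Trimmed copies of source2 and target2 from the examples above.
sample_nodes = [
    {"id": "source2", "type": "binding", "output": {"id": "source2Output"}},
    {
        "id": "target2",
        "type": "binding",
        "input": {
            "id": "targetInput2",
            "link": {"node_id_ref": "source2", "port_id_ref": "source2Output"},
        },
    },
]
print(find_broken_links(sample_nodes))  # prints []
```

An empty list means every link resolves. This only checks the wiring of the pipeline document; it does not validate connection properties.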

####  <a id="create5"></a>2.5 Creating the data flow ####
Putting it all together, you can now call the API to create the data flow with the following POST method:

```POST {service_URL}/v2/data_flows```

The new data flow can be stored in a catalog or project. Use either the `catalog_id` **or** `project_id` query parameter, depending on where you want to store the data flow. An example request to create a data flow is shown below.

In [31]:
dataflow = {  
   "name":"my_dataflow_" + str(uuid.uuid4()),
   "pipeline":{  
      "doc_type":"pipeline",
      "version":"1.0",
      "primary_pipeline":"pipeline1",
      "pipelines":[  
         {  
            "id":"pipeline1",
            "runtime":"Spark",
            "nodes":[  
            ]
         }
      ]
   }
}

dataflow["pipeline"]["pipelines"][0]["nodes"].append(source_binding_node_1)
dataflow["pipeline"]["pipelines"][0]["nodes"].append(source_binding_node_2)
dataflow["pipeline"]["pipelines"][0]["nodes"].append(sort_operation)
dataflow["pipeline"]["pipelines"][0]["nodes"].append(target_binding_node_1)
dataflow["pipeline"]["pipelines"][0]["nodes"].append(target_binding_node_2)

dataflow_response = requests.post(service_URL + "/v2/data_flows?project_id=" + project_id, headers={'Authorization': authorization}, json=dataflow)
data_flow_id = json.loads(dataflow_response.text)["metadata"]["asset_id"]
pretty_print(dataflow_response.text)

{
    "entity": {
        "name": "my_dataflow_5497f887-eea3-464a-98b8-9af95a45351f",
        "pipeline": {
            "doc_type": "pipeline",
            "id": "4835bf16-5c40-43d6-a5fa-478a1bd69f39",
            "pipelines": [
                {
                    "id": "pipeline1",
                    "nodes": [
                        {
                            "connection": {
                                "properties": {
                                    "schema_name": "GOSALES",
                                    "table_name": "PRODUCT"
                                },
                                "ref": "633b9935-4066-43ff-9412-efd21d441f81"
                            },
                            "id": "source1",
                            "output": {
                                "id": "source1Output"
                            },
                            "type": "binding"
                        },
                        {
                            "c

The response shows that the data flow was created. Its ID is returned in `metadata.asset_id` (for example, `cdfb87f2-91f8-4c6e-9886-d43d992362ee`) and is stored in `data_flow_id` by the code above. You will need this ID later to run the data flow.
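
The create call above reads `metadata.asset_id` without first checking whether the request succeeded, so a failed request surfaces as a confusing `KeyError`. A more defensive variant is sketched below. `extract_asset_id` is a hypothetical helper, and treating any 2xx status code as success is an assumption.

```python
import json


def extract_asset_id(status_code, body_text):
    """Return metadata.asset_id from a response body, or raise on failure.

    Any status code outside the 2xx range is treated as an error
    (an assumption of this sketch).
    """
    if not 200 <= status_code < 300:
        raise RuntimeError(
            "Request failed with HTTP %d: %s" % (status_code, body_text[:200])
        )
    return json.loads(body_text)["metadata"]["asset_id"]


# Illustration with a made-up response body.
print(extract_asset_id(201, '{"metadata": {"asset_id": "example-id"}}'))
```

With a real response object from `requests`, the call would look like `extract_asset_id(dataflow_response.status_code, dataflow_response.text)`.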

#### <a id="run1"></a>2.6 Run the data flow ####
To run a data flow, call the following POST API:

```
POST {service_URL}/v2/data_flows/{data_flow_id}/runs?project_id={project_id}
```

The value of `data_flow_id` is the `metadata.asset_id` from your data flow. An example request and its response are shown below:

In [32]:
dataflow_run_response = requests.post(service_URL + "/v2/data_flows/" + data_flow_id + "/runs?project_id=" + project_id, headers={'Authorization': authorization}, json={})
data_flow_run_id = json.loads(dataflow_run_response.text)["metadata"]["asset_id"]
pretty_print(dataflow_run_response.text)

{
    "entity": {
        "configuration": {},
        "data_flow_ref": "0d4c8d74-7388-4a67-9e04-d04afd24ae77",
        "name": "my_dataflow_5497f887-eea3-464a-98b8-9af95a45351f",
        "rov": {
            "members": [],
            "mode": 0
        },
        "state": "starting",
        "tags": []
    },
    "metadata": {
        "asset_id": "10a7260c-1387-4893-9461-1245aad8494d",
        "asset_type": "data_flow_run",
        "create_time": "2018-04-16T13:43:15.000Z",
        "creator": "********",
        "creator_id": "ibmid-060000kubs",
        "href": "https://api.dataplatform.ibm.com/v2/data_flows/0d4c8d74-7388-4a67-9e04-d04afd24ae77/runs/10a7260c-1387-4893-9461-1245aad8494d?project_id=e5c41f96-85a0-4172-9232-b305e6240edb",
        "project_id": "e5c41f96-85a0-4172-9232-b305e6240edb",
        "usage": {
            "access_count": 0,
            "last_access_time": "2018-04-16T13:43:15.415Z",
            "last_accessor": "********",
            "last_accessor_id": "ibmid-06
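
The run is returned in the `starting` state, so metrics are only meaningful once it has progressed. One way to wait is to poll the run until it reaches a terminal state, as in the sketch below. The terminal state names (`finished`, `error`, `stopped`) are assumptions, and `wait_for_run` is a hypothetical helper. It takes the state fetcher as an argument, so the demo can run without calling the service.

```python
import time

# Assumed terminal states; check the API documentation for the full list.
TERMINAL_STATES = {"finished", "error", "stopped"}


def wait_for_run(fetch_state, interval_seconds=5, max_attempts=60,
                 sleep=time.sleep):
    """Call fetch_state() until it returns a terminal state.

    Raises TimeoutError if no terminal state is seen within
    max_attempts polls.
    """
    for _ in range(max_attempts):
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        sleep(interval_seconds)
    raise TimeoutError("Run did not finish after %d polls" % max_attempts)


# Simulated demo: a stand-in fetcher that walks through three states.
fake_states = iter(["starting", "running", "finished"])
print(wait_for_run(lambda: next(fake_states), sleep=lambda seconds: None))
```

In the notebook, `fetch_state` could issue a GET request to the run's `href` shown in the response above (`/v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}?project_id={project_id}`) and return `entity.state` from the parsed body.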

#### <a id="run2"></a>2.7 Retrieve data flow run metrics ####

To retrieve the latest data flow run metrics, broken down by source and target, call the following GET method:

`GET {service_URL}/v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}/metrics?project_id={project_id}`

The value of `data_flow_id` is the `metadata.asset_id` from your data flow. The value of `data_flow_run_id` is the `metadata.asset_id` from your data flow run.

In [33]:
dataflow_run_metrics = requests.get(service_URL + "/v2/data_flows/" + data_flow_id + "/runs/" + data_flow_run_id + "/metrics?project_id=" + project_id, headers={'Authorization': authorization})
pretty_print(dataflow_run_metrics.text)

{
    "details": [
        {
            "connection_type": "cfdcb449-1204-44ba-baa6-9a8a878e6aa7",
            "disposition": "source",
            "metrics": {
                "bytes_read": 30524,
                "fields": [],
                "fields_read": 9,
                "rows_read": 548
            },
            "node_ref": "source1",
            "pipeline_ref": "pipeline1",
            "port_ref": "source1Output",
            "type": "binding"
        },
        {
            "connection_type": "cfdcb449-1204-44ba-baa6-9a8a878e6aa7",
            "disposition": "source",
            "metrics": {
                "bytes_read": 61997197,
                "fields": [],
                "fields_read": 9,
                "rows_read": 446023
            },
            "node_ref": "source2",
            "pipeline_ref": "pipeline1",
            "port_ref": "source2Output",
            "type": "binding"
        },
        {
            "connection_type": "cfdcb449-1204-44ba-baa6-9a8a878e6
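
The metrics response can be condensed into a per-source summary. The sketch below uses only fields that are visible in the output above (`disposition`, `node_ref`, and `metrics.rows_read`). The helper name `rows_read_by_source` is hypothetical, and the sample is a trimmed copy of the two source entries shown.

```python
def rows_read_by_source(metrics):
    """Map each source node ID to the number of rows read from it."""
    totals = {}
    for detail in metrics.get("details", []):
        if detail.get("disposition") == "source":
            rows = detail.get("metrics", {}).get("rows_read", 0)
            totals[detail["node_ref"]] = rows
    return totals


# Trimmed copy of the two source entries from the response above.
sample_metrics = {
    "details": [
        {"disposition": "source", "node_ref": "source1",
         "metrics": {"rows_read": 548}},
        {"disposition": "source", "node_ref": "source2",
         "metrics": {"rows_read": 446023}},
    ]
}
print(rows_read_by_source(sample_metrics))
```

With the real response, you would pass `json.loads(dataflow_run_metrics.text)` instead of the sample dictionary.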

## <a id="resources"></a>3. Resources ##
For further information, see the <a href="http://ibm.biz/wdp-api" target="_blank" rel="noopener noreferrer">IBM Watson Data API</a> documentation.

## Author
**Wesley Williams** & **Damian Cummins**, Cloud Application Developers with the Data Refinery and IBM Watson teams at IBM. 

Copyright © IBM Corp. 2018. This notebook and its source code are released under the terms of the MIT License.
