diff --git a/content/integrate/redis-data-integration/ingest/data-pipelines/data-pipelines.md b/content/integrate/redis-data-integration/ingest/data-pipelines/data-pipelines.md
index 4452b49d7e..c36ec9ba0b 100644
--- a/content/integrate/redis-data-integration/ingest/data-pipelines/data-pipelines.md
+++ b/content/integrate/redis-data-integration/ingest/data-pipelines/data-pipelines.md
@@ -12,15 +12,46 @@
RDI implements
[change data capture](https://en.wikipedia.org/wiki/Change_data_capture) (CDC)
with *pipelines*. (See the
[architecture overview]({{< relref "/integrate/redis-data-integration/ingest/architecture#overview" >}})
-for an introduction to pipelines.) There are two basic types of pipeline:
-
-- *Ingest* pipelines capture data from an external source database
-  and add it to a Redis target database.
-- *Write-behind* pipelines capture data from a Redis source database
-  and add it to an external target database.
+for an introduction to pipelines.)

## Overview

+An RDI pipeline captures change data records from the source database and transforms them
+into Redis data structures. It writes each of these new structures to a Redis target
+database under its own key.
+
+By default, RDI transforms the source data into
+[hashes]({{< relref "/develop/data-types/hashes" >}}) or
+[JSON objects]({{< relref "/develop/data-types/json" >}}) for the target with a
+standard data mapping and a standard format for the key.
+However, you can also provide your own custom transformation [jobs](#job-files)
+for each source table, using your own data mapping and key pattern. You specify these
+jobs declaratively with YAML configuration files that require no coding.
+
+The data transformation involves two separate stages. First, the data ingested by
+[Debezium](https://debezium.io/) is automatically transformed to a JSON format. Then,
+this JSON data gets passed on to your custom transformation for further processing.
+
+You can provide a job file for each source table you want to transform, but you
+can also add a *default job* for any tables that don't have their own.
+You must specify the full name of the source table in the job file (or the special
+name "*" in the default job), and you
+can also include filtering logic to skip data that matches a particular condition.
+As part of the transformation, you can specify whether you want to store the
+data in Redis as
+[JSON objects]({{< relref "/develop/data-types/json" >}}),
+[hashes]({{< relref "/develop/data-types/hashes" >}}),
+[sets]({{< relref "/develop/data-types/sets" >}}),
+[streams]({{< relref "/develop/data-types/streams" >}}),
+[sorted sets]({{< relref "/develop/data-types/sorted-sets" >}}), or
+[strings]({{< relref "/develop/data-types/strings" >}}).
+
+The diagram below shows the flow of data through the pipeline:
+
+{{< image filename="/images/rdi/data-transformation-pipeline.png" >}}
+
+## Pipeline configuration
+
RDI uses a set of [YAML](https://en.wikipedia.org/wiki/YAML)
files to configure each pipeline. The following diagram shows the folder
structure of the configuration:
@@ -30,11 +61,10 @@ structure of the configuration:

The main configuration for the pipeline is in the `config.yaml`
file. This specifies the connection details for the source database
(such as host, username, and password) and also the queries that RDI will use
-to extract the required data. You can also specify one or more optional *job* configurations in the `Jobs`
-folder. Use these to specify custom
-*data transformations*
-to apply to the source data before writing it to the target.
+to extract the required data. You should place job configurations in the `Jobs`
+folder if you want to specify your own data transformations.
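+
+For orientation, the sketch below shows the rough shape of a minimal `config.yaml`. It is
+illustrative only: the property names and values shown here (a `mysql` source, the default
+`target` connection, and so on) are assumptions made up for this example, so check the
+[RDI configuration file]({{< relref "/integrate/redis-data-integration/ingest/reference/config-yaml-reference" >}})
+reference for the exact schema and the full set of properties.
+
+```yaml
+# Illustrative sketch only. Consult the configuration file reference for the
+# authoritative schema and property names.
+sources:
+  mysql:                     # logical name for this source (assumed for the example)
+    type: cdc
+    connection:
+      type: mysql
+      host: localhost        # source database connection details
+      port: 3306
+      database: chinook      # example database name
+      user: rdi_user
+      password: rdi_password
+targets:
+  target:                    # the default connection that jobs write to
+    connection:
+      type: redis
+      host: localhost
+      port: 12000
+```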

-The sections below describe these two types of configuration files in more detail.
+The sections below describe the two types of configuration file in more detail.

## The `config.yaml` file

@@ -115,13 +145,13 @@ configuration contains the following data:
  supply a unique composite key.
- `advanced`: These optional properties configure other Debezium-specific features.
  The available sub-sections are:
-  - `sink`: all advanced properties for writing to RDI (TLS, memory threshold, etc).
+  - `sink`: All advanced properties for writing to RDI (TLS, memory threshold, etc.).
    See the Debezium
    [Redis stream properties](https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_redis_stream)
    page for the full set of available properties.
-  - `source`: all advanced connector properties (for example, RAC nodes). See the
+  - `source`: All advanced connector properties (for example, RAC nodes). See the
    Debezium [Connectors](https://debezium.io/documentation/reference/stable/connectors/)
    page for the full set of available properties.
-  - `quarkus`: all advanced properties for Debezium server, such as the log level. See the
+  - `quarkus`: All advanced properties for Debezium Server, such as the log level. See the
    Quarkus [Configuration options](https://quarkus.io/guides/all-config)
    docs for the full set of available properties.
@@ -174,27 +204,87 @@ output:

The main sections of these files are:

- `source`: This is a mandatory section that specifies the data items that you want to
-  use. The `table`
-  property refers to the table name you supplied in `config.yaml`. The default
-  job doesn't apply to a specific table, so use "*" in place of the table name
-  for this job only.
+  use. You can add the following properties here:
+  - `server_name`: Logical server name (optional). This corresponds to the `debezium.source.topic.prefix`
+    property specified in the Debezium Server's `application.properties` config file.
+  - `db`: Database name (optional).
+  - `schema`: Database schema (optional).
+  - `table`: Database table name. This refers to a table name you supplied in `config.yaml`. The default
+    job doesn't apply to a specific table, so use "*" in place of the table name for this job only.
+  - `row_format`: Format of the data to be transformed. This can take the values `data_only` (default) to
+    use only the payload data, or `full` to use the complete change record. See the `transform` section below
+    for details of the extra data you can access when you use the `full` option.
+  - `case_insensitive`: This applies to the `server_name`, `db`, `schema`, and `table` properties
+    and is set to `true` by default. Set it to `false` if you need to use case-sensitive values for these
+    properties.

-- `transform`: This is an optional section describing the transformation
-  that the pipeline
-  applies to the data before writing it to the target.
+- `transform`: This is an optional section describing the transformation that the pipeline
+  applies to the data before writing it to the target. The `uses` property specifies a
+  *transformation block* that will use the parameters supplied in the `with` section. See the
+  [data transformation reference]({{< relref "/integrate/redis-data-integration/ingest/reference/data-transformation" >}})
+  for more details about the supported transformation blocks, and also the
+  [JMESPath custom functions]({{< relref "/integrate/redis-data-integration/ingest/reference/jmespath-custom-functions" >}}) reference.
+  {{< note >}}If you set `row_format` to `full` under the `source` settings, you can access extra data from the
+  change record in the transformation:
+  - Use the expression `key.key` to get the generated Redis key as a string.
+  - Prefix a field name with `before.` to get the value that field had *before* it was updated in the source database
+    (the field name by itself gives you the value *after* the update).{{< /note >}}
+
- `output`: This is a mandatory section to specify the
  data structure(s) that RDI will write to
  the target along with the text pattern for the key(s) that will access it.
  Note that you can map one record to more than one key in Redis or nest
-  a record as a field of a JSON structure.
+  a record as a field of a JSON structure (see
+  [Data denormalization]({{< relref "/integrate/redis-data-integration/ingest/data-pipelines/data-denormalization" >}})
+  for more information about nesting). You can add the following properties in the `output` section:
+  - `uses`: This must have the value `redis.write` to specify writing to a Redis data
+    structure. You can add more than one block of this type in the same job.
+  - `with`:
+    - `connection`: Connection name as defined in `config.yaml` (by default, the connection named `target` is used).
+    - `data_type`: Target data structure when writing data to Redis. The supported types are `hash`, `json`, `set`,
+      `sorted_set`, `stream`, and `string`.
+    - `key`: This lets you override the default key for the data structure with custom logic:
+      - `expression`: Expression to generate the key.
+      - `language`: Expression language, which must be `jmespath` or `sql`.
+    - `expire`: Positive integer value indicating a number of seconds for the key to expire.
+      If you don't specify this property, the key will never expire.
+
+{{< note >}}You must specify a `transform` section, or a `key` section under `output`, or both.{{< /note >}}
+
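+For illustration, the sketch below shows what a *default job* might look like. It applies to
+any table that doesn't have a job file of its own and uses the `filter` transformation block
+to skip some of the rows. The `is_active` column is just an assumed example, so adapt the
+expression (and the rest of the job) to your own schema:
+
+```yaml
+source:
+  table: "*"              # default job: applies to tables without their own job file
+transform:
+  - uses: filter          # keep only the rows that match the expression
+    with:
+      expression: is_active = 1
+      language: sql
+output:
+  - uses: redis.write
+    with:
+      connection: target
+      data_type: hash     # write each record as a hash under the default key
+```
+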
+Another example below shows how you can rename the `fname` field to `first_name` in the table `emp`
+using the
+[`rename_field`]({{< relref "/integrate/redis-data-integration/ingest/reference/data-transformation/rename_field" >}}) block. It also demonstrates how you can set the key of this record instead of relying on
+the default logic. (See the
+[Transformation examples]({{< relref "/integrate/redis-data-integration/ingest/data-pipelines/transform-examples" >}})
+section for more examples of job files.)
+
+```yaml
+source:
+  server_name: redislabs
+  schema: dbo
+  table: emp
+transform:
+  - uses: rename_field
+    with:
+      from_field: fname
+      to_field: first_name
+output:
+  - uses: redis.write
+    with:
+      connection: target
+      key:
+        expression: concat(['emp:fname:',fname,':lname:',lname])
+        language: jmespath
+```

See the
[RDI configuration file]({{< relref "/integrate/redis-data-integration/ingest/reference/config-yaml-reference" >}})
reference for full details about the available source, transform, and
target configuration options and see also the
-[Transformation examples]({{< relref "/integrate/redis-data-integration/ingest/data-pipelines/transform-examples" >}}).
+[data transformation reference]({{< relref "/integrate/redis-data-integration/ingest/reference/data-transformation" >}})
+for details of all the available transformation blocks.

## Source preparation

@@ -207,6 +297,47 @@
find the preparation guides for the databases that RDI supports in the
[Prepare source databases]({{< relref "/integrate/redis-data-integration/ingest/data-pipelines/prepare-dbs" >}})
section.

+## Deploy a pipeline
+
+If you are hosting RDI on your own VMs, you can use the
+[`deploy`]({{< relref "/integrate/redis-data-integration/ingest/reference/cli/redis-di-deploy" >}})
+command to deploy a configuration, including the jobs, once you have created them.
+
+If your RDI CLI is deployed as a pod in a Kubernetes cluster, you should perform the following
+steps to deploy a pipeline:
+
+- Create a [ConfigMap](https://kubernetes.io/docs/concepts/configuration/configmap/) from the
+  YAML files in your `jobs` folder:
+
+  ```bash
+  kubectl create configmap redis-di-jobs --from-file=jobs/
+  ```
+
+- Deploy your jobs:
+
+  ```bash
+  kubectl exec -it pod/redis-di-cli -- redis-di deploy
+  ```
+
+{{< note >}}When you create or modify a ConfigMap, it will be available in the `redis-di-cli` pod
+after a short delay. Wait around 30 seconds before running the `redis-di deploy` command.{{< /note >}}
+
+You have two options to update the ConfigMap:
+
+- For smaller changes, you can edit the ConfigMap directly with this command:
+
+  ```bash
+  kubectl edit configmap redis-di-jobs
+  ```
+
+- For bigger changes, such as adding another job file, edit the files in your local `jobs` folder and then run this command:
+
+  ```bash
+  kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
+  ```
+
+{{< note >}}You must run `kubectl exec -it pod/redis-di-cli -- redis-di deploy` after updating the ConfigMap with either option.{{< /note >}}
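+
+The `redis-di-jobs` ConfigMap is an ordinary Kubernetes object with one entry per file in your
+`jobs` folder, so you can inspect what was actually loaded with
+`kubectl get configmap redis-di-jobs -o yaml`. As a rough sketch, assuming a single,
+hypothetical job file named `emp.yaml`, the ConfigMap looks something like this:
+
+```yaml
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: redis-di-jobs
+data:
+  # One entry per file in the local jobs/ folder; emp.yaml is just an example.
+  emp.yaml: |
+    source:
+      table: emp
+    output:
+      - uses: redis.write
+        with:
+          connection: target
+```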
+
## Ingest pipeline lifecycle

Once you have created the configuration for a pipeline, it goes through the