OpenShift operator that manages the XJoin pipeline. It currently manages version 1 of XJoin, i.e. it maintains the replication pipeline between HBI and ElasticSearch. Modifications will be necessary to support version 2 (joining data between applications).
An XJoin pipeline is defined by the XJoinPipeline custom resource. It indexes the hosts table of the HBI database into an ElasticSearch index. A Debezium Kafka Connector is used to read from the HBI database's replication slot. An ElasticSearch connector is used to index the hosts. Kafka Connect transformations are performed on the ElasticSearch connector to prepare the host records to be indexed. An ElasticSearch pipeline is used to transform the JSON fields on a host prior to being indexed.
The operator is responsible for:
- management of an ElasticSearch index, alias, and pipeline
- management of Debezium (source) and ElasticSearch (sink) connectors in a Kafka Connect cluster (using Strimzi)
- management of a Kafka Topic in a Kafka cluster (using Strimzi)
- management of the HBI replication slot. The Debezium connector normally manages this itself; the operator ensures there are no orphaned replication slots (see the query sketch after this list)
- periodic validation of the indexed data
- automated recovery (e.g. when the data becomes out of sync)
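For reference, replication slots (including any orphaned ones) can be inspected directly in the HBI Postgres database. This is a minimal sketch; the host, user, and database name are assumptions, so substitute the credentials for your environment:

```
# List logical replication slots; an inactive slot with no matching connector may be orphaned.
# Host, user, and database name below are assumptions.
psql -h inventory-db -U insights -d insights \
  -c "SELECT slot_name, active, restart_lsn FROM pg_replication_slots;"
```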
The operator defines two controllers that reconcile an XJoinPipeline:
- PipelineController which manages all the resources (connectors, elasticsearch resources, topic, replication slots) and handles recovery
- ValidationController which periodically compares the data in the ElasticSearch index with what is stored in HBI to determine whether the pipeline is valid
- Install dependencies
- Set up a local minikube environment
- Configure Kubernetes to use at least 8GB of memory and 5 CPUs. This is known to work, although you can try with less.

  ```
  minikube config set cpus 5
  minikube config set memory 8000
  minikube config set driver kvm2
  ```

- Start minikube

  ```
  minikube start
  ```
- Login to https://quay.io and https://registry.redhat.io

  ```
  docker login -u=<quay-username> -p="password" quay.io
  docker login https://registry.redhat.io
  ```

- For MacOS, do the following to place the creds in .docker/config.json. By default the creds are stored in the credential store ("credsStore": "desktop" or "osxkeychain") and are not available for pulling images from private repos.

  ```
  docker logout quay.io
  docker logout registry.redhat.io
  ```

  Remove the "credsStore" block from .docker/config.json, then log in again:

  ```
  docker login -u=<quay-username> -p="password" quay.io
  docker login https://registry.redhat.io
  ```

  NOTE: Manually creating .docker/config.json and adding "auth": base64-encoded username:password does not work.
- Do one of the following:
  - Append the following line to /etc/hosts

    ```
    127.0.0.1 inventory-db host-inventory-db.test.svc xjoin-elasticsearch-es-default.test.svc connect-connect-api.test.svc kafka-kafka-0.kafka-kafka-brokers.test.svc apicurio apicurio.test.svc .test.svc
    ```

  - Install and run kubefwd

    ```
    sudo -E kubefwd svc -n test --kubeconfig ~/.kube/config -m 8080:8090 -m 8081:8091
    ```

- Run the Clowder setup script

  ```
  ./dev/setup-clowder.sh
  ```
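Once the script completes, a quick sanity check that the environment came up (a hedged example; it assumes the Clowder-managed services land in the `test` namespace, as the other commands in this document do):

```
# All pods in the test namespace should eventually reach the Running state.
kubectl get pods -n test
```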
This project uses golangci-lint for linting.
To access the services within the Kubernetes cluster there is a script to forward ports to each of the useful services:

```
./dev/forward-ports-clowder.sh
```

or the ports can be forwarded via the kubefwd utility:

```
sudo -E kubefwd svc -n test --kubeconfig ~/.kube/config -m 8080:8090 -m 8081:8091
```

./dev/get_credentials.sh is a helper script to populate a shell's environment with credentials to the databases and Elasticsearch. Use the following command to populate the environment:

```
source ./dev/get_credentials.sh test
```
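As a hypothetical example of what the populated environment can be used for (the variable names below are illustrative assumptions; check the script for the names it actually exports):

```
# Connect to the HBI database and query Elasticsearch with the exported credentials.
# HBI_DB_HOST, HBI_DB_USER, HBI_DB_NAME, ELASTICSEARCH_USERNAME, and ELASTICSEARCH_PASSWORD
# are assumed variable names.
psql -h "$HBI_DB_HOST" -U "$HBI_DB_USER" -d "$HBI_DB_NAME"
curl -u "$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD" \
  "http://xjoin-elasticsearch-es-default.test.svc:9200/_cat/indices"
```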
It is easiest to completely delete the minikube instance and then rerun the setup-clowder.sh script when necessary:

```
minikube delete && minikube start && ./dev/setup-clowder.sh
```
With the cluster set up it is now possible to install manifests and run the operator locally.
- Install CRDs

  ```
  make install
  ```

- Run the operator

  ```
  make run ENABLE_WEBHOOKS=false
  ```

- Finally, create a new pipeline

  ```
  kubectl apply -f ../config/samples/xjoin_v1alpha1_xjoinpipeline.yaml -n test
  ```
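To confirm the pipeline is reconciling, its status can be inspected (a hedged example; the resource name comes from the sample manifest):

```
# Inspect the XJoinPipeline's status as the operator reconciles it.
kubectl get xjoinpipeline -n test -o yaml
```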
There is also `make delve` to debug the operator. After starting the Delve server process, connect to it with a Delve debugger.
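For example, with the dlv CLI (the port is an assumption; use whichever port the make target reports when it starts the headless server):

```
# Attach to the headless Delve server started by `make delve` (the port is an assumption).
dlv connect localhost:2345
```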
This is useful when testing deployment-related changes. It's a little cumbersome for everyday development because an image needs to be built by app-interface and pushed to the cluster for each change.
- To deploy the operator locally via OLM run

  ```
  ./dev/install.operator.locally.sh
  ```

- To uninstall the OLM-deployed operator run

  ```
  ./dev/uninstall.operator.locally.sh
  ```
This is more convenient than using the app-interface build because the build is done locally and then pushed to quay.io.

```
docker login -u=$QUAY_USERNAME -p $QUAY_PASSWORD
./dev/install.operator.with.operator.sdk.sh
```

To uninstall, run

```
./dev/uninstall.operator.with.operator.sdk.sh
```
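A hedged way to confirm the OLM-managed install succeeded is to list the ClusterServiceVersions and look for the xjoin entry:

```
# The xjoin operator's CSV phase should be Succeeded once installed.
kubectl get csv --all-namespaces | grep -i xjoin
```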
- The tests require an initialized Kubernetes environment. See Setting up the development environment.
- Before running the tests, make sure the operator is not already running on your machine or in the cluster.

  ```
  kubectl scale --replicas=0 deployments/xjoin-operator-controller-manager -n xjoin-operator-system
  ```

- They can be executed via `make test`.
- There is also `make delve-test` to run the tests in debug mode. Then delve can be used to connect to the test run.
- The tests take a while to run. To whitelist one or a few tests, prepend `It` with an F, e.g. change `It("Creates a connector...` to `FIt("Creates a connector...`.
- Sometimes when the test execution fails unexpectedly it will leave orphaned projects in Kubernetes. Use `dev/cleanup.projects.sh` to remove them.
See the version 1 development section for details on setting up minikube.
After setting up a kubernetes environment, the xjoin-operator can be run like this:

```
make run ENABLE_WEBHOOKS=false
```

Now that the xjoin-operator is running, it is time to create the k8s custom resources that define how the data is streamed from the database into Elasticsearch. Once the custom resources are defined, the xjoin-operator will start reconciling each resource and create the components necessary to stream the data.

```
kubectl apply -f config/samples/xjoin_v1alpha1_xjoindatasource.yaml -n test
kubectl apply -f config/samples/xjoin_v1alpha1_xjoinindex.yaml -n test
```
These commands create two k8s resources (XJoinIndex and XJoinDataSource) which then create many more k8s resources. The xjoin k8s resources are defined in the api/v1alpha1 directory.
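To see what the operator creates from these two resources, the xjoin resources and their pipelines can be listed (the kinds come from the CRD table below; the generated names will vary):

```
# List the xjoin resources and the pipeline resources created for them.
kubectl get xjoindatasource,xjoindatasourcepipeline,xjoinindex,xjoinindexpipeline -n test
```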
The xjoin.v2 tests utilize mocks, so they don't require kubernetes or any other services to run.
```
make generic-test
```
The following is a useful order in which to look when the data is not syncing. When troubleshooting data sync issues, always query the Elasticsearch index directly instead of querying the GraphQL API.
- Verify the Kafka and Kafka Connect pods are running (see the sketch after this list)
- Check the logs of the xjoin-operator
- Look at the status of each of the following resources via `kubectl get -o yaml <resource name>`: `XJoinIndex`, `XJoinIndexPipeline`, `XJoinDataSource`, `XJoinDataSourcePipeline`, `KafkaConnector`
- Check the logs of Kafka Connect via `kubectl logs -f -l app.kubernetes.io/instance=connect --all-containers=true`
- Check the logs of Kafka via `kubectl logs -f -l app.kubernetes.io/instance=kafka --all-containers=true`
- Check the logs of xjoin-core via `kubectl logs -f -l xjoin.index=xjoin-core-xjoinindexpipeline-hosts --all-containers=true`
- Check if messages are on the datasource and index topics by using kcat to consume each topic from the beginning

  ```
  kcat -b localhost:29092 -C -o beginning -f '%h\n%s\n\n\n' -t xjoin.inventory.1663597318878465070.public.hosts
  ```
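Two hedged helper commands for the first and last items in the list above (the label selectors and broker address mirror the ones already used here; the `test` namespace is an assumption, adjust for your environment):

```
# Verify the Kafka and Kafka Connect pods are running (namespace is an assumption).
kubectl get pods -n test -l app.kubernetes.io/instance=kafka
kubectl get pods -n test -l app.kubernetes.io/instance=connect

# List every topic on the broker to find the exact datasource/index topic names,
# since each includes a generated version (e.g. xjoin.inventory.<version>.public.hosts).
kcat -b localhost:29092 -L
```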
The CRDs are contained in the api/v1alpha1 directory.
| Custom Resource Definition | Description | Created By |
|---|---|---|
| XJoinDataSource | This defines a single data source (e.g. a database table). This is created by the user. | Human |
| XJoinDataSourcePipeline | This defines the pipeline for a DataSource. Each DataSource can have multiple DataSourcePipelines, e.g. there will be two DataSourcePipelines for a single DataSource when the DataSource is being refreshed. The old DataSource will continue to be used by applications until the new DataSource is in sync. At that point, the old DataSource is deleted and the new DataSource is used by applications. | Code |
| XJoinIndex | This defines an Index that is composed of one or more DataSources. The Index defines how the data is indexed into Elasticsearch. This is created by the user. | Human |
| XJoinIndexPipeline | This defines the pipeline for an Index. This is similar to the XJoinDataSourcePipeline where each Index can have multiple IndexPipelines. | Code |
| XJoinIndexValidator | This defines the validator for an Index. The validator periodically compares the data in each DataSource with the data in the Index. The validator is responsible for updating the status of the Index and each DataSource used by the Index. | Code |
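As a quick sanity check that these CRDs are installed in a cluster (after `make install`), something like the following can be used:

```
# The xjoin CRDs should be listed once they have been installed.
kubectl get crds | grep -i xjoin
```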
The entrypoint to the reconcile loop for each Custom Resource Definition is in a separate file in the top level of the controllers directory, e.g. the `Reconcile` method in the xjoindatasource_controller file is the entrypoint for a DataSource.
Almost all the remaining business logic is contained in the controllers directory. The other directories are mostly boilerplate and related to deployments/builds.
Both the DataSourcePipeline and the IndexPipeline manage multiple resources to construct a pipeline for the data to flow through. Throughout the code these resources are referred to as a `Component` and they are managed by a `ComponentManager`. More details can be found in the controllers/components/README.md.
The code for the top level resources (XJoinDataSource and XJoinIndex) can be found in the controllers/datasource and controllers/index directories.
There are many different parameters and sources of parameters across the operator. These are handled by the ConfigManager. The parameters for the xjoin.v2 resources are defined in controllers/parameters.
Each IndexPipeline is continuously validated via an IndexValidator. For a given Index there can be at most 2 IndexPipelines. One pipeline is considered `active` while the other is `refreshing`. The `active` pipeline is served to users via the GraphQL Gateway. When the `refreshing` pipeline becomes valid it replaces the `active` pipeline. The status fields on each Kubernetes CRD are used to manage the `active` and `refreshing` state.
This is the high level flow of what happens when an active IndexPipeline becomes out of sync:
- The IndexValidator updates the status to invalid for each DataSourcePipeline that is referenced by the IndexPipeline
- The DataSource (parent of the DataSourcePipeline) watches the DataSourcePipeline. So, when the active DataSourcePipeline's status is set to invalid, the DataSource is reconciled and starts to refresh by creating a new DataSourcePipeline
- The IndexPipeline also watches the DataSourcePipeline and is reconciled to be invalid
- The Index is watching the IndexPipeline, so it is reconciled and starts to refresh by creating a new IndexPipeline
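To observe this flow in a live cluster, the pipeline resources can be watched while a refresh is in progress (a hedged example; the `test` namespace is an assumption):

```
# Watch DataSourcePipelines and IndexPipelines being created and replaced during a refresh.
kubectl get xjoindatasourcepipeline,xjoinindexpipeline -n test -w
```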
This is a summary of the different states an Index can be in:
- Newly created
  - ActiveVersion: ""
  - ActiveVersionIsValid: false
  - RefreshingVersion: "1234"
- Data is in sync
  - ActiveVersion: "1234"
  - ActiveVersionIsValid: true
  - RefreshingVersion: ""
- Data is out of sync, refreshing
  - ActiveVersion: "1234"
  - ActiveVersionIsValid: false
  - RefreshingVersion: "5678"
- Data is back in sync, refreshing version replaced active version
  - ActiveVersion: "5678"
  - ActiveVersionIsValid: true
  - RefreshingVersion: ""