Migrate and Validate Tables between Origin and Target Cassandra Clusters.
⚠️ Please note this job has been tested with spark version 3.5.2
- Get the latest image that includes all dependencies from DockerHub
- All migration tools (
cassandra-data-migrator
+dsbulk
+cqlsh
) would be available in the/assets/
folder of the container
- All migration tools (
- Download the latest jar file from the GitHub packages area here
- Install Java11 (minimum) as Spark binaries are compiled with it.
- Install Spark version
3.5.2
on a single VM (no cluster necessary) where you want to run this job. Spark can be installed by running the following: -
wget https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3-scala2.13.tgz
tar -xvzf spark-3.5.2-bin-hadoop3-scala2.13.tgz
⚠️ If the above Spark and Scala version is not properly installed, you'll then see a similar exception like below when running the CDM jobs,
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.releaseFence()V
⚠️ Note that Version 4 of the tool is not backward-compatible with .properties files created in previous versions, and that package names have changed.
cdm.properties
file needs to be configured as applicable for the environment. Parameter descriptions and defaults are described in the file. The file can have any name, it does not need to becdm.properties
.- A simplified sample properties file configuration can be found here as cdm.properties
- A complete sample properties file configuration can be found here as cdm-detailed.properties
- Place the properties file where it can be accessed while running the job via spark-submit.
- Run the below job using
spark-submit
command as shown below:
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
Note:
- Above command generates a log file
logfile_name_*.txt
to avoid log output on the console. - Update the memory options (driver & executor memory) based on your use-case
- To track details of a run in the
target
keyspace, pass param--conf spark.cdm.trackRun=true
- To filter and migrate data only in a specific token range, you can pass the below two additional params to the
Migration
orValidation
jobs
--conf spark.cdm.filter.cassandra.partition.min=<token-range-min>
--conf spark.cdm.filter.cassandra.partition.max=<token-range-max>
- To run the job in Data validation mode, use class option
--class com.datastax.cdm.job.DiffData
as shown below
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
- Validation job will report differences as “ERRORS” in the log file as shown below
23/04/06 08:43:06 ERROR DiffJobSession: Mismatch row found for key: [key3] Mismatch: Target Index: 1 Origin: valueC Target: value999)
23/04/06 08:43:06 ERROR DiffJobSession: Corrected mismatch row in target: [key3]
23/04/06 08:43:06 ERROR DiffJobSession: Missing target row found for key: [key2]
23/04/06 08:43:06 ERROR DiffJobSession: Inserted missing row in target: [key2]
- Please grep for all
ERROR
from the output log files to get the list of missing and mismatched records.- Note that it lists differences by primary-key values.
- The Validation job can also be run in an AutoCorrect mode. This mode can
- Add any missing records from origin to target
- Update any mismatched records between origin and target (makes target same as origin).
- Enable/disable this feature using one or both of the below setting in the config file
spark.cdm.autocorrect.missing false|true
spark.cdm.autocorrect.mismatch false|true
Note:
- The validation job will never delete records from target i.e. it only adds or updates data on target
- You can rerun/resume a Migration or Validation job to complete a previous run that could have stopped (or completed with some errors) for any reasons. This mode will skip any token-ranges from the previous run that were migrated (or validated) successfully. This is done by passing the
spark.cdm.trackRun.previousRunId
param as shown below
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.trackRun.previousRunId=<prev_run_id> \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
- The tool can be used to identify large fields from a table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field)
--class com.datastax.cdm.job.GuardrailCheck
as shown below
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.feature.guardrail.colSizeInKB=10000 \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
- Auto-detects table schema (column names, types, keys, collections, UDTs, etc.)
- Including counter table Counter tables
- Rerun job from where the previous job had stopped for any reason (killed, had exceptions, etc.)
- If you rerun a
validation
job, it will include any token-ranges that had differences in the previous run
- If you rerun a
- Preserve writetimes and TTLs
- Supports migration/validation of advanced DataTypes (Sets, Lists, Maps, UDTs)
- Filter records from
Origin
usingwritetimes
and/or CQL conditions and/or a list of token-ranges - Perform guardrail checks (identify large fields)
- Supports adding
constants
as new columns onTarget
- Supports expanding
Map
columns onOrigin
into multiple records onTarget
- Supports extracting value from a JSON column in
Origin
and map it to a specific field onTarget
- Fully containerized (Docker and K8s friendly)
- SSL Support (including custom cipher algorithms)
- Migrate from any Cassandra
Origin
(Apache Cassandra® / DataStax Enterprise™ / DataStax Astra DB™) to any CassandraTarget
(Apache Cassandra® / DataStax Enterprise™ / DataStax Astra DB™) - Supports migration/validation from and to Azure Cosmos Cassandra
- Validate migration accuracy and performance using a smaller randomized data-set
- Supports adding custom fixed
writetime
- Track run information (start-time, end-time, status, etc.) in tables (
cdm_run_info
andcdm_run_details
) on the target keyspace
- Each run (Migration or Validation) can be tracked (when enabled). You can find summary and details of the same in tables
cdm_run_info
andcdm_run_details
in the target keyspace. - CDM does not migrate
ttl
&writetime
at the field-level (for optimization reasons). It instead finds the field with the highestttl
& the field with the highestwritetime
within anorigin
row and uses those values on the entiretarget
row. - CDM ignores
ttl
&writetime
on collection and UDT fields while computing the highest value - If a table has only collection and/or UDT non-key columns and not table-level
ttl
configuration, the target will have nottl
, which can lead to inconsistencies betweenorigin
andtarget
as rows expire onorigin
due tottl
expiry. - If a table has only collection and/or UDT non-key columns, the
writetime
used on target will be time the job was run. Alternatively if needed, the paramspark.cdm.transform.custom.writetime
can be used to set a static custom value forwritetime
. - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in
list
type columns. Note this is due to a Cassandra/DSE bug and not a CDM issue. This issue can be addressed by enabling and setting a positive value forspark.cdm.transform.custom.writetime.incrementBy
param. This param was specifically added to address this issue. - When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table
cdm_run_info
will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.
- Clone this repo
- Move to the repo folder
cd cassandra-data-migrator
- Run the build
mvn clean package
(Needs Maven 3.9.x) - The fat jar (
cassandra-data-migrator-4.x.x.jar
) file should now be present in thetarget
folder
Checkout all our wonderful contributors here.