
Schema inference for DynamoDB is broken #103

Closed
hopugop opened this issue Dec 21, 2023 · 18 comments · Fixed by #105

Comments

@hopugop
Contributor

hopugop commented Dec 21, 2023

Description

The schema for DynamoDB tables is inferred from the first page of a scan. If any item on other pages does not match the schema derived from the items on the first page, Migrator silently ignores additional attributes and does an incomplete copy of the item.

Steps to reproduce

Create a table on DynamoDB

(the table will be auto-created on Alternator when the Migrator runs)

aws dynamodb create-table \
    --table-name MigrationTest \
    --attribute-definitions \
        AttributeName=id,AttributeType=N \
        AttributeName=sk,AttributeType=N \
    --key-schema \
        AttributeName=id,KeyType=HASH \
        AttributeName=sk,KeyType=RANGE \
    --provisioned-throughput \
        ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --table-class STANDARD

Insert two items

aws dynamodb put-item \
    --table-name MigrationTest  \
    --item \
    '{"id": {"N": "4"},"sk": {"N": "4"},"AlbumTitle": {"S": "aaaaa"},"Awards": {"N": "4"}}'

aws dynamodb put-item \
    --table-name MigrationTest  \
    --item \
    '{"id": {"N": "999"},"sk": {"N": "99999"},"asdfg": {"M": {"fffff": {"S": "asdfasdfs"}}}}'

Run migrator

Migrator will detect schema as:

23/12/13 15:52:49 INFO migrator: Created source dataframe; resulting schema:
root
 |-- Awards: integer (nullable = true)
 |-- sk: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- AlbumTitle: string (nullable = true)

Migrator run completes with no error, but when comparing the items we see data is missing.

Migration results

Items were copied with no errors, but map attribute asdfg is missing from the Alternator copy because it was not present on the first page.

Alternator scan

{
    "Items": [
        {
            "id": {
                "N": "4"
            },
            "sk": {
                "N": "4"
            },
            "AlbumTitle": {
                "S": "aaaaa"
            },
            "Awards": {
                "N": "4"
            }
        },
        {
            "id": {
                "N": "999"
            },
            "sk": {
                "N": "99999"
            }     #### this item is incomplete
        }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "ConsumedCapacity": null
}

DDB scan results

{
    "Items": [
        {
            "sk": {
                "N": "4"
            },
            "id": {
                "N": "4"
            },
            "AlbumTitle": {
                "S": "aaaaa"
            },
            "Awards": {
                "N": "4"
            }
        },
        {
            "sk": {
                "N": "99999"
            },
            "asdfg": {              #### this map is missing from Alternator
                "M": {
                    "fffff": {
                        "S": "asdfasdfs"  
                    }
                }
            },
            "id": {
                "N": "999"
            }
        }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "ConsumedCapacity": null
}
@julienrf
Collaborator

Hello @hopugop, do you have an example of migrator configuration file that I could use to reproduce the issue?

@hopugop
Contributor Author

hopugop commented Feb 21, 2024

Hello @hopugop, do you have an example of migrator configuration file that I could use to reproduce the issue?

source:
  type: dynamodb
  table: {DDB TABLE}
  region: {AWS REGION}
  scanSegments: 10
  readThroughput: 1
  throughputReadPercent: 1.0
  maxMapTasks: 2
  streamChanges: false

target:
  type: dynamodb
  table: {SCYLLA ALTERNATOR TABLE}
  endpoint:
    host: http://{SCYLLA IP}
    port: {SCYLLA PORT}
  scanSegments: 1
  readThroughput: 1
  throughputReadPercent: 1.0
  maxMapTasks: 1
  streamChanges: False

I used credentials in ~/.aws, as usual for AWS apps.

@julienrf
Collaborator

julienrf commented Feb 21, 2024

Thank you for your help @hopugop. I was not able to reproduce the issue, though. In my case, no schema is inferred at all:

INFO migrator: Created source dataframe; resulting schema:
root

Which led to a failure when the migrator tried to copy the source items:

WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, 192.168.16.4, executor 0): java.lang.IllegalArgumentException: Field "id" does not exist.
Available fields: 

Does that ring a bell to you?

@hopugop
Contributor Author

hopugop commented Feb 21, 2024 via email

@julienrf
Collaborator

The source table contained all the data before running Migrator; the target table was non-existent and was created during the execution of Migrator. I see no error or warning before that failure.

@julienrf
Collaborator

julienrf commented Feb 22, 2024

Although I could not reproduce the problem locally (in my environment, the migration fails because no schema is inferred at all), I have investigated the issue and am pasting my notes here.

It seems to be due to a known limitation of the spark-dynamodb library. See audienceproject/spark-dynamodb#90. According to this article, this is a design decision:

This approach has a few drawbacks, however. If any properties on items are not present in the one-page scan, they will not be added to the schema, and will therefore not be readable by the data source. This has no easy solution since it is necessary for Spark to know the schema in advance of partitioning the initial RDD, which is necessary for the throughput calculation described above.

The suggestion of the authors of spark-dynamodb is to manually define the schema instead of relying on schema inference:

The basic assumption with this approach is that the user has a clear idea of what kind of items are stored in the database, and can express this in terms of an entity case class. This case class will be useful not only for imposing a schema on the data when reading from DynamoDB but also for typing Datasets in Spark.
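
For illustration, here is a minimal sketch of what that suggestion could look like for the MigrationTest table above, based on spark-dynamodb's documented dynamodbAs reader; the case class is hypothetical and would have to cover every attribute the table may contain:

import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

// Hypothetical entity covering all attributes of the MigrationTest example.
// Attributes that may be absent from some items are modelled as Option.
case class MigrationTestItem(
  id: Long,
  sk: Long,
  AlbumTitle: Option[String],
  Awards: Option[Long],
  asdfg: Option[Map[String, String]]
)

val spark = SparkSession.builder().appName("explicit-schema-example").getOrCreate()

// The case class fixes the schema up front, so no first-page inference is needed.
val items = spark.read.dynamodbAs[MigrationTestItem]("MigrationTest")
items.printSchema()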

The line of code that scans the first page is located here:

https://github.com/scylladb/spark-dynamodb/blob/48faabc86604c612744f2dc9d65d678a4ba95374/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoDataSourceReader.scala#L89

@julienrf
Collaborator

Small update: I was able to get my schema inferred by changing the configuration you provided: I used scanSegments: 1 instead of scanSegments: 10.

However, I received both items in the first page, so the schema inference worked as expected:

24/02/22 10:40:25 INFO migrator: Created source dataframe; resulting schema:
root
 |-- asdfg: struct (nullable = true)
 |    |-- fffff: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- sk: integer (nullable = true)
 |-- AlbumTitle: string (nullable = true)
 |-- Awards: integer (nullable = true)

Note that using anything other than scanSegments: 1 will always fail, because the scan process only requests the first segment (the first parameter of the scan call is always 0).

@tarzanek
Contributor

tarzanek commented Feb 22, 2024

I was thinking about the following options for how to fix it:

Option 1 - find and integrate a better library for DynamoDB than the one we use now, one with better schema inference (might be very hard, such a library might not exist; possibly we get lucky and the stock Amazon SDK has something now, or we rewrite it to work the way they intended)

Option 2 - rewrite the migrator to migrate in a dynamic, schemaless way, with maps as the content of rows, so that we don't really care about the schema (since Spark internally uses Spark SQL this might be tricky) - the question is whether the Amazon SDK supports such reads/writes

Option 3 - don't do any inference and force an explicit schema; however, we will then have to deal with write failures when the source has items not matching the schema (as the above example shows)

What do you think about the above options, @julienrf?

@julienrf
Collaborator

I will look at option 1…

I am not sure about option 2: Spark inherently needs schemas, no?

Actually, I like the third option: allowing the users to explicitly provide their schema (either in a YAML form, or as case class definitions, or maybe even as a Spark DataType definition). The drawback is that it would require an extra step from the users. The benefit of this approach is that it seems realistic 😉 and users could get extra value from it (like providing a better schema than what would otherwise have been inferred).

Otherwise, the fourth option I had in mind was to introduce a setting that allows the schema inference process to fetch more than just the first page. The drawback is that users may have to fetch a large number of pages in some cases…
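
To make the third option concrete, here is a rough sketch (not existing migrator functionality) of what a user-provided schema could look like as a Spark StructType for the MigrationTest example, assuming the DynamoDB data source honours a schema passed through spark.read.schema(...):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("user-schema-example").getOrCreate()

// Hypothetical user-provided schema covering every attribute of the table.
val userSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("sk", IntegerType, nullable = false),
  StructField("AlbumTitle", StringType, nullable = true),
  StructField("Awards", IntegerType, nullable = true),
  StructField("asdfg", MapType(StringType, StringType), nullable = true)
))

// Sketch only: this assumes the data source accepts a user-supplied schema,
// which would need to be verified (or implemented) in spark-dynamodb.
val df = spark.read
  .schema(userSchema)
  .option("tableName", "MigrationTest")
  .format("dynamodb")
  .load()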

@tarzanek
Contributor

Option 2 is actually the most versatile one (unless option 1 fails or gives us methods to nicely implement 2), since you can create a dynamic map schema - DynamoDB is in the end schemaless too: you just need the PK and sort key, and the rest of the columns are sparsely populated and can be basically anything.
So if the reader DataFrame reads the PK and sort key, and the rest of the data as a map, you can then just send it on the same way, and the schema per se can be created with just those two important attributes; the rest will be dynamic.

That said, the gotcha there will be the type mapping to Spark, but I am hoping this can be avoided somehow (I haven't read the DynamoDB SDK yet).
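
As an illustration of the shape such a dynamic schema could take (a sketch, not existing migrator code): only the key columns are typed, and everything else travels in a single map column. This is exactly where the type-mapping gotcha shows up, because a Spark MapType needs one uniform value type, so non-key values would have to be serialized (e.g. to JSON strings):

import org.apache.spark.sql.types._

// Hypothetical "schemaless" row shape: typed keys plus an opaque bag of attributes.
val dynamicSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),  // partition key
  StructField("sk", LongType, nullable = false),  // sort key
  StructField("attributes", MapType(StringType, StringType), nullable = true) // everything else, serialized
))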

@tarzanek
Contributor

tarzanek commented Feb 23, 2024

Hmm, a long time ago I wrote this: be84d86#diff-4a83461826fff740a26ca2d8d24aa7c554743398d5a5c215a92c4041d263df13

It was simplistic, but generic from this point of view (note that it only sets the PK and SK); however, it lacks renames, and scan segments and throughput are likely not used.
Maybe the current usage can be replaced by the above, or the above can be updated to recent SDK usage (maybe the current SDK even has a way to resume, which would be terrific!).

By the way, can you please base your experiments on top of https://github.com/scylladb/scylla-migrator/tree/spark-3.1.1?
(We can even upgrade it to newer Spark versions if needed.)

@julienrf
Collaborator

Interesting, thank you for the pointers. Was your work in #5 lost in #23?

Is there a way we could chat without polluting the issue discussion?

@tarzanek
Contributor

That work wasn't lost; Itamar just wrapped it with the audienceproject/spark-dynamodb library (to avoid reimplementing things, but that library is dead now) and added support for streams with Kinesis (which is just extracted from the SDK, IIRC).
That code was really simplistic. That said, we are already forking the spark-dynamodb library and have a repo for Kinesis, so we can really do anything there (though I don't like having to maintain our own fork).

@tzach

tzach commented Feb 25, 2024

Adding @nyh to comment on the schema migration alternative

@nyh

nyh commented Feb 25, 2024

Adding @nyh to comment on the schema migration alternative

If I understand correctly, the idea is to migrate a DynamoDB to CQL with a full schema with typed columns, not to partial-schema (only primary key) Alternator?

I'm not sure what you're asking me and what alternatives you are asking about (I see a lot of different discussions above). The thing is - as you know, in general, DynamoDB doesn't need to have a schema... It is quite legal for an attribute called "x" to be a string in some items, a number in other items, or a list in others - or to be missing altogether from some items. So I'm guessing that you're making the assumption that this particular use case did NOT do this: if attribute "x" appears on a single item as a string, it must be a string in any other item it appears on.

But still, we shouldn't check the existence of an attribute named "x" by looking at one single item - it is legal for any single item not to contain a value for this "x", and it is quite common, so I'm guessing you don't want to assume that this doesn't happen... So can we look at "a page" of items? A million items? In theory, the answer is no: we can have even a billion items missing a value for "x" and then the (billion+1)th item will suddenly have it. Can you assume that this does not happen? I am guessing that you don't want to assume that.

So in this case you have no choice but to Scan all the items in the database, and list the attributes in them and their types (and you can print a warning if you see a discrepancy, like "x" being a string in one item and number in another). This will be expensive (scanning the entire database) but DynamoDB does not have any way to efficiently get a list of all attributes used by any item without such a scan.

By the way, even for numbers, since Scylla supports different kinds of numbers (integers and floating-point are different types, and there are various lengths for integer types), you may want to figure out a number's appropriate type (int, bigint, double, etc.) from a sample of the values or from all of them; looking at a single value or a bad sample may not be enough.

Instead of scanning the entire database you could scan only a fraction of the partitions (e.g., by using the Segment option) or sample the data in some other way, but you'll need to assume that this sample in fact represents all the data. That might not be true if the application changed its schema recently - e.g., yesterday a new attribute "x" started to be added to items, and we still have a billion items which don't have that "x" attribute. Or maybe you have an attribute which until yesterday was always an integer value - but yesterday somebody inserted the value "3.14" and it can no longer be kept as an integer...
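
For reference, a minimal sketch of the kind of full-table survey described above, using the AWS SDK for Java v1 (the names and structure are illustrative, not part of the migrator): it scans every page, records the DynamoDB type of each attribute, and warns about attributes seen with more than one type.

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ScanRequest}
import scala.collection.JavaConverters._
import scala.collection.mutable

// Rough DynamoDB type name of a single attribute value.
def typeOf(v: AttributeValue): String =
  if (v.getS != null) "S"
  else if (v.getN != null) "N"
  else if (v.getBOOL != null) "BOOL"
  else if (v.getM != null) "M"
  else if (v.getL != null) "L"
  else if (v.getSS != null) "SS"
  else if (v.getNS != null) "NS"
  else if (v.getB != null) "B"
  else "NULL/other"

val client = AmazonDynamoDBClientBuilder.defaultClient()
val observedTypes = mutable.Map.empty[String, mutable.Set[String]]

var request = new ScanRequest().withTableName("MigrationTest") // illustrative table name
var done = false
while (!done) {
  val result = client.scan(request)
  for (item <- result.getItems.asScala; (name, value) <- item.asScala)
    observedTypes.getOrElseUpdate(name, mutable.Set.empty) += typeOf(value)
  val lastKey = result.getLastEvaluatedKey
  if (lastKey == null || lastKey.isEmpty) done = true   // no more pages
  else request = request.withExclusiveStartKey(lastKey) // continue the scan from the last key
}

observedTypes.foreach { case (name, types) =>
  if (types.size > 1) println(s"WARNING: attribute '$name' has conflicting types: ${types.mkString(", ")}")
  else println(s"$name: ${types.head}")
}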

@hopugop
Contributor Author

hopugop commented Feb 25, 2024

If I understand correctly, the idea is to migrate a DynamoDB to CQL with a full schema with typed columns, not to partial-schema (only primary key) Alternator?

That is not the case, @nyh. Migrator only supports DDB-API to DDB-API migration currently. DDB to CQL and vice-versa are not supported. There is no user-driven desire to have a fixed schema; after all, this is a DDB to Alternator migration.
The need for a schema is somewhat related to the internal API Spark uses and a library implementation detail.

@nyh

nyh commented Feb 25, 2024

That is not the case, @nyh. Migrator only supports DDB-API to DDB-API migration currently.
The need for a schema is somewhat related to the internal API Spark uses and a library implementation detail.

Wow, so you're saying that we're migrating a schema-less table to another schema-less table, but the implementation forces us to invent a schema, whose only raison d'etre is to tell the implementation not to silently drop attributes from items? I guess the only thing I can suggest is "fix the implementation" :-) If the implementation sees an attribute it doesn't know (i.e., not a key attribute) it should just copy it verbatim from the source to the target, instead of ignoring it. I'm not sure what other ideas @tzach hoped I might have to work around the broken implementation.

Thinking again, maybe I have an idea what you can do to continue using the broken implementation. Imagine that you had a "filter" before and after the existing schema-full migration code:

  1. The source-table filter converts a row with known key types plus any number of unknown attributes to a row with a simple schema: the key columns (of known types) plus a "list of attributes" in some serialized form.
  2. Then this converted row is shipped off to the target.
  3. The target reads that serialized form and converts it back into a list of attributes of arbitrary types, and writes them to the target table.

I don't know anything about your implementation, or whether it's possible to stick such "filters" on both ends to do such conversions.
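
A minimal sketch of that "filter" idea, assuming the AWS SDK v1 Document API (Item.toJSON / Item.fromJSON and ItemUtils); it only aims to show that a DynamoDB item could be round-tripped through a single serialized string column without knowing its schema:

import com.amazonaws.services.dynamodbv2.document.{Item, ItemUtils}
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import java.util.{Map => JMap}

// Source-side filter: serialize the (non-key) attributes of an item to a JSON string.
def serializeAttributes(attributes: JMap[String, AttributeValue]): String =
  ItemUtils.toItem(attributes).toJSON

// Target-side filter: turn the serialized form back into DynamoDB attribute values
// of arbitrary types, ready to be written to the target table.
// Note: a real implementation would have to verify that sets and binary values
// survive this JSON round-trip; this is only a sketch.
def deserializeAttributes(json: String): JMap[String, AttributeValue] =
  ItemUtils.toAttributeValues(Item.fromJSON(json))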

@julienrf
Collaborator

julienrf commented Feb 26, 2024

Thank you all for your inputs.

There is no user-driven desire to have a fixed schema

This point was not clear to me when I wrote #103 (comment). But if that’s the case, then I agree with @nyh, we should fix the implementation to not require a schema.

I believe we could still perform the data transfer with Spark using plain text RDDs (as in #5) instead of using spark-dynamodb which forces us to use a schema.

We would still have to parse the items as JSON and serialize them back to text if the user specified column renamings, but this should not be an issue.
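
For context, a condensed sketch of that RDD-based approach as described in the AWS blog post referenced in the commits below (the emr-dynamodb-hadoop connector); the table name, region and endpoint are placeholders:

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ddb-rdd-example"))

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "MigrationTest")             // placeholder table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com") // placeholder endpoint
jobConf.set("dynamodb.regionid", "us-east-1")                        // placeholder region
jobConf.set("mapred.input.format.class",
  "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

// Each item arrives as a DynamoDBItemWritable (a map of attribute values),
// so no schema has to be inferred or declared up front.
val items = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat],
  classOf[Text], classOf[DynamoDBItemWritable])

println(s"Read ${items.count()} items without a schema")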

julienrf added a commit to julienrf/scylla-migrator that referenced this issue Mar 1, 2024
This is a work in progress to address scylladb#103.

Instead of using `com.audienceproject:spark-dynamodb` to migrate the data, we use `com.amazon.emr:emr-dynamodb-hadoop` as described in https://aws.amazon.com/fr/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/.

The former approach used to load the data as a `DataFrame`, which required us to infer the data schema, but which was not doable in a reliable way.

The new approach still benefits from the Spark infrastructure to handle the data transfer efficiently, but the data is loaded as an `RDD`.

To achieve this, I had to de-unify the migrators for Scylla and Alternator (so, in a sense, I am undoing what was done in scylladb#23). The benefit is that the Alternator migrator is not anymore constrained by the requirements of the Scylla migrator.
julienrf added a commit to julienrf/scylla-migrator that referenced this issue Mar 8, 2024
julienrf added commits to julienrf/scylla-migrator that referenced this issue Mar 12, 2024