
read_capacity_pct unused? #9

Closed
findchris opened this issue Aug 30, 2016 · 33 comments
@findchris

Greetings!

First run and this library works as advertised (thanks @traviscrawford for the open source contribution 👍), except the readCapacityPct option doesn't appear to be respected.

My snippet:

val df = sqlContext
  .read
  .option("read_capacity_pct", "20")  // <-- this should set usage to 20%, right?
  .schema(TableSchema)
  .dynamodb(TableName)
df.count

The issue I'm seeing: read unit consumption jumped to ~85%, which won't fly in a production environment. Am I configuring the read_capacity_pct option correctly?

From what I see, readCapacityPct gets declared but is not used elsewhere.

Cheers.

@findchris
Author

Also, since it is related, how does this option interact with the segments option? Thanks!

@findchris
Author

Hey @traviscrawford, are you sure read_capacity_pct is honored in this version of spark-dynamodb? I can't see how it would work.

@findchris
Author

I did a bit more digging and didn't see anything provided by AWS for explicitly throttling consumption of read units. I did stumble across an article discussing a technique for doing this at the application level, but I don't see anything like this in spark-dynamodb.

I assume the ScanSpec's inclusion of withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL) is meant to facilitate this work, but I don't see any further usage of capacity.
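
For reference, here's roughly where that consumed capacity would come from - a minimal sketch assuming the AWS SDK v1 Document API and an existing Table handle, not spark-dynamodb's actual code:

import com.amazonaws.services.dynamodbv2.document.Table
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec
import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity
import scala.collection.JavaConverters._

// Ask DynamoDB to report consumed capacity with every page of scan results.
def logScanCapacity(table: Table): Unit = {
  val spec = new ScanSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
  table.scan(spec).pages().asScala.foreach { page =>
    // Read capacity units consumed by this page; available because TOTAL was requested.
    val units = page.getLowLevelResult.getScanResult.getConsumedCapacity.getCapacityUnits
    println(s"page consumed $units read capacity units")
  }
}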

Anyway, thanks for looking at this with me. This is blocking a production deploy.

@findchris
Author

Only other thought I had: is this throttling enforced per Spark executor? If not, how is it enforced? If it is per executor, a distributed Spark job's total capacity consumption would obviously vary with the cluster size.

@traviscrawford
Owner

Hi @findchris, thanks for reporting this. You're correct - readCapacityPct is not used at this time. The background: I'm porting our internal DynamoDB scanner, which does have a rate limiter, to this native data source, and the rate limiter hasn't been ported over yet. I'll take a look at this now.

Great to hear you find this useful.

traviscrawford pushed a commit that referenced this issue Sep 15, 2016
Here we add support for the `rate_limit_per_segment` option. We use a Guava
RateLimiter for each scan segment that treats DynamoDB consumed read capacity
units as RateLimiter permits.

Connects to #9
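
For illustration, a minimal sketch of the technique the commit describes - one Guava RateLimiter per scan segment, acquiring permits equal to each page's consumed read capacity units. The names (rateLimitedSegmentScan, fetchPage) are hypothetical, not the library's actual code:

import com.google.common.util.concurrent.RateLimiter

// Hypothetical `fetchPage` returns one page of items plus the read capacity units
// that page consumed (reported because ReturnConsumedCapacity.TOTAL is set),
// or None when the scan segment is exhausted.
def rateLimitedSegmentScan(
    rateLimitPerSegment: Double,
    fetchPage: () => Option[(Seq[String], Double)]): Seq[String] = {
  // One limiter per segment; permits per second == read capacity units per second.
  val limiter = RateLimiter.create(rateLimitPerSegment)
  val results = Seq.newBuilder[String]
  var page = fetchPage()
  while (page.isDefined) {
    val (items, consumedUnits) = page.get
    results ++= items
    // "Pay for" the capacity this page consumed before requesting the next page,
    // which throttles the segment to roughly the configured rate.
    limiter.acquire(math.max(1, consumedUnits.round.toInt))
    page = fetchPage()
  }
  results.result()
}
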
@traviscrawford
Owner

Hi @findchris - what do you think about the approach in https://github.com/traviscrawford/spark-dynamodb/compare/travis/ratelimit ? Are you able to try this in your environment prior to publishing a new release?

@findchris
Author

Thanks for chiming in here @traviscrawford. The pull request looks solid and straightforward. 👍

Can you help me understand the semantics of segments, and how it interacts with rate_limit_per_segment, given Spark's distributed environment?

I'd be happy to test this in QA, but I'm still relatively new to the Scala world. I'm currently using this project like so:

"com.github.traviscrawford" % "spark-dynamodb" % "0.0.2"

Besides a version bump (your call), got an easy way for me to test this out?

Out of curiosity, how do you implement your internal DynamoDB scanner's rate limiting?

@traviscrawford
Owner

Take a look at http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/QueryAndScanGuidelines.html for an overview of how to best scan DynamoDB tables.

When tuning scans, there are a number of variables:

  • Table provisioned read capacity units
  • Number of records
  • Size of table
  • Scan segments
  • Page size
  • Scanner rate limit

I generally scan tables as part of an ETL process, and start with a single segment that uses about 20% of the provisioned read capacity units. If that's fast enough I don't look much further. Sometimes the scan is slow because a small table has very low provisioned read capacity units, so I increase that. Sometimes the table is large and adding scan segments greatly speeds up the scan.

Given the above variables that affect table scans, what do your tables look like?
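
For example, a reader configured with the options discussed in this thread might look like the snippet below - the values are purely illustrative and should be tuned against your table's provisioned read capacity:

val df = sqlContext
  .read
  .option("segments", "4")                // number of parallel scan segments
  .option("rate_limit_per_segment", "25") // per the commit above: consumed RCUs acquired as permits, i.e. roughly RCUs/sec per segment
  .schema(TableSchema)
  .dynamodb(TableName)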

traviscrawford added a commit that referenced this issue Sep 16, 2016
Here we add support for the `rate_limit_per_segment` option. We use a Guava
RateLimiter for each scan segment that treats DynamoDB consumed read capacity
units as RateLimiter permits.

Connects to #9
@findchris
Author

Thanks for the link and thoughts @traviscrawford. I haven't had to do much Scanning in the past (so I hadn't yet read about TotalSegments), but like you, I'll be using this for an ETL process. I have a pretty big table (~150GB), with a fair number of attributes (although I'll only be using a handful of them in my ETL).

Like you mention, I'll just have to experiment with different values for the segments parameter.

I noticed that you just added 0.0.4-SNAPSHOT. Is that ready for me to test out?

@traviscrawford
Owner

I just published version 0.0.3 with the rate limiting change to maven central - it might take a while to propagate, but once it does you can try that out.

@findchris
Author

👍 I'll keep you posted on my testing.

@findchris
Author

@traviscrawford - I just tried this out (sorry for the delay), but it looks like there might be a guava version conflict or something (based on the stackoverflow threads here and here). The exception:

java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
        at com.google.common.util.concurrent.RateLimiter$SleepingStopwatch$1.<init>(RateLimiter.java:417)
        at com.google.common.util.concurrent.RateLimiter$SleepingStopwatch.createFromSystemTimer(RateLimiter.java:416)
        at com.google.common.util.concurrent.RateLimiter.create(RateLimiter.java:130)
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:147)
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:145)
        at scala.Option.map(Option.scala:145)
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$.scan(DynamoDBRelation.scala:145)

@findchris
Author

@traviscrawford - I'm still trying to resolve this. It appears that Spark depends on guava 14.0, while spark-dynamodb depends on guava 18.0. The Stopwatch.createStarted call doesn't exist in 14.0 from what I can tell, and that is the source of the problem.

I'm looking into "shading" the dependency, but don't have much experience there.

@findchris
Author

Still no luck. I've tried:

// in build.sbt
dependencyOverrides += "com.google.guava" % "guava" % "18.0"
// direction taken from: http://arjon.es/2015/10/12/making-hadoop-2-dot-6-plus-spark-cassandra-driver-play-nice-together/
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
)

This StackOverflow answer would seem to point to the solution, but I've had no luck using --conf spark.{driver,executor}.userClassPathFirst=true.

To reiterate, guava 14.0, which Spark 1.6.1 depends on, lacks the method Stopwatch.createStarted. guava 18.0, however, has this method, which is used by the RateLimiter.

I think the answer lies with shading, but I have no experience using this technique, and my attempts above failed.
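
For what it's worth, a conventional sbt-assembly shade rule relocates the bundled Guava classes under a private prefix, something like this sketch (assuming guava 18.0 actually ends up in the fat jar):

// build.sbt sketch, assuming sbt-assembly: relocate the Guava classes bundled into
// the fat jar under a private prefix so they cannot collide with Spark's guava 14.0.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)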

@traviscrawford, can you offer any thoughts?

@timchan-lumoslabs
Contributor

@findchris are you marking your Spark dependencies as "provided"?

@findchris
Author

Thanks for chiming in @timchan-lumoslabs. Here is what I have in my build.sbt's libraryDependencies section:

  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided" exclude("org.slf4j", "jcl-over-slf4j") exclude("org.scala-lang", "scala-library") exclude("org.scala-lang", "scala-reflect") exclude("com.google.guava", "guava"),
  "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided" exclude("org.scala-lang", "scala-library") exclude("org.scala-lang", "scala-reflect") exclude("com.google.guava", "guava"),
  "org.apache.spark" %% "spark-hive" % "1.6.1" % "provided" exclude("stax", "stax-api") exclude("org.scala-lang", "scala-library") exclude("org.scala-lang", "scala-reflect") exclude("com.google.guava", "guava"),
  "org.apache.spark" %% "spark-mllib" % "1.6.1" % "provided"

See anything obviously for me to change?

@findchris
Author

@traviscrawford / @timchan-lumoslabs - Any insights as to what might be going on? I just need to make sure spark-dynamodb is making calls to guava 18.0 and not 14.0, and sadly I don't know how to do it.

@traviscrawford
Owner

Hi @findchris - my hunch is we could downgrade to the version of Guava used by Spark and make this issue go away for you. Will take a look...

@findchris
Author

@traviscrawford - That's always an option. I have to imagine there is a solution to this problem, and I believe it involves shading - I'm just not sure how to do it. Maybe you can shade guava and still use 18.0 (or higher) using maven-shade-plugin?

Either way, I appreciate the help!

@traviscrawford
Owner

@findchris @timchan-lumoslabs Question for you - internally at Medium we have tried a few different approaches to integrating DynamoDB with Spark, and the approach we're planning on using going forward is:

Use a Spark job to back up the DynamoDB table as JSON on S3. We chose JSON over a binary format such as Parquet because DynamoDB tables do not have a schema, so we can avoid the schema issue during the backup phase.

Then we simply read the DynamoDB backups like you normally would read JSON from a Spark job. At this point DynamoDB is not in the picture at all.
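
As a sketch of that second stage (paths and attribute names illustrative), the downstream job just reads the backup as JSON and never touches DynamoDB:

val backup = sqlContext.read.json("s3://my-bucket/dynamodb-backups/my-table/")
val projected = backup
  .select("id", "some_attribute")             // keep only the attributes the ETL needs
  .filter(backup("some_attribute").isNotNull) // drop records missing that attribute
projected.count()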

Would the above approach work for you? If so, I could see if we can open source the Spark-based DynamoDB scanner. If we're all using the same approach and code in production it would be easier to make sure it handles all the corner cases.

@findchris
Author

@traviscrawford - I appreciate the collaboration and interest in sharing more useful code.

Your suggestion sounds interesting. Let me share my use case so we can see whether they're compatible: I need to scan an entire DynamoDB table (optimally using a filter expression to eliminate records lacking certain attributes), project out a subset of the returned attributes, do some light transformation of the data, and then write the results to S3 as CSV.

So for my use case, I'm not doing a straight backup to S3. I suppose what you describe would work, with the disadvantage that I'd need two Spark jobs: one to back up the table to S3 as JSON, and another to read the JSON and operate on it.

Does that help to shed light on my usage?

@timchan-lumoslabs
Contributor

@traviscrawford Our use case is somewhat similar to yours. We are basically forklifting the data in a DynamoDB table attribute to Redshift. Item attribute values are JSON.

@traviscrawford
Owner

@findchris We have some use cases like yours too, where we need to filter and project some records in the DynamoDB table before processing. DynamoDB filters are interesting: you still scan the full table, and the filters are applied before rows are returned to the client. Since behind the scenes we're scanning the full table anyway, we simply write the whole thing out to S3. Then a separate job processes the backups.
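
For illustration, a filter plus projection on a ScanSpec looks like the sketch below (AWS SDK v1 Document API, attribute names hypothetical) - it reduces what is returned, not what is read:

import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec

// The filter is evaluated server-side after items are read, so the scan still
// consumes capacity for every item it touches; only the returned data shrinks.
val spec = new ScanSpec()
  .withFilterExpression("attribute_exists(email)")
  .withProjectionExpression("id, email")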

Would y'all find it useful if we published the backup job we're using?

@findchris
Author

Would y'all find it useful if we published the backup job we're using?

Yes. AWS has their Data Pipeline stuff, but it feels more consistent to have it all in Spark. Does this backup job happen to have a throttling mechanism built in @traviscrawford ? ;-)

@traviscrawford
Owner

@findchris I just switched to using the guava dependency that Spark provides, so you shouldn't have the issue anymore.

Can you mvn install -DskipTests on your local machine to build from source, then depend on the snapshot version you installed locally to test this? If it works I'll publish a new release to maven central.

@findchris
Author

findchris commented Sep 25, 2016

@traviscrawford - A bit embarrassed to ask, but I'll need more hand-holding to do what you suggest. I use sbt, and haven't done any building from source for my dependencies. (I think publishTo is what I need; just totally unfamiliar).

Regardless, I appreciate the work. Looking forward to testing this all out.

@timchan-lumoslabs
Contributor

timchan-lumoslabs commented Sep 25, 2016

@findchris You're building the spark-dynamodb project, which is built with Maven.

git clone git@github.com:traviscrawford/spark-dynamodb.git
cd spark-dynamodb
mvn install -DskipTests -Dgpg.skip=True

Then copy spark-dynamodb-0.0.4-SNAPSHOT.jar into the lib/ directory of your sbt project, and comment out your spark-dynamodb dependency in sbt.
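
Alternatively (a sketch: mvn install also publishes the snapshot to your local Maven repository at ~/.m2), you can point sbt at the local Maven repo instead of copying the jar:

// in build.sbt
resolvers += Resolver.mavenLocal
libraryDependencies += "com.github.traviscrawford" % "spark-dynamodb" % "0.0.4-SNAPSHOT"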

@findchris
Author

@timchan-lumoslabs I appreciate the tips! I did what you said and it compiled OK; sbt assembly ran without issue.

However, @traviscrawford, when I run my job I'm seeing a new stacktrace:

java.lang.NoClassDefFoundError: com/google/common/util/concurrent/RateLimiter
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:147)
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:145)
        at scala.Option.map(Option.scala:145)
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$.scan(DynamoDBRelation.scala:145)
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$buildScan$3.apply(DynamoDBRelation.scala:97)
        at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$buildScan$3.apply(DynamoDBRelation.scala:97)
...
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.google.common.util.concurrent.RateLimiter
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 25 more

I was concerned RateLimiter wasn't around in guava 14.0, but it was introduced in 13.0.

Any idea what's up? I hope to investigate more tomorrow.

@findchris
Author

A follow up:
When I tried to import the RateLimiter class directly (based on the path in the stack trace above), I get an error:

scala> import com.google.common.util.concurrent.RateLimiter
<console>:51: error: object util is not a member of package com.google.common
         import com.google.common.util.concurrent.RateLimiter

However, when I explicitly depend on guava 14.0.1 in my sbt file ("com.google.guava" % "guava" % "14.0.1"), I finally got it working with rate-limiting!

This was using the manual steps provided by @timchan-lumoslabs to build spark-dynamodb-0.0.4-SNAPSHOT.jar.

@traviscrawford, not sure why I explicitly had to specify the guava dependency, but seems good now. Merge?

@traviscrawford
Owner

Thanks for testing this! It's been published to maven central.

@findchris
Author

Is the current version then 0.0.6-SNAPSHOT?

@traviscrawford
Owner

Latest version is 0.0.5 in Maven Central. You can see the release history at:
http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22com.github.traviscrawford%22%20AND%20a%3A%22spark-dynamodb%22

@findchris
Author

Thanks for all the correspondence! Closing this out now.
