
Delete of embedded Cassandra? #380

Closed · mpouttuclarke opened this issue Sep 24, 2013 · 31 comments

@mpouttuclarke
Contributor

On HEAD it looks like this functionality was deleted. This is a backward-incompatible change and I am currently using it, so is there an alternative being introduced?

@mpouttuclarke
Contributor Author

I have extensive experience with Cassandra, so I can carry this forward if you aren't planning to maintain it...

@okram
Contributor

okram commented Sep 24, 2013

AFAIK: The head is 0.4.0. 0.3.3 will go into the titan-03 branch. For 0.4.0, cassandra-embedded is no longer available.


@mbroecheler
Member

As discussed on the mailing list (https://groups.google.com/d/topic/aureliusgraphs/EasJTTkDtfY/discussion) we are removing the embedded option for Cassandra since it is slower than running Cassandra in a separate process and communicating over localhost. We will provide helper scripts to make that setup easy to use.

You could maintain the adapter in a separate repo. We are removing it because we see little benefit and a lot of additional code complexity.
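
In the meantime, here is a minimal sketch of the non-embedded setup described above, in the same Java/commons-configuration style used later in this thread. The backend name "cassandrathrift" and the localhost defaults are assumptions about the 0.4-era configuration, so verify them against your Titan version:

import org.apache.commons.configuration.BaseConfiguration;

import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;

public class LocalhostCassandra {

    public static void main(String[] args) {
        BaseConfiguration config = new BaseConfiguration();
        // External Cassandra process reached over Thrift on the same machine
        config.setProperty("storage.backend", "cassandrathrift");
        config.setProperty("storage.hostname", "127.0.0.1");
        TitanGraph graph = TitanFactory.open(config);
        // ... run the same workload as against the embedded backend ...
        graph.shutdown();
    }
}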

@mpouttuclarke
Contributor Author

Thanks for the feedback. It looks like you are testing on Amazon with ephemeral disk. Are you interested in results with Cassandra on Fusion-io? I think you will find different results when the storage is faster than the network interface... I have some bare-metal Fusion-io machines available; if you want, I can run some tests.

@mpouttuclarke
Contributor Author

FYI: I have already tried the Amazon Flash instances with both RHEL 6 and Amazon Linux and they're extremely slow compared to bare metal.

@mbroecheler
Member

Making sure I understand you correctly: are you saying that using Cassandra embedded is significantly faster than using Cassandra via localhost unless running the experiments on AWS?
We have also been running experiments locally. It is true that Cassandra embedded gives you lower latency, but the behavior under stress is much more erratic due to compounding GC issues, to the point where throughput is higher when running in separate JVMs.

That's why we are moving away from Cassandra embedded: I believe that low-latency applications are better served by in-memory solutions, which we will be rolling out with Fulgora.
Does that make sense?


@mpouttuclarke
Contributor Author

I was wondering if you had tested with Flash (especially PCI Flash like Fusion-io). The heap pressure is mitigated by:

  • Cassandra 1.2 moving bloom filters and caches off-heap, out of the JVM
  • PCI Flash being so fast that caching in the JVM has no benefit, so the Cassandra key cache and row cache are unnecessary
  • many low-latency apps setting consistency to ONE or ANY, in which case the response comes back as soon as the in-process Cassandra commit log acknowledges the write, without waiting for the other nodes (see the config sketch at the end of this comment)
  • my application being so latency-sensitive that serialization across even a localhost interface has a major effect

In my use case I am even considering integrating directly with the Flash API via JNI, because Cassandra may be too slow even with all of the above.

So my latency requirement is such that I am considering either in-process Cassandra or a direct interface to Flash. I have tested Cassandra over the localhost interface and it is not fast enough; although that test did not have Titan in the middle, I have to assume that adding Titan will introduce more latency than I am willing to accept...
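
To make the consistency point above concrete, here is a minimal sketch of turning consistency down to ONE in a Titan configuration, in the same style as the load test further down. The read/write consistency property names are an assumption about the 0.4-era Cassandra backend options and should be checked against the docs:

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;

import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;

public class LowLatencyConfigSketch {

    public static void main(String[] args) {
        BaseConfiguration config = new BaseConfiguration();
        Configuration storage = config.subset(GraphDatabaseConfiguration.STORAGE_NAMESPACE);
        storage.setProperty(GraphDatabaseConfiguration.STORAGE_BACKEND_KEY, "embeddedcassandra");
        storage.setProperty("cassandra-config-dir", "file:///path/to/cassandra.yaml");
        // ONE: the write is acknowledged as soon as one replica (here, the in-process
        // node's commit log) accepts it, without waiting on the other nodes.
        // These two keys are assumptions, not verified against a specific release.
        storage.setProperty("read-consistency-level", "ONE");
        storage.setProperty("write-consistency-level", "ONE");
        TitanGraph graph = TitanFactory.open(config);
        graph.shutdown();
    }
}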

@mbroecheler
Member

That's really interesting. Do you have any numbers that you could share?

I agree that there is a lot of tuning you can do to reduce the latency of Cassandra. I guess my question is: is Cassandra a good target for low-latency applications in the first place? It seems that distributed data grids would be far preferable for such use cases. This is why we have project "Fulgora" targeted at data grids, as RAM is still an order of magnitude faster than Flash.

Thanks for the great discussion, Matt.


@mpouttuclarke
Contributor Author

I haven't had a lot of luck so far getting low latency out of non-embedded Cassandra, and I've been working at it for over a year. We still haven't been able to flip the switch on using Cassandra in our low-latency products. I was hoping the embedded Cassandra would work out better, or some of the newer Flash capabilities DataStax is working on.

Yes, data grids work well for this as long as the GC doesn't get out of hand. I have some interesting design patterns I use to avoid the GC and enable off-heap storage that spills over to Flash. Unfortunately, no Java APIs or tools I'm aware of can use these design patterns out of the box; I've had to rewrite the Guava BloomFilter and CERN BitVector, for example, to use my methods. For the data sizes I'm dealing with, the price difference between RAM and Flash makes these efforts worthwhile. You have it right that the GC is the enemy of low latency; however, I'm not sure separating Titan and Cassandra into different JVMs would address the root cause of the problem.

@mpouttuclarke
Contributor Author

Sure, I can share some stats. I will start running Titan/Cassandra embedded and non-embedded tests today with both an 8-disk RAID 0 and fio. I’ll let you know how it goes. Here’s the box spec:

HP DL380p Gen 8
2x 8-core Xeon E5-2670 (32 total threads): http://www.cpu-world.com/CPUs/Xeon/Intel-Xeon%20E5-2690.html
192 GB DDR3-1600 memory (16 GB DIMMs)
8x 1 TB 7.5k RPM SAS SFF disks (RAID 0)

Fusion-io ioDrive 640 GB

RHEL 6.4, JDK 1.7.0_25


@mpouttuclarke
Contributor Author

Here's my load test class, any comments? I am getting a lot of the "ID could not be recognized" errors on vertex read:
package com.earlywarning.rnd.titan;

import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;

import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanTransaction;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.ElementHelper;

public class GraphLoad {

/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    BaseConfiguration config = new BaseConfiguration();
    Configuration storage = config
            .subset(GraphDatabaseConfiguration.STORAGE_NAMESPACE);
    storage.setProperty("cassandra-config-dir",
            String.format("file://%s", new File(args[0]).getAbsolutePath()));
    storage.setProperty(GraphDatabaseConfiguration.STORAGE_BACKEND_KEY,
            "embeddedcassandra");
    storage.setProperty("batch-loading", "true");
    config.setProperty("ids.block-size", Integer.toString(1024 * 1024 * 10));
    final TitanGraph graph = TitanFactory.open(config);
    graph.makeType().name("cid").dataType(Long.class).indexed(Vertex.class)
            .unique(Direction.BOTH).makePropertyKey();
    graph.makeType().name("fullName").dataType(String.class)
            .unique(Direction.OUT).makePropertyKey();

    graph.commit();

    final int keyCount = Integer.valueOf(args[1]);

    final AtomicLong writeTime = new AtomicLong();

    // Writer threads: thread t handles the cids where cid % threadCount == t, committing every 10,000 vertices
    final Thread[] writeThreads = new Thread[Integer.valueOf(args[2])];
    for (int t = 0; t < writeThreads.length; t++) {
        final int threadId = t;
        writeThreads[t] = new Thread(new Runnable() {
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                TitanTransaction tx = graph.newTransaction();
                long forThread = 0L;
                for (long cid = 1; cid < keyCount; cid++) {
                    if (cid % writeThreads.length != threadId) {
                        continue;
                    }
                    Vertex v = graph.addVertex(null);
                    ElementHelper.setProperties(v, "cid", cid, "fullName",
                            "jimbo");
                    if (++forThread % 10000 == 0) {
                        tx.commit();
                        tx = graph.newTransaction();
                    }
                }
                tx.commit();
                writeTime.addAndGet(System.currentTimeMillis() - start);
            }
        });
        writeThreads[t].start();
    }

    long delay = (long)((keyCount / 350000D) * 1000);
    System.out.println(String.format("Delaying readers by %.3f seconds", delay / 1000D));
    Thread.sleep(delay);

    final int readIterations = Integer.valueOf(args[3]);
    final AtomicInteger errors = new AtomicInteger();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicLong readTime = new AtomicLong();

    // Reader threads: each looks up vertices by random long ids in [1, keyCount] and reads one property
    Thread[] readThreads = new Thread[Integer.valueOf(args[4])];
    for (int t = 0; t < readThreads.length; t++) {
        readThreads[t] = new Thread(new Runnable() {
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                TitanTransaction tx = graph.newTransaction();
                for (long x = 1; x < readIterations; x++) {
                    try {
                        Vertex v = tx.getVertex((long) Math.ceil(Math
                                .random() * keyCount));
                        if (v != null) {
                            hits.addAndGet(1);
                            v.getProperty("fullName");
                        }
                    } catch (Exception e) {
                        errors.incrementAndGet();
                    }
                }
                readTime.addAndGet(System.currentTimeMillis() - start);
            }
        });
        readThreads[t].start();
    }

    for (Thread thread : readThreads) {
        thread.join();
    }

    for (Thread thread : writeThreads) {
        thread.join();
    }

    // Cassandra shutdown may take a while when you hit it hard
    graph.shutdown();

    double readSecs = readTime.get() / 1000D / readThreads.length;
    int readTotal = readThreads.length * readIterations;
    System.out.println(String.format(
            "vertex read rate: hits = %s, errors = %s, %.3f / sec",
            hits.get(), errors.get(), readTotal / readSecs));

    double writeSecs = writeTime.get() / 1000D / writeThreads.length;
    System.out.println(String.format("vertex write rate: %.0f / sec",
            keyCount / writeSecs));
}

}

@mbroecheler
Member

You are doing this:
@vertex v = tx.getVertex((long) Math.ceil(Math.random() * keyCount));@
where you make the assumption that Titan assigns ids sequentially, which is not true.

@mpouttuclarke
Contributor Author

How would I get the id that Titan assigned? I tried v.getId and it didn’t seem to work


@vertex

vertex commented Sep 26, 2013

@mbroecheler well, I never expected to be a line of code, but thank you.

@mbroecheler
Member

@vertex haha, sweet username you got there.

@mbroecheler
Member

@mpouttuclarke Titan assigns the ids after the transaction commits, at which point you can do v.getId() to get the id as an object (Tinkerpop compliant) or v.getID() to get the long id (Titan specific).
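
A minimal sketch of that, assuming the same graph and property keys as the load test above; the point is simply that the id is read back after commit rather than guessed:

// Hedged sketch: ids are only assigned at commit time, so collect them afterwards
// and use them for lookups instead of assuming they are sequential longs.
TitanTransaction tx = graph.newTransaction();
Vertex v = tx.addVertex(null);
ElementHelper.setProperties(v, "cid", 42L, "fullName", "jimbo");
tx.commit();                        // id assignment happens here
Object assignedId = v.getId();      // Tinkerpop-compliant id object, as described above

TitanTransaction readTx = graph.newTransaction();
Vertex readBack = readTx.getVertex(assignedId);   // look up by the real id
readTx.commit();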

@mpouttuclarke
Contributor Author

@mbroecheler There are a lot of objects hanging around in memory. Do you know a way to clear them out? I am committing every 10,000 records.

$ /usr/share/jdk1.7.0_40/bin/jmap -histo:live 43165 | head

num #instances #bytes class name

1: 38454497 2153451832 com.thinkaurelius.titan.graphdb.relations.StandardProperty
2: 12819488 1023948504 [Ljava.lang.Object;
3: 25639474 820463168 java.util.HashMap$Entry
4: 12819073 615315504 java.util.HashMap
5: 12818168 512726720 com.thinkaurelius.titan.graphdb.vertices.StandardVertex
6: 12818775 477354432 [Ljava.util.HashMap$Entry;
7: 12818461 307643064 java.lang.Long

@mpouttuclarke
Contributor Author

Actually, from the heap dump the solution is quite simple: you have to clear the SimpleIndexCache and SimpleVertexCache on commit or rollback of the transaction. These are the lines that need to be fixed in com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx (around line 866):

private void close() {
    //TODO: release non crucial data structures to preserve memory?
    isOpen=false;
}

@mpouttuclarke
Contributor Author

Either that or you can make them LRU, but for my data sizes that would not matter much.

@mpouttuclarke
Contributor Author

I can do a fork and pull request and verify the fix, if you want... I need the fix right away to continue load testing.

@mbroecheler
Member

@mpouttuclarke You are right, the transaction keeps pointers to all vertices in the transaction. We have an outstanding ticket, #335, about using caches instead of normal maps so that memory can be released.

However, I don't understand why the transactional state keeps piling up. If you commit the transaction and release the pointer to it, the garbage collector should clean things up. Or are you holding on to the vertex objects somewhere?

@mpouttuclarke
Contributor Author

The code is above, so no, I'm not holding a reference. Unless tx = graph.newTransaction(); returns the same instance to the same thread.

@mpouttuclarke
Contributor Author

Also, the garbage collector may not be able to trace the vertex and property instances back to the holding object quickly enough. In that case, explicitly clearing the references helps it free the nested objects sooner.

@mbroecheler
Member

Ah, I was looking at the writing threads. Those commit regularly and start a new transaction, which should effectively release the memory. However, the reading threads don't, so you load more and more of the graph into each of those transactions, which fills up your heap.
The long-running, read-only-transaction use case is exactly why #335 exists. I will take a crack at using a Guava cache for the vertex cache later tonight; the index cache already uses it.
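
Until that lands, a minimal sketch of the workaround implied above: renew the read transaction periodically so the previous transaction's state becomes collectible (the batch size of 10,000 mirrors the write side and is arbitrary):

// Hedged sketch: same read loop shape as the load test above, but the transaction is
// renewed every 10,000 lookups so its vertex/index caches can be garbage collected.
TitanTransaction tx = graph.newTransaction();
for (long x = 1; x < readIterations; x++) {
    Object id = nextIdToRead(x);        // hypothetical helper: however ids are obtained (see the id discussion above)
    Vertex v = tx.getVertex(id);
    if (v != null) {
        v.getProperty("fullName");
    }
    if (x % 10000 == 0) {
        tx.commit();                    // or tx.rollback(); either releases the transaction state
        tx = graph.newTransaction();
    }
}
tx.commit();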

@mpouttuclarke
Contributor Author

@mbroecheler It even happens when I only spawn writer threads. Here is the result of the heap dump analysis for only writer threads:

So overall, this is resulting in 95% of heap allocation:
Class Name | Shallow Heap | Retained Heap | Percentage
com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx @ 0x6beb87d10 | 80 | 811,827,744 | 12.95%
|- com.thinkaurelius.titan.graphdb.transaction.indexcache.SimpleIndexCache @ 0x625b6c4d0 | 16 | 271,769,856 | 4.34%
|- com.thinkaurelius.titan.graphdb.transaction.vertexcache.SimpleVertexCache @ 0x629e7bc88 | 16 | 54,526,056 | 0.87%
|- com.thinkaurelius.titan.graphdb.transaction.addedrelations.SimpleBufferAddedRelations @ 0x625bb4520| 24 | 24,613,664 | 0.39%
|- com.google.common.cache.LocalCache$LocalManualCache @ 0x625b72970 | 16 | 928 | 0.00%
|- com.thinkaurelius.titan.diskstorage.BackendTransaction @ 0x62a934880 | 40 | 416 | 0.00%

@mpouttuclarke
Contributor Author

@mbroecheler: so I will simply add code in close() to clear the caches in my fork; that should fix my immediate problem.
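
A rough sketch of what that fork patch could look like; the cache field names and the clear() calls are assumptions about StandardTitanTx internals, not the actual upstream code:

private void close() {
    // Release the per-transaction caches so their contents become collectible.
    // vertexCache / indexCache field names and clear() are hypothetical.
    vertexCache.clear();
    indexCache.clear();
    isOpen = false;
}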

@mbroecheler
Member

@mpouttuclarke Let me know if that helps. I still cannot make logical sense of that. When you do tx = graph.newTransaction(); the only pointer to the previous transaction is released. There does not seem to be any other reference to that transaction (or any element in it) inside the run() method. Hence, GC should be able to determine that it can release the transaction and all its associated memory. I agree that there might be some inefficiencies on the side of the GC, but when the heap pressure becomes high enough, a mark-and-sweep should clear this out.

@mbroecheler
Member

@mpouttuclarke Couldn't let go and went ahead to implement an LRUVertexCache. Check out commit:
1749c4d

Added a test case that shows that memory is not leaking anymore:
https://github.com/thinkaurelius/titan/blob/master/titan-test/src/main/java/com/thinkaurelius/titan/graphdb/TitanGraphPerformanceMemoryTest.java#L73
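
For anyone curious about the general idea (this is illustrative only, not the code in 1749c4d): a size-bounded vertex cache can be built on Guava's CacheBuilder, so a long-running transaction retains at most a fixed number of vertices.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Hedged sketch of an LRU-style vertex cache on top of Guava; not the actual LRUVertexCache.
public class LruVertexCacheSketch<V> {

    private final Cache<Long, V> cache = CacheBuilder.newBuilder()
            .maximumSize(100000)        // bound on retained vertices; tune as needed
            .build();

    public V getIfPresent(long vertexId) {
        return cache.getIfPresent(vertexId);
    }

    public void put(long vertexId, V vertex) {
        cache.put(vertexId, vertex);
    }
}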

@mpouttuclarke
Contributor Author

Wow, thanks!


@fabienrousseau

By making a small change in CassandraEmbeddedKeyColumnValueStore.java#L341, you should get a performance improvement, unless read slices are very small. The change consists of using cf.getSortedColumns() and avoiding calls to cf.getColumn(col).

Why?
ColumnFamily inherits from AbstractColumnContainer, which has an ISortedColumns member.
That interface has multiple implementations; one of them is ArrayBackedSortedColumns.
Its documentation says: "This implementation makes sense when the main operations performed are iterating over the map and adding columns (especially if insertion is in sorted order)"

When reading from Cassandra, this implementation is used because:

  • reads are done in order using an iterator merging memtables & sstables,
  • thus inserts are done in order,
  • and serialization is then done by iterating over the collection.

In this case, because Cassandra is embedded, there is no serialization involved and the structure is used directly. Calling cf.getColumn(col) involves a binary search for each call, which is less efficient than iterating over the collection once.
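
A minimal sketch of the difference, using approximate Cassandra 1.2-era types; the surrounding Titan slice logic is elided and the handler is hypothetical:

// Before: one binary search per requested column
// for (ByteBuffer name : requestedColumnNames) {
//     handleColumn(cf.getColumn(name));
// }

// After: a single ordered pass over the columns the read already produced in sorted order
for (IColumn column : cf.getSortedColumns()) {
    handleColumn(column);               // hypothetical handler standing in for Titan's slice handling
}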

@mbroecheler
Member

Created a new ticket: #417.
If you have a well-thought-out feature suggestion or enhancement like this one, please create a new issue. Thanks!
