flink-cassandra-keyspace-cluster

Apache License, Version 2.0, January 2004 Maven Central

An extension for the Flink Cassandra connector that lets you specify a default Cassandra keyspace.


Do not use this extension with Flink versions >= 1.6, because it does not work there.
Since that version the connector builder has its own defaultKeyspace parameter:

        CassandraSink.addSink(dataSource)
                //...
                .setDefaultKeyspace("Your default keyspace")
                //...
                .build();

The main advantage of KeyspaceClusterBuilder (for Flink versions < 1.6) is that it lets you use POJOs (through mappers, see http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html) without hard-coding a constant keyspace, so the keyspace can be read from properties.
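
Reading the keyspace from properties could look like the minimal sketch below. The class name KeyspaceConfig, the property key cassandra.keyspace, and the fallback value are assumptions made for illustration; none of them is part of this library or of Flink.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of this library or of Flink.
final class KeyspaceConfig {
    // Assumed property key; use whatever key your configuration defines.
    static final String KEYSPACE_PROPERTY = "cassandra.keyspace";

    private KeyspaceConfig() {}

    /** Returns the configured keyspace, or the fallback when the key is missing or blank. */
    static String resolveKeyspace(Properties props, String fallback) {
        String value = props.getProperty(KEYSPACE_PROPERTY);
        if (value == null || value.trim().isEmpty()) {
            return fallback;
        }
        return value.trim();
    }

    /** Loads properties from any Reader (for example a FileReader in a real job). */
    static Properties load(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return props;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory source keeps the sketch self-contained.
        Properties props = load(new StringReader("cassandra.keyspace=keyspace_prod\n"));
        String keyspace = resolveKeyspace(props, "keyspace_flink");
        System.out.println(keyspace); // prints "keyspace_prod"
    }
}
```

The resolved string would then be passed to the KeyspaceClusterBuilder constructor in place of a hard-coded value.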


Example:

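import java.time.LocalDateTime;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.TypeCodec;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
// plus the import for KeyspaceClusterBuilder from this library

public class Main {
    public static void main(String[] args) {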
        String keyspace = "keyspace_flink";
        if(args.length > 0) {
            keyspace = args[0];
        }

        // Placeholder: replace null with your own codec instance
        final TypeCodec<LocalDateTime> localDateTimeCodec = null;

        //...
        CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(
            Pojo.class, 
            new KeyspaceClusterBuilder(keyspace) {
                @Override
                protected Cluster.Builder filledBuilder(Cluster.Builder builder) {
                    return builder.addContactPoint("localhost");
                }
                @Override
                protected void configureCluster(Cluster cluster) {
                    cluster
                        .getConfiguration()
                        .getCodecRegistry()
                        .register(localDateTimeCodec);
                }
            }
        );
        //...
    }
}

@Table(name = "test")
public class Pojo implements Serializable {
    //...
}

With the standard solution you have to hard-code a constant keyspace in the annotation:

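import java.time.LocalDateTime;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.TypeCodec;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class Main {
    public static void main(String[] args) {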
        //...

        // Placeholder: replace null with your own codec instance
        final TypeCodec<LocalDateTime> localDateTimeCodec = null;

        CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(
            Pojo.class, 
            new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    Cluster cluster = builder.addContactPoint("localhost").build();
                    cluster
                        .getConfiguration()
                        .getCodecRegistry()
                        .register(localDateTimeCodec);
                    return cluster;
                }
            }
        );
        //...
    }
}

@Table(keyspace = "keyspace_flink", name = "test")
public class Pojo implements Serializable {
    //...
}