Skip to content

Commit

Permalink
BulkLoader: Allow UDTs to be loaded
Browse files Browse the repository at this point in the history
Since Cassandra validates whether a UDT exists within the keyspace
when it encounters its usage, be it nested in a tuple type or as
the type of a column in a table, we have to load all UDTs beforehand.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1460891914-23239-1-git-send-email-duarte@scylladb.com>
  • Loading branch information
duarten authored and avikivity committed May 2, 2016
1 parent 412d6ae commit 13b64ad
Showing 1 changed file with 86 additions and 0 deletions.
86 changes: 86 additions & 0 deletions src/java/com/scylladb/tools/BulkLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
Expand All @@ -61,6 +62,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -72,12 +74,19 @@
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.UTName;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Expand All @@ -90,6 +99,7 @@
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.DataType.Name.*;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
Expand All @@ -100,6 +110,8 @@
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.TokenRange;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.UserType;

public class BulkLoader {
public static class CmdLineOptions extends Options {
Expand Down Expand Up @@ -194,9 +206,83 @@ public CQLClient(LoaderOptions options, String keyspace)
session = cluster.connect(keyspace);
metadata = cluster.getMetadata();
keyspaceMetadata = metadata.getKeyspace(keyspace);
KSMetaData ksMetaData = KSMetaData.newKeyspace(
keyspaceMetadata.getName(),
"SimpleStrategy",
keyspaceMetadata.getReplication(),
keyspaceMetadata.isDurableWrites());
Schema.instance.load(ksMetaData);
loadUserTypes(keyspaceMetadata.getUserTypes(), ksMetaData);
partitioner = FBUtilities.newPartitioner(metadata.getPartitioner());
}

/**
 * Loads the given user defined types into the keyspace's type registry.
 *
 * Since loading a UDT entails validation of the field types against known
 * types, we may fail to load a UDT if it references a UDT that has not yet
 * been loaded. So we run a fixed-point algorithm: every type that fails is
 * queued and retried in a further pass, until we either load all UDTs or
 * fail to make forward progress.
 *
 * @param udts the driver-side user types still to be loaded; may be empty
 * @param ks   keyspace metadata whose {@code userTypes} receives the
 *             converted types and whose {@code name} is used to prepare
 *             the field types
 * @throws RuntimeException if a whole pass over {@code udts} loads nothing;
 *                          the cause is the last failure seen in that pass
 */
private static void loadUserTypes(Collection<UserType> udts, KSMetaData ks) {
    // Nothing to do for a keyspace without UDTs. Without this guard the
    // "no forward progress" check below would compare 0 == 0 and throw.
    if (udts.isEmpty()) {
        return;
    }

    LinkedList<UserType> notLoaded = new LinkedList<UserType>();
    Exception lastFailure = null;
    for (UserType ut : udts) {
        try {
            ArrayList<ByteBuffer> fieldNames = new ArrayList<ByteBuffer>(ut.getFieldNames().size());
            ArrayList<AbstractType<?>> fieldTypes = new ArrayList<AbstractType<?>>();
            for (UserType.Field f : ut) {
                fieldNames.add(ByteBufferUtil.bytes(f.getName()));
                fieldTypes.add(getCql3Type(f.getType()).prepare(ks.name).getType());
            }
            ks.userTypes.addType(new org.apache.cassandra.db.marshal.UserType(
                    ks.name, ByteBufferUtil.bytes(ut.getTypeName()), fieldNames, fieldTypes));
        } catch (Exception e) {
            // A failure here may just mean a dependency has not been loaded
            // yet, so queue the type for another pass instead of aborting.
            System.out.println(e);
            lastFailure = e;
            notLoaded.addFirst(ut);
        }
    }

    // Every type in this pass failed: retrying cannot succeed either.
    if (notLoaded.size() == udts.size()) {
        throw new RuntimeException("Unable to load user types " + notLoaded, lastFailure);
    }
    if (!notLoaded.isEmpty()) {
        loadUserTypes(notLoaded, ks);
    }
}

/**
 * Converts a driver {@link DataType} into the equivalent raw CQL3 type.
 *
 * Lists, sets, maps and tuples are converted recursively from their
 * component types. A UDT is converted into a by-name reference built from
 * its keyspace and type name. Any other type is looked up by name among
 * the {@code CQL3Type.Native} constants. If the driver reports the type as
 * frozen, the result is wrapped with {@code CQL3Type.Raw.frozen}.
 *
 * @param dt the driver-side type to convert
 * @return the corresponding raw CQL3 type
 * @throws Exception if the conversion fails, e.g. an
 *                   {@link IllegalArgumentException} when the type name
 *                   matches no {@code CQL3Type.Native} constant
 */
private static CQL3Type.Raw getCql3Type(DataType dt) throws Exception {
    CQL3Type.Raw type;
    switch (dt.getName()) {
    case LIST:
        type = CQL3Type.Raw.list(getCql3Type(dt.getTypeArguments().get(0)));
        break;
    case MAP:
        type = CQL3Type.Raw.map(
                getCql3Type(dt.getTypeArguments().get(0)),
                getCql3Type(dt.getTypeArguments().get(1)));
        break;
    case SET:
        type = CQL3Type.Raw.set(getCql3Type(dt.getTypeArguments().get(0)));
        break;
    case TUPLE:
        ArrayList<CQL3Type.Raw> tupleTypes = new ArrayList<CQL3Type.Raw>();
        for (DataType arg : ((TupleType)dt).getComponentTypes()) {
            tupleTypes.add(getCql3Type(arg));
        }
        type = CQL3Type.Raw.tuple(tupleTypes);
        break;
    case UDT: // Requires this UDT to already be loaded
        UserType udt = (UserType)dt;
        type = CQL3Type.Raw.userType(new UTName(
                new ColumnIdentifier(udt.getKeyspace(), true),
                new ColumnIdentifier(udt.getTypeName(), true)));
        break;
    default:
        // Upper-case with a fixed locale: the default-locale variant would
        // turn "int" into "İNT" under e.g. a Turkish locale and make the
        // enum lookup fail.
        type = CQL3Type.Raw.from(CQL3Type.Native.valueOf(
                dt.getName().toString().toUpperCase(java.util.Locale.ROOT)));
        break;
    }
    if (dt.isFrozen()) {
        type = CQL3Type.Raw.frozen(type);
    }
    return type;
}

@Override
public void finish() {
if (batchStatement != null
Expand Down

0 comments on commit 13b64ad

Please sign in to comment.