# Scala Example
## Insert through JDBC
The variables needed to connect to MariaDB ColumnStore through JDBC are set, and an SQLContext is created from the notebook's existing SparkContext `sc`.

In [None]:
import java.util.Properties
import org.apache.spark.sql.SQLContext

// JDBC URL of the ColumnStore front end. "columnstore_host_nm" looks like a placeholder host
// name; replace it with your server. No database is given here: tables are addressed as db.table.
val url = "jdbc:mysql://columnstore_host_nm:3306"

// Credentials and driver class handed to every JDBC read/write below.
// The Properties object is mutated in place, so the reference itself can be a `val`;
// setProperty is the String-typed API that Properties documents (put accepts any Object).
val connectionProperties = new Properties()
connectionProperties.setProperty("user", "jupiter_user")
connectionProperties.setProperty("password", "jupiter_pass")
connectionProperties.setProperty("driver", "org.mariadb.jdbc.Driver")

// SQLContext wrapping the notebook-provided SparkContext `sc`; its implicits enable .toDF below.
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

A sample DataFrame is created that contains the numbers 0 to 127 together with their ASCII representations,

In [None]:
// One row per code point 0..127: the number and the single-character string it encodes.
val sampleDF = sc.makeRDD(0 to 127).map(i => (i, i.toChar.toString)).toDF("number", "ASCII_representation")
sampleDF.printSchema()
// registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
// (this notebook already requires Spark >= 2.2 for the createTableColumnTypes write option).
sampleDF.createOrReplaceTempView("df")
// LIMIT without ORDER BY returns an arbitrary subset; order explicitly so the preview is deterministic.
sqlContext.sql("SELECT number, ASCII_representation FROM df WHERE number > 64 ORDER BY number LIMIT 10").show()

and is then inserted through JDBC into table "scalaexample" of database "test" in MariaDB ColumnStore.

In [None]:
// Replace test.scalaexample with the contents of sampleDF over JDBC:
//  - numPartitions caps the write at a single partition / JDBC connection,
//  - createTableOptions and createTableColumnTypes shape the table Spark creates
//    (ColumnStore engine, explicit column types).
sampleDF.write.
  options(Map(
    "numPartitions" -> "1",
    "createTableOptions" -> "ENGINE=columnstore",
    "createTableColumnTypes" -> "number INT, ASCII_representation CHAR(1)")).
  mode("overwrite").
  jdbc(url, "test.scalaexample", connectionProperties)

## Insert through native ColumnStore API (bulk insert)
The settings for connecting to MariaDB ColumnStore through the native ColumnStore API are already defined in /usr/local/mariadb/columnstore/etc/Columnstore.xml, so no connection variables need to be set in the notebook.

The ColumnStore API is imported, a ColumnStoreDriver is instantiated, and a bulk insert into table "scalaexample" of database "test" is started.

In [None]:
import com.mariadb.columnstore.api.ColumnStoreDriver

// Native ColumnStore API driver; the no-argument constructor uses the settings from Columnstore.xml.
val d = new ColumnStoreDriver()
// Bulk-insert handle for test.scalaexample. It is never reassigned, so it is a `val`.
// NOTE(review): the two trailing 0 arguments are passed as in the original; presumably
// "default mode" and "let the API pick the PM" — confirm against the mcsapi docs.
val b = d.createBulkInsert("test", "scalaexample", 0, 0)

The rows of the sample DataFrame are written to ColumnStore one at a time and then committed.

In [None]:
import scala.util.control.NonFatal

// Collect the sample rows to the driver and stream them through the bulk insert `b`.
// If anything fails before the commit, roll the bulk insert back so the uncommitted rows are
// discarded and the insert is not left open, then rethrow so the cell reports the failure.
try {
  for (row <- sampleDF.collect()) {
    b.setColumn(0, row.getInt(0))     // number
    b.setColumn(1, row.getString(1))  // ASCII_representation
    b.writeRow()
  }
  b.commit()
} catch {
  case NonFatal(e) =>
    b.rollback()
    throw e
}

Finally, a summary of the insert process is shown.

In [None]:
// Report the statistics gathered by the bulk insert `b`.
val summary = b.getSummary()
println(s"Execution time: ${summary.getExecutionTime()}")
println(s"Rows inserted: ${summary.getRowsInsertedCount()}")
println(s"Truncation count: ${summary.getTruncationCount()}")
println(s"Saturated count: ${summary.getSaturatedCount()}")
println(s"Invalid count: ${summary.getInvalidCount()}")

## Insert through custom function and ColumnStore API (automatic type detection)
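This requires an existing target table whose columns match the DataFrame's columns in order and type (columns are matched by position; extra DataFrame columns are ignored).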

In [None]:
import com.mariadb.columnstore.api.{ColumnStoreDriver,ColumnStoreDecimal,columnstore_data_types_t}
import org.apache.spark.sql.{Row,DataFrame}
import java.math.BigInteger

object ColumnStoreExporter {
  import scala.util.control.NonFatal

  /** Target column types for which a `java.math.BigDecimal` is written as a `ColumnStoreDecimal`
    * (keeping its fractional digits). For any other target type it is written via
    * `BigDecimal.toBigInteger`, which discards the fractional part.
    */
  private val DecimalTargetTypes = Set(
    columnstore_data_types_t.DATA_TYPE_DECIMAL,
    columnstore_data_types_t.DATA_TYPE_UDECIMAL,
    columnstore_data_types_t.DATA_TYPE_FLOAT,
    columnstore_data_types_t.DATA_TYPE_UFLOAT,
    columnstore_data_types_t.DATA_TYPE_DOUBLE,
    columnstore_data_types_t.DATA_TYPE_UDOUBLE
  )

  /** Writes every row of `df` into `database`.`table` with a single ColumnStore bulk insert.
    *
    * The target table must already exist. DataFrame columns are matched to table columns by
    * position; DataFrame columns beyond the table's column count are skipped. Null values are
    * written as NULL. Once the bulk insert has been created, any non-fatal error rolls it back
    * and prints the stack trace instead of propagating. In every case a summary of the insert
    * is printed at the end.
    *
    * @param database name of the target database
    * @param table    name of the target table
    * @param df       rows to insert; all of them are collected to the driver first
    * @throws Exception (from `collect`, the driver constructor or `createBulkInsert`) if the
    *                   data cannot be collected or the bulk insert cannot be started, or if a
    *                   column holds a value type this exporter does not handle
    *                   ("Parsing error, ...") — that one is caught, rolled back and printed
    */
  def export(database: String, table: String, df: DataFrame): Unit = {
    val rows = df.collect()
    val driver = new ColumnStoreDriver()
    val bulkInsert = driver.createBulkInsert(database, table, 0, 0)

    try {
      // The catalog supplies the table's column count and, for decimals, each column's type.
      // It is queried inside the try so a lookup failure still rolls the open bulk insert back.
      val dbTable = driver.getSystemCatalog.getTable(database, table)
      val dbTableColumnCount = dbTable.getColumnCount

      // insert row by row into table
      for (row <- rows) {
        for (columnId <- 0 until row.size if columnId < dbTableColumnCount) {
          row.get(columnId) match {
            // Nulls previously fell through to the catch-all and crashed on .getClass.
            case null => bulkInsert.setNull(columnId)
            case input: Boolean => bulkInsert.setColumn(columnId, if (input) 1 else 0)
            case input: Byte => bulkInsert.setColumn(columnId, input)
            // Dates and timestamps are passed in their JDBC string form.
            case input: java.sql.Date => bulkInsert.setColumn(columnId, input.toString)
            case input: java.math.BigDecimal =>
              if (DecimalTargetTypes.contains(dbTable.getColumn(columnId).getType))
                bulkInsert.setColumn(columnId, new ColumnStoreDecimal(input.toPlainString))
              else
                bulkInsert.setColumn(columnId, input.toBigInteger)
            case input: Double => bulkInsert.setColumn(columnId, input)
            case input: Float => bulkInsert.setColumn(columnId, input)
            case input: Int => bulkInsert.setColumn(columnId, input)
            case input: Long => bulkInsert.setColumn(columnId, input)
            case input: Short => bulkInsert.setColumn(columnId, input)
            case input: String => bulkInsert.setColumn(columnId, input)
            case input: java.sql.Timestamp => bulkInsert.setColumn(columnId, input.toString)
            case other => throw new Exception("Parsing error, can't convert " + other.getClass + ".")
          }
        }
        bulkInsert.writeRow()
      }
      bulkInsert.commit()
    }
    catch {
      // Best-effort export: discard the partial insert and report, without rethrowing.
      case NonFatal(e) =>
        bulkInsert.rollback()
        e.printStackTrace()
    }
    finally { // print a short summary of the insertion process
      val summary = bulkInsert.getSummary
      println("Execution time: " + summary.getExecutionTime)
      println("Rows inserted: " + summary.getRowsInsertedCount)
      println("Truncation count: " + summary.getTruncationCount)
      println("Saturated count: " + summary.getSaturatedCount)
      println("Invalid count: " + summary.getInvalidCount)
    }
  }
}

In [None]:
// Export the sample DataFrame into test.scalaexample via the generic exporter.
ColumnStoreExporter.export(database = "test", table = "scalaexample", df = sampleDF)