 # MVCC cause finder
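*This notebook uses the Kotlin kernel (https://github.com/Kotlin/kotlin-jupyter) to process datasets with
large numbers of MVCC read conflicts: for each conflicting read it looks for the valid write that most likely caused it.*
Before running it, export the dataframe from the Python notebook to `data/txsmvccs_pre.csv` so the CSV can be read here.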

In [None]:
%use dataframe
%use coroutines
import java.time.LocalDateTime

In [None]:
// Dataframe containing all key accesses exported by the Python notebook,
// sorted ascending by txid and then by access type.
var df = DataFrame.readCSV("data/txsmvccs_pre.csv")
// Column accessors used by the sort below (the generated extension properties
// for `df` only exist after this cell has finished executing).
val txid by column<Int>()
val access_type by column<String>()
df = df.sortBy { txid and access_type }
// Overwrite "untitled" with the row position after sorting, so later cells can use
// its value directly as a row index (e.g. `update { ... }.at(index)`).
df = df.replace("untitled").with { (0 until df.rowsCount()).toList().toColumn("untitled") }
df.schema()

In [None]:
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.Dispatchers.Default

// Guards every reassignment of the shared `df` performed by the worker coroutines below.
val mutex = Mutex()
// Handles of the worker coroutines, one per MVCC read conflict.
val jobs = mutableListOf<Job>()
val start = System.currentTimeMillis()
val rowCount = df.rowsCount()
// Progress is printed every `percentile` percent of the scanned rows.
val percentile = 10
// Number of rows that make up one progress step of `percentile` percent.
val threshold = rowCount.toDouble() * percentile.toDouble() / 100.0
// Immutable snapshot of the key accesses. The loop and the workers read from it instead of
// the `var df`, which the workers reassign concurrently. None of the columns read from the
// snapshot (txid, key, versions, access type, validation code) are ever updated.
val source = df
// Only valid writes can be a cause. Filter them once here instead of re-scanning the whole
// dataframe for every conflicting read.
val validWrites = source.filter { access_type == "WRITE" && validation_code == "VALID" }
runBlocking {
    var k = 1
    // iterate over all rows (key accesses)
    for (i in 0 until rowCount) {
        // print iteration progress
        if (i > threshold * k) {
            val done = percentile * k
            print("$done%...")
            k++
        }
        val row = source[i]
        // only READ accesses rejected with an MVCC read conflict need a cause
        if (row["validation_code"] == "MVCC_READ_CONFLICT" && row["access_type"] == "READ") {
            // save the key and the version (block, tx) that the read observed
            val current_key = row["key"]
            val current_versionBlock = row.version_block
            val current_versionTx = row.version_tx
            val current_txid = row.txid
            // Launched as a child of runBlocking (instead of a detached CoroutineScope) so that
            // an exception inside a worker is propagated rather than silently lost.
            val job = launch(Default) {
                // valid writes to the same key by an earlier transaction that are newer than the
                // version the read observed (a later block, or the same block with a higher tx)
                val subset = validWrites.filter {
                    txid < current_txid &&
                    (blockid > current_versionBlock || (blockid == current_versionBlock && current_versionTx < version_tx)) &&
                    current_key == key
                }
                // if any were found, mark the last one in row order (df is sorted by txid) as the cause
                if (subset.rowsCount() > 0) {
                    // "untitled" holds the row position of that write in df
                    val index = subset.last().untitled
                    mutex.withLock {
                        df = df.update { mvcc_cause }.at(index).with { it + 1 }
                        // mvcc_caused_at accumulates the txids of the conflicting reads as "id1,id2,"
                        if (df[index].mvcc_caused_at == null) {
                            df = df.update { mvcc_caused_at }.at(index).with { "" + current_txid + "," }
                        } else {
                            df = df.update { mvcc_caused_at }.at(index).with { it + current_txid + "," }
                        }
                        // Attribute at most one cause per conflicting transaction: only record it if no
                        // row of this transaction has been flagged yet (checked under the mutex).
                        val tx = df.filter { txid == current_txid && mvcc_cause_found_for_tx == true }
                        if (tx.rowsCount() == 0) {
                            df = df.update { mvcc_cause_for_tx }.at(index).with { it + 1 }
                            if (df[index].mvcc_caused_at_for_tx == null) {
                                df = df.update { mvcc_caused_at_for_tx }.at(index).with { "" + current_txid + "," }
                            } else {
                                df = df.update { mvcc_caused_at_for_tx }.at(index).with { it + current_txid + "," }
                            }
                            df = df.update { mvcc_cause_found_for_tx }.where { txid == current_txid }.withValue(true)
                        }
                    }
                }
            }
            // save the job so it can be waited for
            jobs.add(job)
        }
    }
    // The scan is complete. Printed unconditionally because small inputs never reach the last
    // progress step above.
    println("100%")
    print("waiting for background threads...")
    // wait for all background jobs to complete
    jobs.joinAll()
}
val end = System.currentTimeMillis()
val diff = end - start
print("Operation took: $diff ms")
df.writeCSV("data/txsmvccs_post_no_gb.csv")
df

In [None]:
// Total number of MVCC conflicts attributed to each key, sorted with the most frequent cause
// first. Keys that never caused a conflict are dropped. The result is written to disk.
var processed = df
    .groupBy { key }
    .sum("mvccs_caused") { mvcc_cause }
    .sortByDesc("mvccs_caused")
    .filter { "mvccs_caused"<Int>() > 0 }
processed.writeCSV("data/txsmvccs_post.csv")
print("done!")

In [None]:
// Summary of the attribution: the number of distinct "caused at" txid lists, and the total
// number of conflicts attributed. NOTE(review): the distinct count may include a null value
// for rows that caused nothing — confirm countDistinct's null handling.
println("distinct caused at: ${df.mvcc_caused_at.countDistinct()}")
println("mvcc causes found: ${df.mvcc_cause.sum()}")