# Setup

* export SPARK_HOME=...
* export PYSPARK_DRIVER_PYTHON=jupyter
* export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
* pyspark --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0,com.ibm.stocator:stocator:1.0.17

# Monitor Object Storage (S3)

In [None]:
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

# SQL entry point used by loadData() and process() below.
# `sc` is not defined in this notebook; it is expected to be the SparkContext
# created by the `pyspark` launcher shown in the Setup section.
sqlContext = SQLContext(sc)


In [None]:
# Object storage connection settings. Replace the placeholders before running;
# the values are copied into the Hadoop configuration under the
# fs.cos.mycos.* keys in the next cell.
s3_endpoint = "YOUR_ENDPOINT"
s3_access_key = "YOUR_ACCESS_KEY"
s3_secret_key = "YOUR_SECRET_KEY"

In [None]:
# Hadoop configuration of the running SparkContext; the settings below
# register the Stocator connector for the "cos" scheme and supply the
# connection details for the service named "mycos".
hconf = sc._jsc.hadoopConfiguration()

# (key, value) pairs, applied in the order listed.
for confKey, confValue in [
    ('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem'),
    ('fs.stocator.scheme.list', 'cos'),
    ('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient'),
    ('fs.stocator.cos.scheme', 'cos'),
    ('fs.cos.mycos.endpoint', s3_endpoint),
    ('fs.cos.mycos.access.key', s3_access_key),
    ('fs.cos.mycos.secret.key', s3_secret_key),
    ('fs.cos.mycos.v2.signer.type', 'false'),
]:
    hconf.set(confKey, confValue)


In [None]:
# Cloudant connection settings. Replace the placeholders before running;
# process() passes them to the Cloudant writer as the "cloudant.host",
# "cloudant.username" and "cloudant.password" options.
cloudant_host = "YOUR_HOST"
cloudant_username = "YOUR_USER"
cloudant_password = "YOUR_PASSWORD"

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from collections import OrderedDict

def loadData (fileName):
    """Read a ``name=value`` text file into a two-column DataFrame.

    Each line is split on its first ``=`` only, so values may themselves
    contain ``=``. Lines that contain no ``=`` at all (blank lines, comment
    lines, ...) are skipped: they would otherwise yield a one-element record
    that cannot be matched against the two-field schema.

    :param fileName: path or URI of the file, as accepted by ``sc.textFile``.
    :return: DataFrame with nullable string columns ``name`` and ``value``.
             The DataFrame is also printed with ``show()`` as a side effect.
    """
    lines = sc.textFile(fileName)
    pairs = lines.filter(lambda line: "=" in line) \
                 .map(lambda line: line.split("=", 1))

    fields = [StructField("name", StringType(), True),
              StructField("value", StringType(), True)]
    schema = StructType(fields)

    df = sqlContext.createDataFrame(pairs, schema)
    df.show()
    return df

def collectData (rdd, names):
    """Collect selected ``name``/``value`` records of ``rdd`` into a plain dict.

    :param rdd: RDD whose records expose a ``name`` field via ``record['name']``
                and can be gathered with ``collectAsMap()``.
    :param names: collection of names to keep; all other records are dropped.
    :return: dict mapping ``str(name)`` to ``str(value)`` for the kept records.
    """
    def _wanted(record):
        return record['name'] in names

    collected = rdd.filter(_wanted).collectAsMap()

    # Keys and values are both normalised to str, as callers cast them later.
    return dict((str(key), str(val)) for key, val in collected.items())

def process(dir):
    """Load one benchmark run from ``dir``, flatten it to one typed row, save it.

    Three files are read from ``dir``: ``transactions.dat`` (header-less CSV
    whose first two columns form the metric name and whose third column is the
    value), ``environment.dat`` and ``workload.dat`` (``name=value`` files read
    through ``loadData``). Selected entries of all three are merged into a
    single-row DataFrame, cast to numeric/timestamp types, renamed, printed,
    and written to the Cloudant database ``ycsb``.

    :param dir: directory prefix. File names are appended to it directly, so
                it must already end with the path separator.
    :return: None.
    """
    # (source metric name, Spark type class, output column name).
    # The same table drives both the selection of metrics and the
    # cast/rename pass, so the two can no longer drift apart.
    # NOTE(review): "[OVERALL]- RunTime(ms)" is published as "Latency";
    # kept as-is because the output column names are an external contract.
    perf_columns = [
        ("[OVERALL]- RunTime(ms)", FloatType, "Latency"),
        ("[OVERALL]- Throughput(ops/sec)", FloatType, "Throughput"),
        ("[READ]- 95thPercentileLatency(us)", FloatType, "Read_95th"),
        ("[READ]- 99thPercentileLatency(us)", FloatType, "Read_99th"),
        ("[READ]- AverageLatency(us)", FloatType, "Read_Latency"),
        ("[READ]- MaxLatency(us)", FloatType, "Read_Max_Latency"),
        ("[READ]- MinLatency(us)", FloatType, "Read_Min_Latency"),
        ("[READ]- Operations", IntegerType, "Read_Operations"),
        ("[READ]- Return=OK", IntegerType, "Read_Operations_Success"),
        ("[UPDATE]- 95thPercentileLatency(us)", FloatType, "Update_95th"),
        ("[UPDATE]- 99thPercentileLatency(us)", FloatType, "Update_99th"),
        ("[UPDATE]- AverageLatency(us)", FloatType, "Update_Latency"),
        ("[UPDATE]- MaxLatency(us)", FloatType, "Update_Max_Latency"),
        ("[UPDATE]- MinLatency(us)", FloatType, "Update_Min_Latency"),
        ("[UPDATE]- Operations", IntegerType, "Update_Operations"),
        ("[UPDATE]- Return=OK", IntegerType, "Update_Operations_Success"),
    ]

    # Single-argument print(...) produces the same output on Python 2 and 3.
    print("process  " + dir)

    print("... load data")
    df = sqlContext.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'false')\
      .option('mode', 'DROPMALFORMED')\
      .option('inferSchema', 'true')\
      .load(dir+"transactions.dat")
    df.show()

    # Metric name is "<_c0>-<_c1>"; the value is the third CSV column.
    key = (concat(col("_c0"), lit("-"), col("_c1")))
    df_convert = df.withColumn('name', key).withColumn('value',col("_c2")).select("name","value")

    df_env = loadData(dir+"environment.dat")
    df_load = loadData(dir+"workload.dat")

    print("... filter data")

    col_load = collectData(df_load.rdd,["db.driver","recordcount","operationcount"])
    col_env = collectData(df_env.rdd,["YCSB_THREAD_COUNT","YCSB_WORKLOAD","YCSB_OP","YCSB_DB","YCSB_NOTES","YCSB_RUN_DATE"])
    col_perf = collectData(df_convert.rdd, [source for source, _, _ in perf_columns])

    print("... aggregate data")

    # Later updates win on duplicate keys: perf < workload < environment.
    col_all = {}
    col_all.update(col_perf)
    col_all.update(col_load)
    col_all.update(col_env)

    df_all = sc.parallelize([col_all]).map(lambda l: Row(**dict(l))).toDF()

    print("...transform data")

    # Columns that keep their name and only change type.
    df_converted = df_all \
        .withColumn("YCSB_THREAD_COUNT", df_all["YCSB_THREAD_COUNT"].cast(IntegerType())) \
        .withColumn("YCSB_RUN_DATE", from_unixtime(unix_timestamp('YCSB_RUN_DATE', 'yyyy-MM-dd_HH:mm:ss')).cast(TimestampType())) \
        .withColumn("operationcount", df_all["operationcount"].cast(IntegerType())) \
        .withColumn("recordcount", df_all["recordcount"].cast(IntegerType()))

    # Metric columns: cast in place, then rename to the output column name.
    for source, dataType, target in perf_columns:
        df_converted = df_converted \
            .withColumn(source, df_all[source].cast(dataType())) \
            .withColumnRenamed(source, target)

    df_converted.printSchema()
    df_converted.show()

    print("... save data")
    df_converted.write.format("org.apache.bahir.cloudant") \
            .option("cloudant.host",cloudant_host) \
            .option("cloudant.username",cloudant_username) \
            .option("cloudant.password",cloudant_password) \
            .save("ycsb")
    

In [None]:
# Directory monitored by ssc.textFileStream() in the streaming cell.
# The "mycos" part matches the service name used in the fs.cos.mycos.*
# Hadoop settings. The value ends with "/".
dirName = "cos://mycompose.mycos/mysql/output/"

In [None]:
def fileName(data):
    """Run ``process`` for every ``workload.dat`` found in a streamed batch.

    The names of the files behind the batch are recovered from the RDD's
    debug string: the first two lines are skipped, and on each remaining line
    the second whitespace-separated token is taken as the full file name.
    The directory part and the file name are printed for every such line.

    :param data: RDD handed over by ``foreachRDD``.
    :return: ``""`` when there is nothing to inspect, otherwise ``None``.
    """
    if data.isEmpty():
        return ""

    debug = data.toDebugString()
    if debug is None:
        return ""
    if not isinstance(debug, str):
        # On Python 3 the debug string may arrive as UTF-8 bytes, which
        # cannot be split on a text "\n"; on Python 2 this branch is skipped.
        debug = debug.decode("utf-8")

    for line in debug.split("\n")[2:]:
        tokens = line.split()
        if len(tokens) < 2:
            # Blank or continuation-only line: no file name to extract.
            continue
        fullName = tokens[1]
        # fDir keeps its trailing "/" because process() appends file names to it.
        head, sep, fName = fullName.rpartition("/")
        fDir = head + sep
        print(fDir)
        print(fName)
        if fName == "workload.dat":
            process(fDir)

# Streaming context on top of the existing SparkContext; the second argument
# is the batch interval (1).
ssc = StreamingContext(sc, 1)
# Stream built from the text files that appear under dirName.
files = ssc.textFileStream(dirName)
# fileName() is invoked with the RDD of each batch.
files.foreachRDD(fileName)
# Prints the DStream object itself, not the streamed data.
print(files)
# Start the stream, then block this cell until the context terminates.
ssc.start()
ssc.awaitTermination()
    