In [47]:
// The csv file is a direct ASCII export from
// Multi Channel DataManager for a given MEA.
// In Multi Channel DataManager, specify ASCII export
// to prepare the file for this notebook.
// Location of the raw csv export and the label used when naming derived outputs.
val dataPath = "data/2_MEA2_raw.csv"
val experimentName = "#2"
// ---- Header as an Array[String]:
// keep only the line(s) of the file that contain "TimeStamp", bring them
// to the driver, then split them on commas into the individual column names.
val header = spark.read
    .textFile(dataPath)
    .filter(_.contains("TimeStamp"))
    .collect
    .flatMap(headerLine => headerLine.split(","))
// ---- Load the data into a DataFrame
// with every column typed as Int.
// A DataFrame is the same as a Dataset with
// org.apache.spark.sql.Row
// as the rows.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// Describe the expected data layout up front: one nullable integer column
// per header entry. Because the read below uses the DROPMALFORMED mode,
// lines of the raw csv that do not match this schema (such as the first,
// non-data lines of the file) are skipped.
// NOTE(review): IntegerType is a 32-bit type; presumably values outside that
// range would also be treated as malformed and dropped — confirm the
// timestamps and samples always fit.
val schema = StructType(header.map(columnName => StructField(columnName, IntegerType, nullable = true)))
// Define (lazily) the DataFrame over the csv file.
lazy val data = spark.read
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv(dataPath)
// Load the noise segments that were derived from the #2 SVD score 1 (PC1).
// Every non-blank line of the text file is parsed as "(noise,index)":
// the parentheses are stripped, the line is split on the comma, and the
// two parts become a (Double, Int) Row matching noiseSegmentsSchema.
val noiseDataPath = "data/#2_PC1_score_noise_segments/part-00000"
val noiseSegmentsSchema = StructType(
    StructField("Noise", DoubleType, true) :: 
    StructField("Index", IntegerType, true) :: Nil)
val noiseSegmentsRDD = spark.read
    .textFile(noiseDataPath)
    .rdd
    .map(_.trim)
    // Skip blank lines (e.g. a trailing newline) instead of failing on them.
    .filter(_.nonEmpty)
    .map { line =>
        val fields = line.replace("(", "").replace(")", "").split(",")
        // Trim each part so that whitespace around the comma does not break toInt.
        Row(fields(0).trim.toDouble, fields(1).trim.toInt)
    }
// Reuse the SQLContext that belongs to the existing SparkSession rather than
// calling the deprecated SQLContext constructor. The name is kept because
// later statements still refer to `sqlContext`.
val sqlContext = spark.sqlContext
val noiseSegmentsDF = spark.createDataFrame(noiseSegmentsRDD, noiseSegmentsSchema)
// Schema for (timestamp, row position) pairs; both columns are nullable integers.
val timeStampAndIndexSchema = new StructType()
    .add("TimeStamp [µs]", IntegerType, true)
    .add("Index", IntegerType, true)
// Attach to every timestamp of the raw data its position in the RDD
// (zipWithIndex yields a Long, which is narrowed to Int here).
val timeStampAndIndexRDD = data.select("TimeStamp [µs]")
    .rdd
    .zipWithIndex
    .map { case (row, position) => Row(row.getAs[Int](0), position.toInt) }
val timeStampAndIndexDF = sqlContext.createDataFrame(timeStampAndIndexRDD, timeStampAndIndexSchema)
// Keep the timestamps whose position matches an "Index" of a noise segment.
val noiseSegmentsTimeStamps = timeStampAndIndexDF
    .join(noiseSegmentsDF, timeStampAndIndexDF.col("Index").equalTo(noiseSegmentsDF.col("Index")))
    .select("TimeStamp [µs]")



dataPath = data/2_MEA2_raw.csv
experimentName = #2
header = Array(TimeStamp [µs], 47 (ID=0) [pV], 48 (ID=1) [pV], 46 (ID=2) [pV], 45 (ID=3) [pV], 38 (ID=4) [pV], 37 (ID=5) [pV], 28 (ID=6) [pV], 36 (ID=7) [pV], 27 (ID=8) [pV], 17 (ID=9) [pV], 26 (ID=10) [pV], 16 (ID=11) [pV], 35 (ID=12) [pV], 25 (ID=13) [pV], Ref (ID=14) [pV], 14 (ID=15) [pV], 24 (ID=16) [pV], 34 (ID=17) [pV], 13 (ID=18) [pV], 23 (ID=19) [pV], 12 (ID=20) [pV], 22 (ID=21) [pV], 33 (ID=22) [pV], 21 (ID=23) [pV], 32 (ID=24) [pV], 31 (ID=25) [pV], 44 (ID=26) [pV], 43 (ID=27) [pV], 41 (ID=28) [pV], 42 (ID=29) [pV], 52 (ID=30) [pV], 51 (ID=31) [pV], 53 (ID=32) [pV], 54 (ID=33) [pV], 61 (ID=34) [pV], 62 (ID=35) [pV], 71 (ID=36) [pV], 63 (ID=37) [pV], 72 (ID=38) [pV], 82 (ID=39) [pV], 73 (ID=40) [p...




[TimeStamp [µs], 47 (ID=0) [pV], 48 (ID=1) [pV], 46 (ID=2) [pV], 45 (ID=3) [pV], 38 (ID=4) [pV], 37 (ID=5) [pV], 28 (ID=6) [pV], 36 (ID=7) [pV], 27 (ID=8) [pV], 17 (ID=9) [pV], 26 (ID=10) [pV], 16 (ID=11) [pV], 35 (ID=12) [pV], 25 (ID=13) [pV], Ref (ID=14) [pV], 14 (ID=15) [pV], 24 (ID=16) [pV], 34 (ID=17) [pV], 13 (ID=18) [pV], 23 (ID=19) [pV], 12 (ID=20) [pV], 22 (ID=21) [pV], 33 (ID=22) [pV], 21 (ID=23) [pV], 32 (ID=24) [pV], 31 (ID=25) [pV], 44 (ID=26) [pV], 43 (ID=27) [pV], 41 (ID=28) [pV], 42 (ID=29) [pV], 52 (ID=30) [pV], 51 (ID=31) [pV], 53 (ID=32) [pV], 54 (ID=33) [pV], 61 (ID=34) [pV], 62 (ID=35) [pV], 71 (ID=36) [pV], 63 (ID=37) [pV], 72 (ID=38) [pV], 82 (ID=39) [pV], 73 (ID=40) [pV], 83 (ID=41) [pV], 64 (ID=42) [pV], 74 (ID=43) [pV], 84 (ID=44) [pV], 85 (ID=45) [pV], 75 (ID=46) [pV], 65 (ID=47) [pV], 86 (ID=48) [pV], 76 (ID=49) [pV], 87 (ID=50) [pV], 77 (ID=51) [pV], 66 (ID=52) [pV], 78 (ID=53) [pV], 67 (ID=54) [pV], 68 (ID=55) [pV], 55 (ID=56) [pV], 56 (ID=57) [pV], 58 (ID

In [48]:
// Restrict the raw data (all electrode columns) to the rows whose timestamp
// is one of the noise-segment timestamps obtained from the PC1 score.
val dataNoiseSegments = data.join(noiseSegmentsTimeStamps, Seq("TimeStamp [µs]"))

dataNoiseSegments = [TimeStamp [µs]: int, 47 (ID=0) [pV]: int ... 59 more fields]


[TimeStamp [µs]: int, 47 (ID=0) [pV]: int ... 59 more fields]

In [49]:
///*
// Save the extracted noise segments.
// The saved file is later imported into the notebook
// "Construct PSD tresholds from noise segment of each electrode (Python)",
// where the final noise thresholds are made.
// The output folder is named after the experiment.
val folderName = experimentName + "_all_noise_segments"
// Write all noise segments as csv from a single partition.
// A join gives no guarantee about row order, so the rows are sorted by
// timestamp inside that single partition before writing; the saved
// segments are then always in time order.
// The writer's save mode is left at its default, so this cell fails if
// the target folder already exists.
dataNoiseSegments
    .coalesce(1)
    .sortWithinPartitions("TimeStamp [µs]")
    .write
    .csv("data/" + folderName)
//*/



folderName = #2_all_noise_segments


#2_all_noise_segments