# Correlation between a Model and Satellite data

This notebook shows how to calculate the correlation between two matrices. Each matrix is created from a set of GeoTiffs covering a series of years. Both matrices must have the same dimensions.

For demonstration we will use data from a model (spring-index) and data from a satellite (AVHRR).
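
As a rough illustration of what "correlation between two matrices" means here, the sketch below computes one Pearson coefficient per row (cell) of two equally shaped matrices in plain Scala, without Spark. The object and method names (`CellCorrelation`, `pearson`, `correlateRows`) are hypothetical and are not used elsewhere in this notebook.

```scala
object CellCorrelation {
  // Pearson correlation of two equally long series.
  // Yields NaN when either series has zero variance.
  def pearson(xs: Array[Double], ys: Array[Double]): Double = {
    require(xs.length == ys.length && xs.nonEmpty, "series must be non-empty and of equal length")
    val n = xs.length
    val meanX = xs.sum / n
    val meanY = ys.sum / n
    var cov = 0.0
    var varX = 0.0
    var varY = 0.0
    var i = 0
    while (i < n) {
      val dx = xs(i) - meanX
      val dy = ys(i) - meanY
      cov += dx * dy
      varX += dx * dx
      varY += dy * dy
      i += 1
    }
    cov / math.sqrt(varX * varY)
  }

  // One coefficient per row (cell); both matrices must have the same number of rows.
  def correlateRows(a: Array[Array[Double]], b: Array[Array[Double]]): Array[Double] = {
    require(a.length == b.length, "matrices must have the same number of rows")
    a.zip(b).map { case (rowA, rowB) => pearson(rowA, rowB) }
  }
}
```

Each row stands for one grid cell and each column for one year, which is the layout the notebook builds in the Matrix section.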

## Dependencies

In [1]:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{SinglebandGeoTiff, _}
import geotrellis.raster.{CellType, DoubleArrayTile, Tile, UByteCellType}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.{SequenceFile, _}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.sys.process._

## Mode of operation

Here the user can define the mode of operation.
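* **rdd_offline_mode**: If false, the notebook creates all data from scratch and stores projected_extent and num_cols_rows in HDFS. Otherwise, these data structures are read from HDFS.
* **matrix_offline_mode**: If true, the transposed matrices are read from HDFS instead of being rebuilt from the grids.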
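
The validation cell further down reconciles each requested mode with what actually exists in HDFS. A minimal sketch of that rule, with the hypothetical names `OfflineMode` and `reconcile`, could look like this:

```scala
object OfflineMode {
  // Offline mode is only usable when the stored data exists, so the
  // effective mode always follows the existence check. A message is
  // printed when that differs from what the user asked for.
  def reconcile(requestedOffline: Boolean, storedDataExists: Boolean): Boolean = {
    if (requestedOffline != storedDataExists)
      println("Offline mode is not set properly, resetting it to " + storedDataExists)
    storedDataExists
  }
}
```

For example, `OfflineMode.reconcile(true, false)` falls back to online mode because nothing has been stored yet.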

It is also possible to define which directory of GeoTiffs is to be used and which **band** (`band_num`) to read from them. The options are:
* **all**, which are multi-band (**8 bands**) GeoTiffs
* Or choose single band ones:
    0. Onset_Greenness_Increase
    1. Onset_Greenness_Maximum
    2. Onset_Greenness_Decrease
    3. Onset_Greenness_Minimum
    4. NBAR_EVI_Onset_Greenness_Minimum
    5. NBAR_EVI_Onset_Greenness_Maximum
    6. NBAR_EVI_Area
    7. Dynamics_QC

<span style="color:red">Note that when using a range, **kmeans offline mode** is not possible and it will be reset to **online mode**</span>.

### Mode of Operation setup
<a id='mode_of_operation_setup'></a>

In [2]:
var rdd_offline_mode = true
var matrix_offline_mode = true

//Using spring-index model
var model_path = "hdfs:///user/hadoop/spring-index/"
var model_dir = "BloomFinal"

//Using AVHRR Satellite data
var satellite_path = "hdfs:///user/hadoop/avhrr/"
var satellite_dir = "SOST"

var out_path = "hdfs:///user/emma/correlation/"
var band_num = 3

//Years between (inclusive) 1989 - 2014
var satellite_first_year = 1989
var satellite_last_year = 2014

//Years between (inclusive) 1980 - 2015
var model_first_year = 1989
var model_last_year = 2014

//Mask
val toBeMasked = true
val mask_path = "hdfs:///user/hadoop/usa_mask.tif"

val save_rdds = false
val save_matrix = false

rdd_offline_mode = true
matrix_offline_mode = true
model_path = hdfs:///user/hadoop/spring-index/
model_dir = BloomFinal
satellite_path = hdfs:///user/hadoop/avhrr/
satellite_dir = SOST
out_path = hdfs:///user/emma/correlation/
band_num = 3
satellite_first_year = 1989
satellite_last_year = 2014
model_first_year = 1989
model_last_year = 2014
toBeMasked = true
mask_path = hdfs:///user/hadoop/usa_mask.tif
save_rdds = false
save_matrix = false


false


<span style="color:red">DON'T MODIFY ANY PIECE OF CODE FROM HERE ON!!!</span>.


### Mode of operation validation

In [3]:
//Check offline modes
var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

//Paths to store data structures for Offline runs
var mask_str = ""
if (toBeMasked)
  mask_str = "_mask"
var model_grid_path = out_path + model_dir + "_model_grid"
var satellite_grid_path = out_path + satellite_dir + "_satellite_grid"
var model_matrix_path = out_path + model_dir + "_model_matrix"
var satellite_matrix_path = out_path + satellite_dir + "_satellite_matrix"
var metadata_path = out_path + "/metadata"

val rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(metadata_path))
val matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(satellite_matrix_path))

if (rdd_offline_mode != rdd_offline_exists) {
  println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + rdd_offline_exists.toString())
  rdd_offline_mode = rdd_offline_exists
}

if (matrix_offline_mode != matrix_offline_exists) {
  println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + matrix_offline_exists.toString())
  matrix_offline_mode = matrix_offline_exists
}

var skip_rdd = false
if (matrix_offline_exists) {
    println("Since we have a matrix, the load of the grids RDD will be skipped!!!")
    skip_rdd = true
}

var corr_tif = out_path + "_" + satellite_dir + "_" + model_dir + ".tif"
var corr_tif_tmp = "/tmp/correlation_" + satellite_dir + "_" + model_dir + ".tif"

//Years
//Years available in each data set (matching the comments in the setup cell)
val satellite_years = 1989 to 2014
val model_years = 1980 to 2015

if (!satellite_years.contains(satellite_first_year) || !(satellite_years.contains(satellite_last_year))) {
  println("Invalid range of years for " + satellite_dir + ". It should be between " + satellite_years.head + " and " + satellite_years.last)
  System.exit(0)
}

if (!model_years.contains(model_first_year) || !(model_years.contains(model_last_year))) {
  println("Invalid range of years for " + model_dir + ". It should be between " + model_years.head + " and " + model_years.last)
  System.exit(0)
}

//Both data sets must cover the same number of years
if ((satellite_last_year - satellite_first_year) != (model_last_year - model_first_year)) {
  println("The range of years for each data set should be of the same length.")
  System.exit(0)
}

var model_years_range = (model_years.indexOf(model_first_year), model_years.indexOf(model_last_year))
var satellite_years_range = (satellite_years.indexOf(satellite_first_year), satellite_years.indexOf(satellite_last_year))

//Global variables
var projected_extent = new ProjectedExtent(new Extent(0,0,0,0), CRS.fromName("EPSG:3857"))
var model_grids_RDD: RDD[Array[Double]] = sc.emptyRDD
var model_grids: RDD[Array[Double]] = sc.emptyRDD
var satellite_grids_RDD: RDD[Array[Double]] = sc.emptyRDD
var satellite_grids: RDD[Array[Double]] = sc.emptyRDD
var num_cols_rows :(Int, Int) = (0, 0)
var cellT :CellType = UByteCellType
var mask_tile0 :Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
var satellite_cells_size :Long = 0
var model_cells_size :Long = 0
var t0 : Long = 0
var t1 : Long = 0

Waiting for a Spark session to start...

"Load GeoTiffs" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Matrix" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false


conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml
fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-801984405_36, ugi=emma (auth:SIMPLE)]]
mask_str = _mask
model_grid_path = hdfs:///user/emma/correlation/BloomFinal_model_grid
satellite_grid_path = hdfs:///user/emma/correlation/SOST_satellite_grid
model_matrix_path = hdfs:///user/emma/correlation/BloomFinal_model_matrix
satellite_matrix_path = hdfs:///user/emma/correlation/SOST_satellite_matrix
metadata_path = hdfs:///user/emma/correlation//metadat...


hdfs:///user/emma/correlation//metadata

## Functions to (de)serialize any structure into Array[Byte]

In [4]:
def serialize(value: Any): Array[Byte] = {
    val out_stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val obj_out_stream = new ObjectOutputStream(out_stream)
    obj_out_stream.writeObject(value)
    obj_out_stream.close
    out_stream.toByteArray
}

def deserialize(bytes: Array[Byte]): Any = {
    val obj_in_stream = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val value = obj_in_stream.readObject
    obj_in_stream.close
    value
}

serialize: (value: Any)Array[Byte]
deserialize: (bytes: Array[Byte])Any
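
A quick round trip shows how these two helpers are meant to be used when metadata is written to and read back from HDFS. The sketch below repeats the helpers so that it runs on its own. The tuple value is a made-up stand-in for `num_cols_rows`, and `RoundTrip` is a hypothetical name.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object RoundTrip {
  // Java serialization into a byte array.
  def serialize(value: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // The caller must cast the result back to the expected type.
  def deserialize(bytes: Array[Byte]): Any = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  // Hypothetical payload standing in for num_cols_rows.
  val numColsRows: (Int, Int) = (4, 3)
  val restored: (Int, Int) = deserialize(serialize(numColsRows)).asInstanceOf[(Int, Int)]
}
```

Because `deserialize` returns `Any`, a wrong cast only fails at runtime, so the order in which values are written and read has to match.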


## Load GeoTiffs

Using GeoTrellis, all GeoTiffs in a directory are loaded into an RDD. From the RDD we extract a grid from the first file to later store the correlation values, build an index to populate that grid, and filter out the cells that should not take part in the computation (masked cells here, NaN values when the matrices are built).
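
The masking step in the cells below marks every cell outside the mask with a sentinel value and then drops those cells. The plain-Scala sketch below mimics that idea on flat arrays. It assumes that mask value 1 means "keep" and that -1000 is the sentinel, as in the notebook's `localInverseMask(..., 1, -1000)` call. `MaskSketch` and `maskAndFilter` are hypothetical names, and this is not the GeoTrellis implementation.

```scala
object MaskSketch {
  val Sentinel = -1000.0

  // Keep the values whose mask cell equals keepValue; all other cells
  // are first set to the sentinel and then filtered out.
  def maskAndFilter(values: Array[Double], mask: Array[Int], keepValue: Int = 1): Array[Double] = {
    require(values.length == mask.length, "values and mask must have the same length")
    values.zip(mask)
      .map { case (v, m) => if (m == keepValue) v else Sentinel }
      .filter(_ != Sentinel)
  }
}
```

Two consequences are worth keeping in mind: a genuine data value equal to the sentinel would be dropped as well, and NaN cells inside the mask are kept at this stage.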

In [10]:
t0 = System.nanoTime()

//Load Mask
if (!skip_rdd && toBeMasked) {
  val mask_tiles_RDD = sc.hadoopGeoTiffRDD(mask_path).values
  val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map{case (e,v) => (v,e)}
  mask_tile0 = (mask_tiles_withIndex.filter(m => m._1==0).filter(m => !m._1.isNaN).values.collect())(0)
}

//Local variables
val pattern: String = "tif"
val satellite_filepath: String = satellite_path + satellite_dir
val model_filepath: String = model_path + "/" + model_dir

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")

Elapsed time: 7064910432ns


t0 = 19128228445454
pattern = tif
satellite_filepath = hdfs:///user/hadoop/avhrr/SOST
model_filepath = hdfs:///user/hadoop/spring-index//BloomFinal
t1 = 19135293355886


19135293355886

### Satellite data

In [14]:
t0 = System.nanoTime()
if (!skip_rdd) {
    if (rdd_offline_mode) {
      satellite_grids_RDD = sc.objectFile(satellite_grid_path)
    } else {
      //Let's load the AVHRR single-band GeoTiffs and keep only the tiles.
      val satellite_geos_RDD = sc.hadoopGeoTiffRDD(satellite_filepath, pattern)
      val satellite_tiles_RDD = satellite_geos_RDD.values

      if (toBeMasked) {
        val mask_tile_broad :Broadcast[Tile] = sc.broadcast(mask_tile0)
        satellite_grids_RDD = satellite_tiles_RDD.map(m => m.localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble().filter(_ != -1000))
      } else {
        satellite_grids_RDD = satellite_tiles_RDD.map(m => m.toArrayDouble())
      }

      //Store in HDFS
      if (save_rdds) {
          satellite_grids_RDD.saveAsObjectFile(satellite_grid_path)
      }
    }
    val satellite_grids_withIndex = satellite_grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }

    //Filter out the range of years:
    satellite_grids = satellite_grids_withIndex.filterByRange(satellite_years_range._1, satellite_years_range._2).values

    var satellite_grid0_index: RDD[Double] = satellite_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)
    satellite_cells_size = satellite_grid0_index.count().toInt
    println("Number of cells is: " + satellite_cells_size)
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")

Number of cells is: 10115631                                                    
Elapsed time: 15363305923ns


t0 = 20281482863605
t1 = 20296846169528


20296846169528

### Model data

In [13]:
t0 = System.nanoTime()

if (!skip_rdd) {
    if (rdd_offline_mode) {
      model_grids_RDD = sc.objectFile(model_grid_path)
      val metadata = sc.sequenceFile(metadata_path, classOf[IntWritable], classOf[BytesWritable]).map(_._2.copyBytes()).collect()
      projected_extent = deserialize(metadata(0)).asInstanceOf[ProjectedExtent]
      num_cols_rows = (deserialize(metadata(1)).asInstanceOf[Int], deserialize(metadata(2)).asInstanceOf[Int])
      cellT = deserialize(metadata(3)).asInstanceOf[CellType]  
    } else {
      val model_geos_RDD = sc.hadoopMultibandGeoTiffRDD(model_filepath, pattern)
      val model_tiles_RDD = model_geos_RDD.values

      //Retrieve the number of cols and rows of the Tile's grid
      val tiles_withIndex = model_tiles_RDD.zipWithIndex().map{case (v,i) => (i,v)}
      val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)

      num_cols_rows = (tile0.cols, tile0.rows)
      cellT = tile0.cellType

      //Retrieve the ProjectedExtent, which contains metadata such as the CRS and bounding box
      val projected_extents_withIndex = model_geos_RDD.keys.zipWithIndex().map{case (e,v) => (v,e)}
      projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect())(0)

      val band_numB :Broadcast[Int] = sc.broadcast(band_num)
      if (toBeMasked) {
        val mask_tile_broad :Broadcast[Tile] = sc.broadcast(mask_tile0)
        model_grids_RDD = model_tiles_RDD.map(m => m.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble().filter(_ != -1000))
      } else {
        model_grids_RDD = model_tiles_RDD.map(m => m.band(band_numB.value).toArrayDouble())
      }

      //Store data in HDFS
      if (save_rdds) {
          model_grids_RDD.saveAsObjectFile(model_grid_path)

          val writer: SequenceFile.Writer = SequenceFile.createWriter(conf,
            Writer.file(metadata_path),
            Writer.keyClass(classOf[IntWritable]),
            Writer.valueClass(classOf[BytesWritable])
          )

          writer.append(new IntWritable(1), new BytesWritable(serialize(projected_extent)))
          writer.append(new IntWritable(2), new BytesWritable(serialize(num_cols_rows._1)))
          writer.append(new IntWritable(3), new BytesWritable(serialize(num_cols_rows._2)))
          writer.append(new IntWritable(4), new BytesWritable(serialize(cellT)))
          writer.hflush()
          writer.close()
      }
    }
    val model_grids_withIndex = model_grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }

    //Filter out the range of years:
    model_grids = model_grids_withIndex.filterByRange(model_years_range._1, model_years_range._2).values
    
    var model_grid0_index: RDD[Double] = model_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)
    model_cells_size = model_grid0_index.count().toInt
    println("Number of cells is: " + model_cells_size)
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")



t0 = 19675616707133
t1 = 20280040646969


20280040646969

In [16]:
val model_grids_withIndex = model_grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }

//Filter out the range of years:
model_grids = model_grids_withIndex.filterByRange(model_years_range._1, model_years_range._2).values

var model_grid0_index: RDD[Double] = model_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)
model_cells_size = model_grid0_index.count().toInt
//model_cells_size = num_cols_rows._1 * num_cols_rows._2
println("Number of cells is: " + model_cells_size)

Number of cells is: 10115631                                                    


model_grids_withIndex = MapPartitionsRDD[60] at map at <console>:62
model_grids = MapPartitionsRDD[62] at values at <console>:65
model_grid0_index = MapPartitionsRDD[65] at flatMap at <console>:67
model_cells_size = 10115631


10115631

## Matrix

We need to transpose the matrix so that each row holds the values of one cell across all years, rather than the values of all cells for one year. With a GeoTiff representing a single year, the loaded data looks like this:
```
bands_RDD.map(s => Vectors.dense(s)).cache()

//The vectors are rows and therefore the matrix will look like this:
[
Vectors.dense(0.0, 1.0, 2.0),
Vectors.dense(3.0, 4.0, 5.0),
Vectors.dense(6.0, 7.0, 8.0),
Vectors.dense(9.0, 0.0, 1.0)
]
```

To achieve that we convert the **RDD[Vector]** into a distributed matrix, a [**CoordinateMatrix**](https://spark.apache.org/docs/latest/mllib-data-types.html#coordinatematrix), which has a **transpose** method.
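
The idea behind a coordinate-based transpose can be sketched in plain Scala: every value becomes a `(row, column, value)` entry, the transpose swaps the two indices, and the rows are then rebuilt. The names `TransposeSketch`, `Entry`, `toEntries` and `toRows` are hypothetical, and the code only mirrors the concept, not Spark's distributed implementation.

```scala
object TransposeSketch {
  // (row, column, value) triple, analogous to an entry of a coordinate matrix.
  final case class Entry(i: Long, j: Long, value: Double)

  // Split the matrix into one entry per number.
  def toEntries(rows: Seq[Array[Double]]): Seq[Entry] =
    rows.zipWithIndex.flatMap { case (row, i) =>
      row.zipWithIndex.map { case (v, j) => Entry(i.toLong, j.toLong, v) }.toSeq
    }

  // Transposing only swaps the row and column index of each entry.
  def transpose(entries: Seq[Entry]): Seq[Entry] =
    entries.map(e => Entry(e.j, e.i, e.value))

  // Rebuild dense rows, ordered by row index.
  def toRows(entries: Seq[Entry], numCols: Int): Seq[Array[Double]] =
    entries.groupBy(_.i).toSeq.sortBy(_._1).map { case (_, rowEntries) =>
      val row = Array.fill(numCols)(0.0)
      rowEntries.foreach(e => row(e.j.toInt) = e.value)
      row
    }
}
```

Applied to the four yearly vectors shown above, `toRows(transpose(toEntries(years)), 4)` yields three rows, one per cell, each holding four yearly values.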

### Satellite data

In [17]:
t0 = System.nanoTime()
//Global variables
var satellite_matrix: RDD[Vector] = sc.emptyRDD
val satellite_cells_sizeB = sc.broadcast(satellite_cells_size)
if (matrix_offline_mode) {
  satellite_matrix = sc.objectFile(satellite_matrix_path)
} else {
  //Build one sparse vector per year from the year-filtered grids, skipping NaN cells
  val mat :RowMatrix = new RowMatrix(satellite_grids.map(m => m.zipWithIndex).map(m => m.filter(!_._1.isNaN)).map(m => Vectors.sparse(satellite_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1))))
  // Split the matrix into one number per line.
  val byColumnAndRow = mat.rows.zipWithIndex.map {
    case (row, rowIndex) => row.toArray.zipWithIndex.map {
      case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
    }
  }.flatMap(x => x)

  val matt: CoordinateMatrix = new CoordinateMatrix(byColumnAndRow)
  val matt_T = matt.transpose()

  satellite_matrix = matt_T.toIndexedRowMatrix().rows.sortBy(_.index).map(_.vector)
  if (save_matrix) {
      satellite_matrix.saveAsObjectFile(satellite_matrix_path)
  }
}

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")



t0 = 20804124974062
satellite_matrix = MapPartitionsRDD[83] at map at <console>:82
satellite_cells_sizeB = Broadcast(22)
t1 = 20978646735559


20978646735559

### Model Data

In [18]:
t0 = System.nanoTime()

//Global variables
var model_matrix: RDD[Vector] = sc.emptyRDD

val model_cells_sizeB = sc.broadcast(model_cells_size)
if (matrix_offline_mode) {
  model_matrix = sc.objectFile(model_matrix_path)
} else {
  //Build one sparse vector per year from the year-filtered grids, skipping NaN cells
  val mat :RowMatrix = new RowMatrix(model_grids.map(m => m.zipWithIndex).map(m => m.filter(!_._1.isNaN)).map(m => Vectors.sparse(model_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1))))
  // Split the matrix into one number per line.
  val byColumnAndRow = mat.rows.zipWithIndex.map {
    case (row, rowIndex) => row.toArray.zipWithIndex.map {
      case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
    }
  }.flatMap(x => x)

  val matt: CoordinateMatrix = new CoordinateMatrix(byColumnAndRow)
  val matt_T = matt.transpose()

  model_matrix = matt_T.toIndexedRowMatrix().rows.sortBy(_.index).map(_.vector)
  
  if (save_matrix) {
      model_matrix.saveAsObjectFile(model_matrix_path)
  }
}

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")



t0 = 21238374595830
model_matrix = MapPartitionsRDD[101] at map at <console>:84
model_cells_sizeB = Broadcast(27)
t1 = 21818844113455


21818844113455

### Validation

In [19]:
var model_matrix_rows = model_matrix.count()
var satellite_matrix_rows = satellite_matrix.count()

if (model_matrix_rows != satellite_matrix_rows) {
  println("For correlation it is necessary to have a matrix with same number of rows and columns!!!")
  println("Model matrix has " + model_matrix_rows + " rows while satellite matrix has " + satellite_matrix_rows + " rows!!!")
  //System.exit(0)
}



model_matrix_rows = 10115631
satellite_matrix_rows = 10115631


10115631

## Correlation

In [None]:
val satellite = satellite_matrix.zipWithIndex().map{ case (v, i) => (i,v)}
val model = model_matrix.zipWithIndex().map{case (v,i) => (i, v)}

val numCells = satellite.count()
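//Allocate one slot per cell (Array[Double](numCells) would create a single-element array)
var corr_res :Array[Double] = new Array[Double](numCells.toInt)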
var cell = 0
while (cell < numCells) {
  val satellite_array = satellite.filter(_._1 == cell).flatMap(_._2.toArray)
  val model_array = model.filter(_._1 == cell).flatMap(_._2.toArray).repartition(satellite_array.getNumPartitions)
  corr_res(cell) = Statistics.corr(satellite_array, model_array, "pearson")
  cell += 1
}


## Build GeoTiff with correlation values

The grid with the correlation values is stored in a SingleBand GeoTiff and uploaded to HDFS.

### Assign the correlation value to each grid cell and save the grid as SingleBand GeoTiff

To assign the correlation value to each grid cell it is necessary to get the indices of the grid cells they belong to. The process is not straightforward because the ArrayDouble used for the creation of each Vector does not contain the NaN values, therefore there is no direct mapping between the indices in the Tile's grid and the ones in **corr_res** (correlation result).

To join the two RDDs, the approach was taken from a Stack Overflow post on [how to perform basic joins of two rdd tables in spark using python](https://stackoverflow.com/questions/31257077/how-do-you-perform-basic-joins-of-two-rdd-tables-in-spark-using-python).
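
The index bookkeeping described above can be sketched without Spark: remember the original grid index of every cell that survives filtering, then scatter the per-cell results back into a full-size grid where untouched cells stay NaN. `GridScatter`, `validIndices` and `scatter` are hypothetical names and are not part of the cell below.

```scala
object GridScatter {
  // Original grid indices of the cells that hold a real value, in order.
  def validIndices(grid: Array[Double]): Seq[Int] =
    grid.indices.filter(i => !grid(i).isNaN)

  // Place (gridIndex, value) results back at their original positions.
  // Cells without a result remain NaN.
  def scatter(results: Seq[(Int, Double)], cols: Int, rows: Int): Array[Double] = {
    val grid = Array.fill(cols * rows)(Double.NaN)
    results.foreach { case (cellIndex, value) => grid(cellIndex) = value }
    grid
  }
}
```

Zipping `validIndices(firstGrid)` with the compact result array gives the `(gridIndex, value)` pairs that `scatter` expects, which plays the role of the join between the two RDDs.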

In [12]:
val corr_cells :Array[Double] = corr_res
val corr_cellsD = DoubleArrayTile(corr_cells, num_cols_rows._1, num_cols_rows._2)

val geoTif = new SinglebandGeoTiff(corr_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))

//Save to /tmp/
GeoTiffWriter.write(geoTif, corr_tif_tmp)

//Upload to HDFS
var cmd = "hadoop fs -copyFromLocal -f " + corr_tif_tmp + " " + corr_tif
Process(cmd)!

//Remove from /tmp/
cmd = "rm -fr " + corr_tif_tmp
Process(cmd)!

Name: Unknown Error
Message: <console>:60: error: not found: value corr_res
       val corr_cells :Array[Double] = corr_res
                                       ^

StackTrace: 

# [Visualize results](plot_kmeans_clusters.ipynb)