In [None]:
import geotrellis.raster._
import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{SinglebandGeoTiff, _}
import geotrellis.raster.{CellType, DoubleArrayTile}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream
import org.apache.spark.sql._
import java.time.{ZonedDateTime, LocalDateTime}
import java.time.format.DateTimeFormatter
import java.sql.Date
import org.apache.spark.sql.types.{DateType, IntegerType}
import java.util.Calendar
import java.time.temporal.ChronoUnit
import java.util.GregorianCalendar
import org.apache.spark.sql.functions._
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.util
import org.apache.spark.ml.linalg.VectorUDT

## Read Ground Data

In [None]:
var df = spark.read.format("csv").option("header", "true").load("file:///data/local/home/parrot/minio2/individual_phenometrics_data.csv")
df.printSchema()



//filter only the phenology markers indicating SOS
var df_filt = df.filter(  $"First_Yes_Year" > 1998 &&  $"First_Yes_Year" < 2018 &&  $"Kingdom" === "Plantae" && 
                        ($"Phenophase_Description".contains("First bloom") ||
                        $"Phenophase_Description".contains("Breaking leaf buds") ||
                        $"Phenophase_Description".contains("Flowers or flower buds") ||
                        $"Phenophase_Description".contains("First leaf (historic lilac/honeysuckle)")))



//select only the columns which we need for the ground comparison
var pheno_df  = df_filt.select("Latitude","Longitude","Species","Phenophase_Description","First_Yes_Year","First_Yes_Month","First_Yes_Day","First_Yes_Julian_Date")



pheno_df.groupBy("Species").count().orderBy(desc("count")).show()

//convert types
var pheno_rdd: RDD[(Double, Double,String, String, Integer, Integer,Integer, Double)] = pheno_df.rdd.map( r => (r.getString(0).toDouble, r.getString(1).toDouble, //lat.long
                                                                              
                                                                                                         r.getString(2),r.getString(3),  //Pheno_description
                                                                                r.getString(4).toInt, r.getString(5).toInt, r.getString(6).toInt,r.getString(7).toDouble))


var speciesType = "agrifolia"
var outputresults = "Evi_1_" + speciesType + ".csv"


## Get Extent of SOS Products

In [None]:
//Single band GeoTiff
var filepath = "hdfs:///user/hadoop/evi_new_template.tif"

//Since it is a single GeoTiff, it will be a RDD with a tile.
var geoTiff : RDD[(ProjectedExtent, Tile)] = sc.hadoopGeoTiffRDD(filepath)
var proExtents_RDD : RDD[ProjectedExtent] = geoTiff.keys
var extents_withIndex = proExtents_RDD.zipWithIndex().map{case (e,v) => (v,e)}
var projected_extent :ProjectedExtent = (extents_withIndex.filter(m => m._1 == 0).values.collect())(0)

var rasterExtent = RasterExtent(projected_extent.extent, 6500, 3000) 






## Extract TimeSat SOS Predictions

In [None]:
val pattern: String = "tif"
var filepath: String = "hdfs:///user/hadoop/NDVI_experiment_7_SOS"  

//give the years; HadoopGeoTiffRDD will preserve the file order from HDFS
val tiles_RDD = sc.hadoopGeoTiffRDD(filepath, pattern).values.zipWithIndex.map{case (v,e) => (e+1999,v)}


//year,array of values
var grids_RDD: RDD[(Long,Array[Double])] = tiles_RDD.map(m => ( m._1, m._2.toArrayDouble()))

var indexedValues: RDD[(Long,Array[(Int, Double)])] =  grids_RDD.map(m => (m._1, m._2.zipWithIndex.map{case (v,e) => (e,v)}))



var rdd_year_and_tuple : RDD[(Long, (Int, Double))]  = indexedValues.flatMap{case (year, array_list) => array_list.map(tuple => (year, tuple))}.filter( m => !m._2._2.isNaN && m._2._2 >0)

var time_sat_output =   rdd_year_and_tuple.map{case(year,(index,value)) => ((year.toInt,index.toLong),value)}

  

## Get Grid Coordinates For the Ground Measurements

In [None]:

def getGridCoordinates(lat:Double, long: Double, species: String, pheno_type: String, year: Integer, month: Integer, day:Integer, julianYear : Double): ((Int,Long), (Double,String))={

    var (col, row) = rasterExtent.mapToGrid(long, lat)
    
    var date_observations : Calendar = Calendar.getInstance()
    date_observations.set(year.toInt ,month.toInt-1 , day.toInt) 
    var day_of_year : Double = date_observations.get(Calendar.DAY_OF_YEAR)
    //cols * row + col
    var index =  (6500.toInt*(row.toInt) + (col.toInt)).toLong
    
    return ((year, index),(day_of_year,species))
}


var ground_observartions = pheno_rdd.map(m =>  getGridCoordinates(m._1, m._2, m._3,m._4,m._5,m._6, m._7,m._8)).filter( m => m._2._1 >=0 && m._2._1 <270)


## Count Unique Observations (Diffrent Years and diffrent sites for a year)

In [None]:
var joined= time_sat_output.join(ground_observartions).filter(m => (m._2._2._2 == speciesType))
var grouped_by_year_pixel = joined.groupByKey().map(rec => (rec._1, rec._2.toList))
var minvalues = grouped_by_year_pixel.map(m =>(m._1, m._2.min))

## Prepare RDDs

In [None]:

//drop the index and year, get TimeSat value, phenovalue,species
var compare   :RDD[(Double, Double)]= minvalues.map(m => (m._2._2._1, m._2._1))

//here filter on species,flatter the (timeSat,ground) values, make it to List of type (timeSatValue, Vector.danse(ground_value))
var oneSpecies = compare.map(m => (m._1, Vectors.dense(m._2)))

//RDD used for printing
var oneSpeciestoPrint = compare.map(m => (m._1, m._2))
oneSpeciestoPrint.take(10000).foreach(println)

## Calculate Mean Absolute Error

In [None]:
var abs = oneSpeciestoPrint.map(m => Math.abs(m._1 - m._2))
var total = abs.count
var reduce = abs.reduce((_ + _))

print (reduce/total)

## Fit SOS Observations and Predictions into a Regression Model

In [None]:

val df2 = spark.createDataFrame(oneSpecies).toDF("label","features")
val df3 = spark.createDataFrame(oneSpeciestoPrint).toDF("label","features")

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val lrModel : LinearRegressionModel = lr.fit(df2)


val trainingSummary = lrModel.summary
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
println(s"Mean Absolute error: ${trainingSummary.meanAbsoluteError}")
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")