
Crash when lineage is enabled, but not without it [EPA air quality analysis and data] #20

@miryung


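Lines 41 and 44 of the attached source let you switch lineage on or off. Line 41 reads the input with ctx.textFile (lineage off), and line 44, currently commented out, reads it with lc.textFile (lineage on). With lineage on, the program crashes; with lineage off, it works fine. I attached the source code, the outputs with and without lineage, and sample data. Is the crash caused by sortByKey?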
sample-data.txt

AirQuality-WithLineage.txt
AirQuality-WithOutLineage.txt
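One way to check whether sortByKey is involved is to compute the expected monthly averages without Spark and compare them with the run that works. The sketch below is a minimal, Spark-free approximation of the last steps of the program: it keys each reading by year * 100 + month, averages per key with (sum, count) pairs as the reduceByKey call does, and sorts by key. It assumes ISO yyyy-MM-dd date strings and uses java.time.LocalDate in place of Joda's DateTime; MonthlyAverageSketch, monthKey, monthlyAverages, and formatLine are hypothetical names, not part of the original code.

```scala
import java.time.LocalDate
import java.util.Locale

// Minimal Spark-free sketch of the program's monthly averaging.
// Assumption: dates are ISO yyyy-MM-dd strings (parsed with java.time instead of Joda).
object MonthlyAverageSketch {
  // Encode year and month as one sortable Int key, e.g. 2003-07-15 -> 200307.
  def monthKey(date: LocalDate): Int = date.getYear * 100 + date.getMonthValue

  // (date, value) readings -> (key, average) pairs sorted by key,
  // mirroring mapValues/reduceByKey/mapValues/sortByKey in the Spark version.
  def monthlyAverages(readings: Seq[(String, Double)]): Seq[(Int, Double)] = {
    val keyed = readings.map { case (d, v) => (monthKey(LocalDate.parse(d)), v) }
    // Accumulate (sum, count) per key, like reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).
    val sums = keyed.foldLeft(Map.empty[Int, (Double, Int)]) { case (acc, (k, v)) =>
      val (s, n) = acc.getOrElse(k, (0.0, 0))
      acc.updated(k, (s + v, n + 1))
    }
    sums.toSeq.map { case (k, (s, n)) => (k, s / n) }.sortBy(_._1)
  }

  // Same line format as the program's output (Locale.ROOT keeps "." as the decimal separator).
  def formatLine(key: Int, avg: Double): String =
    "Year: %d, Month: %02d, Average Hourly PM10: %.2f;".formatLocal(Locale.ROOT, key / 100, key % 100, avg)

  def main(args: Array[String]): Unit = {
    // Hypothetical readings for illustration, not taken from sample-data.txt.
    val sample = Seq(("2003-02-01", 10.0), ("2003-01-15", 4.0), ("2003-02-10", 20.0))
    monthlyAverages(sample).foreach { case (k, avg) => println(formatLine(k, avg)) }
  }
}
```

If the sorted output of this sketch matches the run without lineage on the same filtered readings, the aggregation and sorting logic itself is probably not at fault, which points back at how the lineage-enabled run executes those stages.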

package EPAairquality

import org.apache.spark.lineage.LineageContext
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime

import math.{atan2, cos, pow, sin, sqrt}

object AverageQuality {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
    var logFile = ""
    var local = 0
    if (args.length < 2) {
      // No arguments: run locally on the sample data.
      sparkConf.setMaster("local[6]")
      sparkConf.setAppName("Weather Analysis").set("spark.executor.memory", "2g")
      logFile = "data-epa/sample-data.txt"
    } else {
      // Arguments: input path and an integer flag (parsed but not used below).
      logFile = args(0)
      local = args(1).toInt
    }
    // Query parameters: center point (lat, long), search radius in miles, and inclusive year range.
    val lat = 37.773972
    val long = -122.43129
    val radius = 500
    val year_start = 2003
    val year_end = 2016

    // Lineage capture flag passed to the LineageContext.
    val lineage = true
    val ctx = new SparkContext(sparkConf)
    val lc = new LineageContext(ctx)
    lc.setCaptureLineage(lineage)

    // Read the CSV input. Lineage off: read through ctx.textFile.
    val csv = ctx.textFile(logFile)

    // Lineage on: use this line instead of the one above (comment it out to disable lineage).
    // val csv = lc.textFile(logFile)
    // Split each CSV line into trimmed fields and drop the header row.
    val rows = csv.map(line => line.split(",").map(_.trim))
    val header = rows.first
    val data = rows.filter(_(0) != header(0))
    // Keep columns 5, 6, 9 and 13 (latitude, longitude, date, measured value).
    val cols = data.map(row => Array[String](row(5), row(6), row(9), row(13)))
    // Keep rows whose year lies in [year_start, year_end].
    val time = cols.filter { row =>
      val year = DateTime.parse(row(2)).getYear
      year >= year_start && year <= year_end
    }
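    // Keep rows within `radius` miles of (lat, long), using the haversine formula.
    // NOTE: lat/long are in degrees, but scala.math.sin/cos expect radians, so this is not
    // a true great-circle distance; wrapping the angles in toRadians would fix that.
    val relevant = time.filter { location =>
      val a = pow(sin((location(0).toDouble - lat) / 2), 2) +
        cos(lat) * cos(location(0).toDouble) * pow(sin((location(1).toDouble - long) / 2), 2)
      val c = 2 * atan2(sqrt(a), sqrt(1 - a))
      val d = 6371e3 / 1609.34 * c // Earth radius (m) / meters per mile -> miles
      // Written as !(d > radius) so NaN distances are kept, as in the original if/else.
      !(d > radius)
    }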
    // Key each reading by year * 100 + month (e.g. 200307), then average per key
    // by summing (value, count) pairs with reduceByKey.
    val kvpair = relevant.map { row =>
      val date = DateTime.parse(row(2))
      (date.getYear * 100 + date.getMonthOfYear, row(3).toDouble)
    }
    val month = kvpair
      .mapValues(x => (x, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues(y => y._1 / y._2)

    // Sort by the year/month key and print one line per month on the driver.
    month.sortByKey().collect().foreach { case (key, avg) =>
      println("Year: %d, Month: %02d, Average Hourly PM10: %.2f;".format(key / 100, key % 100, avg))
    }
    ctx.stop()
  }
}
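The relevant filter in the program passes latitude and longitude in degrees directly to sin and cos, which expect radians, so its distances are not true great-circle distances. This is independent of the lineage toggle (the same computation runs fine without lineage), but for reference, here is a minimal haversine sketch that converts to radians first. It reuses the original constants (6371e3 m Earth radius, 1609.34 m per mile) and assumes decimal-degree inputs; HaversineSketch, distanceMiles, and withinRadius are hypothetical names.

```scala
import scala.math.{atan2, cos, pow, sin, sqrt, toRadians}

// Minimal sketch: haversine great-circle distance in miles.
// Assumption: all coordinates are decimal degrees.
object HaversineSketch {
  // Constants copied from the original filter.
  val EarthRadiusMeters: Double = 6371e3
  val MetersPerMile: Double = 1609.34

  // Great-circle distance between (lat1, lon1) and (lat2, lon2), in miles.
  def distanceMiles(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    // Convert every angle to radians before calling sin/cos.
    val phi1 = toRadians(lat1)
    val phi2 = toRadians(lat2)
    val dPhi = toRadians(lat2 - lat1)
    val dLambda = toRadians(lon2 - lon1)
    val a = pow(sin(dPhi / 2), 2) + cos(phi1) * cos(phi2) * pow(sin(dLambda / 2), 2)
    val c = 2 * atan2(sqrt(a), sqrt(1 - a))
    EarthRadiusMeters / MetersPerMile * c
  }

  // Predicate with the same shape as the `relevant` filter: keep points within radiusMiles.
  def withinRadius(centerLat: Double, centerLon: Double, radiusMiles: Double)(lat: Double, lon: Double): Boolean =
    distanceMiles(centerLat, centerLon, lat, lon) <= radiusMiles

  def main(args: Array[String]): Unit = {
    // Query center from the original program, checked against a nearby point.
    println(withinRadius(37.773972, -122.43129, 500.0)(37.8, -122.4))
  }
}
```

In the program, the filter could then be written as time.filter(row => HaversineSketch.withinRadius(lat, long, radius)(row(0).toDouble, row(1).toDouble)). Note that this would likely change which rows are kept, so outputs would no longer match the attached files.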
