### reduce by key, sort 연산 수행

In [1]:
//-- Build (word, 1) tuples: every element of filteredRDD becomes a pair such as ("spark", 1).
val tupleRDD = filteredRDD.map(word => (word, 1))

//-- Count the occurrences of each word.
// reduceByKey merges all values that share the same key using the given function.
// Example: if "spark" occurs on 4 lines there are four ("spark", 1) pairs; their values
// are combined pairwise (1 + 1 = 2, 2 + 1 = 3, ...) until a single ("spark", 4) remains.
// Think of it as a group-by on the key followed by a reduce of the values inside each group.
val reducedRDD = tupleRDD.reduceByKey(_ + _)

Name: Syntax Error.
Message: 
StackTrace: 

### sort의 두 가지 방법
- sortByKey API
- top API

In [2]:
// sortByKey API: sort the pairs by key in descending order.
//-- alternative with an explicit partition count: swappedRDD.sortByKey(false, 1)
val sortedRDD = swappedRDD.sortByKey(ascending = false)


// top API: needs an Ordering; this one compares (word, count) pairs by their count only.
//-- to invert the order:            (a._2 compare b._2) * -1
//-- to also break ties on the word: if ((a._2 compare b._2) * -1 == 0) (a._1 compare b._1) * -1 else (a._2 compare b._2) * -1
val secondValueOrdering: Ordering[(String, Int)] =
  Ordering.by[(String, Int), Int](_._2)

//-- Print only the 10 entries with the largest counts.
//-- with the default tuple ordering instead: reducedRDD.top(10).foreach { x => println("word_count(10 only) : " + x) }
val topTenByCount = reducedRDD.top(10)(secondValueOrdering)
topTenByCount.foreach(entry => println("word_count(10 only) : " + entry))


Name: Syntax Error.
Message: 
StackTrace: 

### [LAB] log analyzer

In [None]:
/**
 * One parsed line of an Apache access log.
 *
 * Each field corresponds to one capture group of [[ApacheAccessLog.PATTERN]], in order.
 *
 * @param ipAddress    first whitespace-delimited token of the line
 * @param clientIdentd second token
 * @param userId       third token
 * @param dateTime     text between the square brackets, including the numeric zone offset
 * @param method       first token of the quoted request
 * @param endpoint     second token of the quoted request
 * @param protocol     third token of the quoted request
 * @param responseCode three-digit status code
 * @param contentSize  size field of the line; `0` when the log contains `-`
 */
case class ApacheAccessLog(
    ipAddress: String,
    clientIdentd: String,
    userId: String,
    dateTime: String,
    method: String,
    endpoint: String,
    protocol: String,
    responseCode: Int,
    contentSize: Long)

object ApacheAccessLog {

  /**
   * Regex for one access-log line. It is anchored at the start of the line only, so
   * trailing text after the size field is ignored.
   *
   * The last group accepts either digits or a single `-`, which Common Log Format
   * writes when no bytes were sent.
   */
  val PATTERN: scala.util.matching.Regex =
    """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+|-)""".r

  /**
   * Parses a single log line.
   *
   * @param log raw log line
   * @return the parsed entry; a `-` size field is reported as `contentSize == 0`
   * @throws RuntimeException if the line does not match [[PATTERN]]
   */
  def parseLogLine(log: String): ApacheAccessLog =
    PATTERN.findFirstMatchIn(log) match {
      case Some(m) =>
        // "-" means the response carried no body; treat it as zero bytes instead of failing.
        val contentSize = m.group(9) match {
          case "-"    => 0L
          case digits => digits.toLong
        }
        ApacheAccessLog(
          m.group(1), m.group(2), m.group(3), m.group(4),
          m.group(5), m.group(6), m.group(7),
          m.group(8).toInt, contentSize)
      case None =>
        throw new RuntimeException("Cannot parse log line: " + log)
    }
}



/**
 * Computes summary statistics over an Apache access log with Spark and prints them.
 *
 * The following log statistics will be computed:
 *   1. The average, min, and max content size of responses returned from the server.
 *   2. A count of response codes returned. (top count 10)
 *   3. All IPAddresses that have accessed this server more than N times. (N == 10) (top count 10)
 *   4. The top endpoints requested by count. (top count 10)
 */
object LogAnalyzer {

  /**
   * Runs the analysis on a local SparkContext and then keeps the process alive for
   * debugging. The method does not return normally; the context is stopped when the
   * analysis fails or the waiting thread is interrupted.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    lab.common.config.Config.setHadoopHOME

    val logFile = "src/main/resources/apache.access.log"

    val sc = new SparkContext("local[2]", "LogAnalyzer")  //--local execution (run on eclipse)....
    try {
      //--RDD[ApacheAccessLog], cached because every statistic below re-reads it.
      val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache()
      accessLogs.setName("accessLogs")

      //--1. The average, min, and max content size of responses returned from the server.
      val contentSizes = accessLogs.map(log => log.contentSize).setName("contentSizes").cache()
      // reduce, count, min and max each run as a separate action, which is why caching
      // contentSizes above pays off.
      // Note: the average is a Long / Long division, so it is truncated to a whole number.
      println("1. Content Size Avg: %s, Min: %s, Max: %s".format(
        contentSizes.reduce(_ + _) / contentSizes.count(),
        contentSizes.min(),
        contentSizes.max()))

      // stats() is an action too; mean/min/max below are only read from its result and
      // do not trigger further actions.
      val stats = contentSizes.stats()
      println("1. Content Size(by StatCounter) Mean: %s, Min: %s, Max: %s".format(
        stats.mean,
        stats.min,
        stats.max))

      //--2. A count of response codes returned. (top count 10)
      val responseCodeToCount = accessLogs
        .map(log => (log.responseCode, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(10)
      println(s"""2. Response code counts (top 10): ${responseCodeToCount.mkString("[", " , ", "]")}""")

      //--3. All IPAddresses that have accessed this server more than N times. (N == 10) (top count 10)
      val ipAddresses = accessLogs
        .map(log => (log.ipAddress, 1))
        .reduceByKey(_ + _)
        .filter(_._2 > 10)
        .sortBy(_._2, ascending = false)
        .map(_._1)
        .take(10)
      println(s"""3. IPAddresses > 10 times (top 10): ${ipAddresses.mkString("[", " , ", "]")}""")

      //--4. The top endpoints requested by count. (top count 10)
      val topEndpoints = accessLogs
        .map(log => (log.endpoint, 1))
        .reduceByKey(_ + _)
        .top(10)(OrderingUtils.SecondValueOrdering)
      println(s"""4. Top Endpoints (top 10): ${topEndpoints.mkString("[", " , ", "]")}""")

      //--for debug: keep the process alive after the results are printed.
      // Sleeping instead of spinning leaves both local[2] cores to Spark.
      while (true) Thread.sleep(1000L)
    } finally {
      // Reached when the analysis throws or the hold loop is interrupted.
      sc.stop()
    }
  }
}
