In [None]:
import scala.util.matching.Regex
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

## Connect to Spark

In [None]:
// Spark settings for this notebook: the application name and the master URL to connect to.
val conf = new SparkConf()
  .setAppName("LogAnalyzer")
  .setMaster("spark://ea7571ae308d:7077")

// Obtain the session configured by `conf` (getOrCreate hands back an existing session if there is one).
val spark = SparkSession
  .builder()
  .config(conf)
  .getOrCreate()

## Read Data

In [None]:
// Load the sample log as an RDD with one element per line of text.
val logs = {
  val context = spark.sparkContext
  context.textFile("../data/sample.log")
}

In [None]:
// Print the first five raw lines to eyeball the input format.
for (rawLine <- logs.take(5)) println(rawLine)

## Convert data to a structured format

In [None]:
/**
 * One successfully parsed log entry.
 *
 * @param host      first whitespace-delimited token of the line
 * @param timestamp text found between the square brackets, kept as a raw string
 * @param url       path extracted from the quoted request, or the whole request when it has no '/'
 * @param code      three-digit status code, kept as a string
 * @param bytes     trailing size field; 0 when that field is not a number (e.g. "-")
 */
case class ServerLog(host: String, timestamp: String, url: String, code: String, bytes: Long)

/** Parsing helpers that turn raw log lines into [[ServerLog]] values. */
object ServerLog {

  // Capture groups, in order: host, [timestamp], "request", 3-digit status, size token.
  // Used as an extractor below, so the WHOLE line has to match this shape.
  private val LogLinePattern = """^(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\S+)""".r

  /**
   * Parses a single log line.
   *
   * @param line raw line of text
   * @return `Right(ServerLog)` when the line matches the expected layout,
   *         otherwise `Left` carrying the original line unchanged
   */
  def parseLine(line: String): Either[String, ServerLog] =
    line match {
      case LogLinePattern(host, ts, request, code, size) =>
        Right(
          ServerLog(
            host,
            ts,
            parseRequest(request).getOrElse(request), // no path found: keep the raw request
            code,
            toLong(size).getOrElse(0L)                // non-numeric size is recorded as 0
          )
        )
      case _ => Left(line)
    }

  /**
   * Extracts the path from a request string such as `GET /index.html HTTP/1.0`:
   * everything from the first '/' up to (not including) the next space.
   *
   * @return `None` when the request contains no '/'
   */
  private def parseRequest(url: String): Option[String] = {
    val start = url.indexOf("/")
    if (start < 0) None
    else Some(url.substring(start).takeWhile(_ != ' '))
  }

  /** Converts the size token to a `Long`; yields `None` when it is not a valid number. */
  private def toLong(b: String): Option[Long] =
    try Some(b.toLong)
    catch {
      case _: NumberFormatException => None
    }
}

In [None]:
// Parse every raw line; each element is Right(ServerLog) on success or Left(original line) on failure.
val parsedLines = logs.map(rawLine => ServerLog.parseLine(rawLine))

In [None]:
// Keep only the successfully parsed lines; unparseable lines (Left) are dropped.
// A single pattern-matching `collect` replaces filter + `.right.get`, avoiding the partial `.get`.
val serverLogs = parsedLines.collect { case Right(entry) => entry }

In [None]:
// Turn the RDD of ServerLog into a typed Dataset so the Dataset and SQL APIs can be used on it.
// NOTE(review): toDS() needs the session's implicits in scope — presumably supplied by the notebook kernel; confirm.
val dataDS: Dataset[ServerLog] = serverLogs.toDS()

In [None]:
// Mark the dataset for caching (cache() is persist() with the default storage level),
// since the cells below query it repeatedly.
dataDS.persist()

In [None]:
// Print the first five parsed records using the case class's toString.
for (entry <- dataDS.take(5)) println(entry)

## DATASET API

In [None]:
// Render the first five rows as a table.
dataDS.show(numRows = 5)

#### Distinct values

In [None]:
// List every distinct status code present in the data.
dataDS
  .select(col("code"))
  .distinct()
  .show()

In [None]:
// Count how many different hosts appear in the data.
dataDS
  .selectExpr("count(distinct(host)) as unique_hosts")
  .show()

#### Value Comparisons

In [None]:
// Show code and URL for rows whose status code is greater than 400 (first five rows only).
dataDS
  .select("code", "url")
  .filter("code > 400")
  .show(5)

#### Aggregation

In [None]:
// Number of log entries for each (status code, URL) pair.
dataDS
  .groupBy(col("code"), col("url"))
  .count()
  .show()

## SQL 

In [None]:
// Expose the dataset to Spark SQL under the name "logs".
// createOrReplaceTempView (rather than createTempView) keeps this cell re-runnable:
// createTempView throws an AnalysisException when the view already exists.
dataDS.createOrReplaceTempView("logs")

In [None]:
// Quick check that the "logs" view is queryable: first five timestamps.
spark
  .sql("select timestamp from logs limit 5")
  .show()

#### Top 4 URLs with NOT FOUND status

In [None]:
// The four URLs that most often produced a 404, most frequent first.
spark
  .sql("""SELECT url, count(*) AS count 
           FROM logs 
           WHERE code == 404 
           GROUP BY url 
           ORDER BY count DESC 
           LIMIT 4""")
  .show()

## Compress to Parquet: DataWarehouse

In [None]:
// Column expression that parses the raw `timestamp` string with the pattern dd/MMM/yyyy:HH:mm:ss Z.
val datetime: Column = to_timestamp(col("timestamp"), "dd/MMM/yyyy:HH:mm:ss Z")

In [None]:
// Append year / month / day columns derived from the parsed timestamp;
// they are used as partition keys when the data is written out.
val exportDS = Seq(
  "year"  -> year(datetime),
  "month" -> month(datetime),
  "day"   -> dayofmonth(datetime)
).foldLeft(dataDS.toDF()) { case (frame, (name, value)) => frame.withColumn(name, value) }

In [None]:
// Write the data as Parquet under ../data/output/, partitioned by year, month and day.
// "overwrite" mode replaces any output already present at that path.
exportDS.write
  .partitionBy("year", "month", "day")
  .mode("overwrite")
  .parquet("../data/output/")