Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Framework for sending emails #135

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions project/Project.scala
Expand Up @@ -97,6 +97,8 @@ object Zipkin extends Build {
"org.slf4j" % "slf4j-log4j12" % "1.6.4" % "runtime",
"javax.mail" % "mail" % "1.4.3",
"com.github.spullara.mustache.java" % "compiler" % "0.8.2",
"com.twitter" % "util-core" % UTIL_VERSION,
"com.twitter" % "util-logging" % UTIL_VERSION,
/* Test dependencies */
"org.scala-tools.testing" % "specs_2.9.1" % "1.6.9" % "test"
),
Expand Down
Expand Up @@ -18,6 +18,7 @@ package com.twitter.zipkin.hadoop

import collection.mutable
import collection.immutable.HashMap
import email.EmailContent
import java.util.Scanner
import java.io.File
import com.twitter.zipkin.hadoop.sources._
Expand All @@ -39,12 +40,11 @@ abstract class HadoopJobClient(val combineSimilarNames: Boolean) {
/**
* Starts the postprocessing for the client
* @param filename the input filename
* @param output the output filename
*/
def start(filename : String, output : String)

def getServiceName(service: String) = {
if (combineSimilarNames) HadoopJobClient.serviceNames(service) else service
service
}

def getLineResult(line: List[String]): LineResult = {
Expand Down Expand Up @@ -79,26 +79,5 @@ abstract class HadoopJobClient(val combineSimilarNames: Boolean) {
object HadoopJobClient {

val DELIMITER = ":"
var serviceNames = new HashMap[String, String]()

/**
* Given a directory of files formatted in TSV format with each line being of the form
* servicename standardizedservicename
* reads that information into a map
* @param dirname the name of a directory containing all the service name information
*/
def populateServiceNames(dirname: String) = {
Util.traverseFileTree(new File(dirname))({f: File =>
val s = new Scanner(f)
while (s.hasNextLine()) {
val line = new Scanner(s.nextLine())
val serviceName = Util.toSafeHtmlName(line.next())
val standardized = if (line.hasNext) line.next else serviceName
if (!serviceNames.contains(serviceName)) {
serviceNames += serviceName -> standardized
}
}
})
}

}
Expand Up @@ -16,6 +16,8 @@

package com.twitter.zipkin.hadoop

import sources.Util

/**
* This class represents a single line generated by a Hadoop job
* @param line the single line of data
Expand Down Expand Up @@ -97,7 +99,7 @@ class PerServicePairLineResult(line: List[String]) extends LineResult(line) {
}

def getKey() = {
line.head + HadoopJobClient.DELIMITER + line.tail.head
Util.toSafeHtmlName(line.head) + HadoopJobClient.DELIMITER + Util.toSafeHtmlName(line.tail.head)
}

def getValue() = {
Expand Down
Expand Up @@ -16,15 +16,17 @@

package com.twitter.zipkin.hadoop

import com.twitter.zipkin.hadoop.sources.Util
import email.EmailContent
import config.{MailConfig, WorstRuntimesPerTraceClientConfig}
import email.{Email, EmailContent}
import com.twitter.logging.Logger

/**
* Runs all the jobs which write to file on the input. The arguments are expected to be inputdirname outputdirname servicenamefile
* Runs all the jobs which write to file on the input, and sends those as emails.
* The arguments are expected to be inputdirname servicenamefile
*/
object PostprocessWriteToFile {

val jobList = List(("WorstRuntimesPerTrace", new WorstRuntimesPerTraceClient(Util.ZIPKIN_TRACE_URL)),
val jobList = List(("WorstRuntimesPerTrace", (new WorstRuntimesPerTraceClientConfig())()),
("Timeouts", new TimeoutsClient()),
("Retries", new RetriesClient()),
("MemcacheRequest", new MemcacheRequestClient()),
Expand All @@ -33,15 +35,25 @@ object PostprocessWriteToFile {

def main(args: Array[String]) {
val input = args(0)
val output = args(1)
val serviceNames = args(2)
val serviceNames = args(1)
val output = if (args.length < 3) null else args(2)

HadoopJobClient.populateServiceNames(serviceNames)
EmailContent.populateServiceNames(serviceNames)
for (jobTuple <- jobList) {
val (jobName, jobClient) = jobTuple
jobClient.start(input + "/" + jobName, output)
}
EmailContent.writeAll()
if (output != null) {
EmailContent.setOutputDir(output)
EmailContent.writeAll()
}
val serviceToEmail = EmailContent.writeAllAsStrings()
for (tuple <- serviceToEmail) {
val (service, content) = tuple
EmailContent.getEmailAddress(service) match {
case Some(addresses) => addresses.foreach {address => (new MailConfig())().send(new Email(address, "Service Report for " + service, content))}
}
}
}
}

Expand All @@ -53,7 +65,7 @@ object PostprocessWriteToFile {
object ProcessPopularKeys {
def main(args : Array[String]) {
val portNumber = augmentString(args(2)).toInt
HadoopJobClient.populateServiceNames(args(0))
EmailContent.populateServiceNames(args(0))
val c = new PopularKeyValuesClient(portNumber)
c.start(args(0), args(1))
}
Expand All @@ -66,7 +78,7 @@ object ProcessPopularKeys {
object ProcessPopularAnnotations {
def main(args : Array[String]) {
val portNumber = augmentString(args(2)).toInt
HadoopJobClient.populateServiceNames(args(0))
EmailContent.populateServiceNames(args(0))
val c = new PopularAnnotationsClient(portNumber)
c.start(args(0), args(1))
}
Expand All @@ -79,7 +91,7 @@ object ProcessPopularAnnotations {

object ProcessMemcacheRequest {
def main(args : Array[String]) {
HadoopJobClient.populateServiceNames(args(0))
EmailContent.populateServiceNames(args(0))
val c = new MemcacheRequestClient()
c.start(args(0), args(1))
EmailContent.writeAll()
Expand All @@ -93,7 +105,7 @@ object ProcessMemcacheRequest {

object ProcessTimeouts {
def main(args : Array[String]) {
HadoopJobClient.populateServiceNames(args(0))
EmailContent.populateServiceNames(args(0))
val c = new TimeoutsClient()
c.start(args(0), args(1))
EmailContent.writeAll()
Expand All @@ -108,7 +120,7 @@ object ProcessTimeouts {
object ProcessExpensiveEndpoints {

def main(args: Array[String]) {
HadoopJobClient.populateServiceNames(args(0))
EmailContent.populateServiceNames(args(0))
val c = new ExpensiveEndpointsClient()
c.start(args(0), args(1))
EmailContent.writeAll()
Expand All @@ -119,8 +131,8 @@ object ProcessExpensiveEndpoints {
object ProcessWorstRuntimesPerTrace {

def main(args: Array[String]) {
HadoopJobClient.populateServiceNames(args(0))
val c = new WorstRuntimesPerTraceClient(Util.ZIPKIN_TRACE_URL)
EmailContent.populateServiceNames(args(0))
val c = (new WorstRuntimesPerTraceClientConfig()).apply()
c.start(args(0), args(1))
EmailContent.writeAll()
}
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.twitter.zipkin.hadoop

import email.EmailContent
import java.io._
import collection.mutable.HashMap
import com.twitter.zipkin.hadoop.sources.Util
import email.EmailContent

/**
* A client which writes to a file. This is intended for use mainly to format emails
Expand Down Expand Up @@ -50,8 +50,8 @@ class MemcacheRequestClient extends WriteToFileClient(true, "MemcacheRequest") {
val valuesToInt = lines.map({ line: LineResult => augmentString(line.getValueAsString()).toInt })
valuesToInt.foldLeft(0) ((left: Int, right: Int) => left + right )
}
val mt = EmailContent.getTemplate(service, toHtmlName(service))
mt.addOneLineResult("Service " + service + " made " + numberMemcacheRequests + " redundant memcache requests")
val mt = EmailContent.getTemplate(service)
mt.addOneLineResult(service, "Service " + service + " made " + numberMemcacheRequests + " redundant memcache requests")
}

}
Expand All @@ -67,11 +67,11 @@ abstract class WriteToTableClient(jobname: String) extends WriteToFileClient(fal
def getTableHeader(): List[String]

def addTable(service: String, lines: List[LineResult], mt: EmailContent) = {
mt.addTableResult(getTableResultHeader(service), getTableHeader(), lines)
mt.addTableResult(service, getTableResultHeader(service), getTableHeader(), lines)
}

def processKey(service: String, lines: List[LineResult]) {
val mt = EmailContent.getTemplate(service, toHtmlName(service))
val mt = EmailContent.getTemplate(service)
addTable(service, lines, mt)
}
}
Expand Down Expand Up @@ -120,7 +120,7 @@ class WorstRuntimesClient extends WriteToTableClient("WorstRuntimes") {
}

def getTableHeader() = {
List("Span ID", "Duration")
List("Span ID", "Duration (ms)")
}
}

Expand All @@ -136,20 +136,28 @@ class WorstRuntimesPerTraceClient(zipkinUrl: String) extends WriteToTableClient(
}

def getTableHeader() = {
List("Trace ID", "Duration")
List("Trace ID", "Duration (ms)")
}

override def addTable(service: String, lines: List[LineResult], mt: EmailContent) = {
val formattedAsUrl = lines.map {line =>
val formattedAsUrl = lines.flatMap {line =>
if (line.getValue().length < 2) {
throw new IllegalArgumentException("Malformed line: " + line)
}
val hypertext = line.getValue().head
(Util.ZIPKIN_TRACE_URL + hypertext, hypertext, line)
val duration = augmentString(line.getValue().tail.head).toFloat
if (duration > 1000) {
val inMilliseconds = scala.math.round(duration * 100 / 1000.0) / 100.0
val formatted = new PerServiceLineResult((List(line.getKey(), hypertext, inMilliseconds.toString)))
Some((zipkinUrl + hypertext, hypertext, formatted))
} else {
None
}
}
if (formattedAsUrl.length > 0) {
mt.addUrlTableResult(service, getTableResultHeader(service), getTableHeader(), formattedAsUrl)
}
mt.addUrlTableResult(getTableResultHeader(service), getTableHeader(), formattedAsUrl)
}

}


Expand All @@ -164,6 +172,21 @@ class ExpensiveEndpointsClient extends WriteToTableClient("ExpensiveEndpoints")
}

def getTableHeader() = {
List("Service Called", "Duration")
List("Service Called", "Duration (ms)")
}

override def addTable(service: String, lines: List[LineResult], mt: EmailContent) = {
val formatted = lines.map {line =>
if (line.getValue().length < 2) {
throw new IllegalArgumentException("Malformed line: " + line)
}
val service = line.getValue().head
val duration = augmentString(line.getValue().tail.head).toFloat
val rounded = scala.math.round(duration * 100) / 100.0
new PerServiceLineResult((List(line.getKey(), service, rounded.toString)))
}
if (formatted.length > 0) {
mt.addTableResult(service, getTableResultHeader(service), getTableHeader(), formatted)
}
}
}
@@ -0,0 +1,43 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop.config

import com.twitter.util.Config
import com.twitter.zipkin.hadoop.email.SMTPClient

class MailConfig() extends Config[SMTPClient] {

// You'll need to change these
var smtpServer: String = "your.postmaster"
var smtpPort: Int = 0
var from: String = "zipkin-service-report@abc.efg"
var bcc: String = "zipkin-service-report@abc.efg"
var auth: Boolean = true
var user: String = "username"
var password: String = "password"

// All test emails are delivered to this address
var testTo: String = "zipkin-service-report-test@abc.efg"

def apply(testMode: Boolean): SMTPClient = {
new SMTPClient(from, testTo, bcc, testMode, smtpServer, smtpPort, auth, user, password)
}

def apply(): SMTPClient = {
apply(false)
}
}
@@ -0,0 +1,30 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop.config

import com.twitter.util.Config
import com.twitter.zipkin.hadoop.WorstRuntimesPerTraceClient

class WorstRuntimesPerTraceClientConfig extends Config[WorstRuntimesPerTraceClient] {

val zipkinTraceUrl = "your.zipkin.url/traces"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be a var


def apply() = {
new WorstRuntimesPerTraceClient(zipkinTraceUrl)
}

}