Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' of github.com:twitter/zipkin into counting_spans2

  • Loading branch information...
commit 90bbea11ddfd4c01d0674be4426e75a59d824ff7 2 parents 0c8fc96 + 5065d5a
@johanoskarsson johanoskarsson authored
Showing with 1,908 additions and 426 deletions.
  1. +8 −6 project/Project.scala
  2. +10 −0 zipkin-common/src/main/scala/com/twitter/zipkin/common/AnnotationType.scala
  3. +29 −39 {zipkin-server → zipkin-common}/src/test/scala/com/twitter/zipkin/common/TraceSpec.scala
  4. +20 −1 zipkin-finatra/config/web-dev.scala
  5. +31 −0 zipkin-finatra/config/web-localhost.scala
  6. +2 −2 zipkin-finatra/src/main/resources/public/css/application.css
  7. +3 −2 zipkin-finatra/src/main/resources/public/js/application-index.js
  8. +1 −1  zipkin-finatra/src/main/resources/public/templates/query.mustache
  9. +6 −30 zipkin-finatra/src/main/resources/templates/layouts/application.mustache
  10. +31 −0 zipkin-finatra/src/main/scala/com/twitter/zipkin/config/CssConfig.scala
  11. +50 −0 zipkin-finatra/src/main/scala/com/twitter/zipkin/config/JsConfig.scala
  12. +31 −0 zipkin-finatra/src/main/scala/com/twitter/zipkin/config/StaticResourceConfig.scala
  13. +3 −0  zipkin-finatra/src/main/scala/com/twitter/zipkin/config/ZipkinWebConfig.scala
  14. +5 −3 zipkin-finatra/src/main/scala/com/twitter/zipkin/web/App.scala
  15. +19 −6 zipkin-finatra/src/main/scala/com/twitter/zipkin/web/ZipkinWeb.scala
  16. +3 −0  zipkin-finatra/src/scripts/finatra.sh
  17. +1 −1  zipkin-gems/zipkin-query/lib/zipkin-query/version.rb
  18. +61 −0 zipkin-gems/zipkin-query/vendor/gen-rb/zipkin-query/zipkin_query.rb
  19. +1 −1  zipkin-gems/zipkin-tracer/lib/zipkin-tracer.rb
  20. +1 −1  zipkin-gems/zipkin-tracer/lib/zipkin-tracer/version.rb
  21. +1 −1  zipkin-gems/zipkin-tracer/zipkin-tracer.gemspec
  22. +53 −0 zipkin-hadoop-job-runner/src/main/resources/email.mustache
  23. +104 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/HadoopJobClient.scala
  24. +106 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/LineResult.scala
  25. +54 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServiceClient.scala
  26. +34 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServicePairClient.scala
  27. +128 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/Postprocess.scala
  28. +0 −105 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala
  29. +169 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToFileClient.scala
  30. +46 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToServerClient.scala
  31. +165 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/email/EmailContent.scala
  32. +64 −0 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala
  33. +0 −31 zipkin-hadoop-job-runner/src/scripts/find_data.sh
  34. +118 −0 zipkin-hadoop-job-runner/src/scripts/has-sampled.rb
  35. +1 −0  zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala
  36. +2 −0  zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala
  37. +57 −0 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala
  38. +35 −0 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/GrepByAnnotation.scala
  39. +17 −19 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala
  40. +1 −0  zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala
  41. +7 −8 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala
  42. +55 −0 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala
  43. +0 −29 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala
  44. +31 −13 zipkin-hadoop/src/scripts/run_all_jobs.rb
  45. +24 −3 zipkin-hadoop/src/scripts/run_job.rb
  46. +1 −1  zipkin-hadoop/src/scripts/scald.rb
  47. +4 −4 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala
  48. +67 −0 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala
  49. +21 −14 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala
  50. +4 −4 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala
  51. +0 −22 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/UtilSpec.scala
  52. +71 −0 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala
  53. +12 −12 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala
  54. +1 −1  zipkin-scribe/src/scripts/collector.sh
  55. +14 −2 zipkin-scrooge/src/test/scala/com/twitter/zipkin/adapter/ThriftQueryAdapterSpec.scala
  56. +1 −2  zipkin-server/src/main/scala/com/twitter/zipkin/config/CassandraConfig.scala
  57. +21 −8 zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala
  58. +4 −8 zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala
  59. +46 −12 zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala
  60. +0 −3  zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/ScroogeThriftCodec.scala
  61. +19 −13 zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala
  62. +21 −15 zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraStorageSpec.scala
  63. +4 −0 zipkin-test/src/test/scala/com/twitter/zipkin/ZipkinSpec.scala
  64. +6 −0 zipkin-thrift/src/main/thrift/zipkinQuery.thrift
  65. +1 −1  zipkin-web/Gemfile
  66. +2 −2 zipkin-web/Gemfile.lock
View
14 project/Project.scala
@@ -7,10 +7,10 @@ import java.io.File
object Zipkin extends Build {
- val CASSIE_VERSION = "0.22.1"
- val FINAGLE_VERSION = "5.3.1"
- val OSTRICH_VERSION = "8.2.1"
- val UTIL_VERSION = "5.3.1"
+ val CASSIE_VERSION = "0.23.0"
+ val FINAGLE_VERSION = "5.3.5"
+ val OSTRICH_VERSION = "8.2.3"
+ val UTIL_VERSION = "5.3.6"
val proxyRepo = Option(System.getenv("SBT_PROXY_REPO"))
val travisCi = Option(System.getenv("SBT_TRAVIS_CI")) // for adding travis ci maven repos before others
@@ -83,7 +83,7 @@ object Zipkin extends Build {
).dependsOn(thrift)
lazy val hadoopjobrunner = Project(
- id = "zipkinhadoopjobrunner",
+ id = "zipkin-hadoop-job-runner",
base = file("zipkin-hadoop-job-runner"),
settings = Project.defaultSettings ++
StandardProject.newSettings ++
@@ -95,6 +95,8 @@ object Zipkin extends Build {
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-log4j12" % "1.6.4" % "runtime",
+ "javax.mail" % "mail" % "1.4.3",
+ "com.github.spullara.mustache.java" % "compiler" % "0.8.2",
/* Test dependencies */
"org.scala-tools.testing" % "specs_2.9.1" % "1.6.9" % "test"
),
@@ -263,7 +265,7 @@ object Zipkin extends Build {
resolvers += "codahale" at "http://repo.codahale.com",
libraryDependencies ++= Seq(
- "com.twitter" % "finatra" % "0.2.1",
+ "com.twitter" % "finatra" % "0.2.4",
"com.twitter.common.zookeeper" % "server-set" % "1.0.7",
View
10 zipkin-common/src/main/scala/com/twitter/zipkin/common/AnnotationType.scala
@@ -16,4 +16,14 @@
*/
package com.twitter.zipkin.common
+object AnnotationType {
+ case object Bool extends AnnotationType(0, "Bool")
+ case object Bytes extends AnnotationType(1, "Bytes")
+ case object I16 extends AnnotationType(2, "I16")
+ case object I32 extends AnnotationType(3, "I32")
+ case object I64 extends AnnotationType(4, "I64")
+ case object Double extends AnnotationType(5, "Double")
+ case object String extends AnnotationType(6, "String")
+}
+
case class AnnotationType(value: Int, name: String)
View
68 ...com/twitter/zipkin/common/TraceSpec.scala → ...com/twitter/zipkin/common/TraceSpec.scala
@@ -1,6 +1,6 @@
/*
* Copyright 2012 Twitter Inc.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -16,24 +16,23 @@
*/
package com.twitter.zipkin.common
-import org.specs.Specification
-import com.twitter.zipkin.gen
+import com.twitter.zipkin.Constants
+import com.twitter.zipkin.query.{Timespan, Trace, TraceSummary, SpanTreeEntry}
import collection.mutable
import java.nio.ByteBuffer
-import com.twitter.zipkin.query.{Timespan, Trace, TraceSummary, SpanTreeEntry}
-import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter}
+import org.specs.Specification
class TraceSpec extends Specification {
// TODO these don't actually make any sense
- val annotations1 = List(Annotation(100, gen.Constants.CLIENT_SEND, Some(Endpoint(123, 123, "service1"))),
- Annotation(150, gen.Constants.CLIENT_RECV, Some(Endpoint(456, 456, "service1"))))
- val annotations2 = List(Annotation(200, gen.Constants.CLIENT_SEND, Some(Endpoint(456, 456, "service2"))),
- Annotation(250, gen.Constants.CLIENT_RECV, Some(Endpoint(123, 123, "service2"))))
- val annotations3 = List(Annotation(300, gen.Constants.CLIENT_SEND, Some(Endpoint(456, 456, "service2"))),
- Annotation(350, gen.Constants.CLIENT_RECV, Some(Endpoint(666, 666, "service2"))))
- val annotations4 = List(Annotation(400, gen.Constants.CLIENT_SEND, Some(Endpoint(777, 777, "service3"))),
- Annotation(500, gen.Constants.CLIENT_RECV, Some(Endpoint(888, 888, "service3"))))
+ val annotations1 = List(Annotation(100, Constants.ClientSend, Some(Endpoint(123, 123, "service1"))),
+ Annotation(150, Constants.ClientRecv, Some(Endpoint(456, 456, "service1"))))
+ val annotations2 = List(Annotation(200, Constants.ClientSend, Some(Endpoint(456, 456, "service2"))),
+ Annotation(250, Constants.ClientRecv, Some(Endpoint(123, 123, "service2"))))
+ val annotations3 = List(Annotation(300, Constants.ClientSend, Some(Endpoint(456, 456, "service2"))),
+ Annotation(350, Constants.ClientRecv, Some(Endpoint(666, 666, "service2"))))
+ val annotations4 = List(Annotation(400, Constants.ClientSend, Some(Endpoint(777, 777, "service3"))),
+ Annotation(500, Constants.ClientRecv, Some(Endpoint(888, 888, "service3"))))
val span1Id = 666L
val span2Id = 777L
@@ -49,30 +48,21 @@ class TraceSpec extends Specification {
val trace = Trace(List[Span](span1, span2, span3, span4))
"Trace" should {
- "convert to thrift and back" in {
- val span = Span(12345, "methodcall", 666, None,
- List(Annotation(1, "boaoo", None)), Nil)
- val expectedTrace = Trace(List[Span](span))
- val thriftTrace = ThriftQueryAdapter(expectedTrace)
- val actualTrace = ThriftQueryAdapter(thriftTrace)
- expectedTrace mustEqual actualTrace
- }
-
"get duration of trace" in {
- val annotations = List(Annotation(100, gen.Constants.CLIENT_SEND, Some(Endpoint(123, 123, "service1"))),
- Annotation(200, gen.Constants.CLIENT_RECV, Some(Endpoint(123, 123, "service1"))))
+ val annotations = List(Annotation(100, Constants.ClientSend, Some(Endpoint(123, 123, "service1"))),
+ Annotation(200, Constants.ClientRecv, Some(Endpoint(123, 123, "service1"))))
val span = Span(12345, "methodcall", 666, None,
annotations, Nil)
100 mustEqual Trace(List(span)).duration
}
"get duration of trace without root span" in {
- val annotations = List(Annotation(100, gen.Constants.CLIENT_SEND, Some(Endpoint(123, 123, "service1"))),
- Annotation(200, gen.Constants.CLIENT_RECV, Some(Endpoint(123, 123, "service1"))))
+ val annotations = List(Annotation(100, Constants.ClientSend, Some(Endpoint(123, 123, "service1"))),
+ Annotation(200, Constants.ClientRecv, Some(Endpoint(123, 123, "service1"))))
val span = Span(12345, "methodcall", 666, Some(123),
annotations, Nil)
- val annotations2 = List(Annotation(150, gen.Constants.CLIENT_SEND, Some(Endpoint(123, 123, "service1"))),
- Annotation(160, gen.Constants.CLIENT_RECV, Some(Endpoint(123, 123, "service1"))))
+ val annotations2 = List(Annotation(150, Constants.ClientSend, Some(Endpoint(123, 123, "service1"))),
+ Annotation(160, Constants.ClientRecv, Some(Endpoint(123, 123, "service1"))))
val span2 = Span(12345, "methodcall", 666, Some(123),
annotations2, Nil)
100 mustEqual Trace(List(span, span2)).duration
@@ -120,9 +110,9 @@ class TraceSpec extends Specification {
}
"getBinaryAnnotations" in {
- val ba1 = BinaryAnnotation("key1", ByteBuffer.wrap("value1".getBytes), ThriftAdapter(gen.AnnotationType.String), None)
+ val ba1 = BinaryAnnotation("key1", ByteBuffer.wrap("value1".getBytes), AnnotationType.String, None)
val span1 = Span(1L, "", 1L, None, List(), List(ba1))
- val ba2 = BinaryAnnotation("key2", ByteBuffer.wrap("value2".getBytes), ThriftAdapter(gen.AnnotationType.String), None)
+ val ba2 = BinaryAnnotation("key2", ByteBuffer.wrap("value2".getBytes), AnnotationType.String, None)
val span2 = Span(1L, "", 2L, None, List(), List(ba2))
val trace = Trace(List[Span](span1, span2))
@@ -159,17 +149,17 @@ class TraceSpec extends Specification {
}
"merge spans" in {
- val ann1 = List(Annotation(100, gen.Constants.CLIENT_SEND, Some(Endpoint(123, 123, "service1"))),
- Annotation(300, gen.Constants.CLIENT_RECV, Some(Endpoint(123, 123, "service1"))))
- val ann2 = List(Annotation(150, gen.Constants.SERVER_RECV, Some(Endpoint(456, 456, "service2"))),
- Annotation(200, gen.Constants.SERVER_SEND, Some(Endpoint(456, 456, "service2"))))
+ val ann1 = List(Annotation(100, Constants.ClientSend, Some(Endpoint(123, 123, "service1"))),
+ Annotation(300, Constants.ClientRecv, Some(Endpoint(123, 123, "service1"))))
+ val ann2 = List(Annotation(150, Constants.ServerRecv, Some(Endpoint(456, 456, "service2"))),
+ Annotation(200, Constants.ServerSend, Some(Endpoint(456, 456, "service2"))))
val annMerged = List(
- Annotation(100, gen.Constants.CLIENT_SEND, Some(Endpoint(123, 123, "service1"))),
- Annotation(300, gen.Constants.CLIENT_RECV, Some(Endpoint(123, 123, "service1"))),
- Annotation(150, gen.Constants.SERVER_RECV, Some(Endpoint(456, 456, "service2"))),
- Annotation(200, gen.Constants.SERVER_SEND, Some(Endpoint(456, 456, "service2")))
- )
+ Annotation(100, Constants.ClientSend, Some(Endpoint(123, 123, "service1"))),
+ Annotation(300, Constants.ClientRecv, Some(Endpoint(123, 123, "service1"))),
+ Annotation(150, Constants.ServerRecv, Some(Endpoint(456, 456, "service2"))),
+ Annotation(200, Constants.ServerSend, Some(Endpoint(456, 456, "service2")))
+ )
val spanToMerge1 = Span(12345, "methodcall2", span2Id, Some(span1Id), ann1, Nil)
val spanToMerge2 = Span(12345, "methodcall2", span2Id, Some(span1Id), ann2, Nil)
View
21 zipkin-finatra/config/web-dev.scala
@@ -14,13 +14,32 @@
* limitations under the License.
*/
-import com.twitter.zipkin.config.ZipkinWebConfig
+import com.twitter.zipkin.config.{CssConfig, JsConfig, ZipkinWebConfig}
import com.twitter.zipkin.config.zookeeper.ZooKeeperConfig
import java.net.InetSocketAddress
new ZipkinWebConfig {
rootUrl = "http://localhost:" + serverPort + "/"
+ /**
+ * Making changes to js/css can be painful with a packaged jar since a compilation is needed to
+ * repackage any new changes.
+ * A simple hack is to stand up a simple Python HTTP server and point `resourcePathPrefix` it.
+ * Example:
+ *
+ * `cd zipkin-finatra/src/main/resources/public && python -m SimpleHTTPServer`
+ *
+ * Then, set:
+ * `val resourcePathPrefix = "http://localhost:8000"`
+ */
+ val resourcePathPrefix = "/public"
+ jsConfig = new JsConfig {
+ override val pathPrefix = resourcePathPrefix
+ }
+ cssConfig = new CssConfig {
+ override val pathPrefix = resourcePathPrefix
+ }
+
def zkConfig = new ZooKeeperConfig {
servers = List("localhost:3003")
}
View
31 zipkin-finatra/config/web-localhost.scala
@@ -0,0 +1,31 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import com.twitter.zipkin.config.ZipkinWebConfig
+import com.twitter.zipkin.config.zookeeper.ZooKeeperConfig
+import java.net.InetSocketAddress
+
+new ZipkinWebConfig {
+ // Change the hostname below to allow the Zipkin JS code to talk to the Zipkin API Scala code
+ // Suspect this should be marked as a bug really...
+ rootUrl = "http://localhost:" + serverPort + "/"
+
+ def zkConfig = new ZooKeeperConfig {
+ servers = List("localhost:2181")
+ }
+
+}
+
View
4 zipkin-finatra/src/main/resources/public/css/application.css
@@ -426,11 +426,11 @@ ul.traces li .trace-details {
}
ul.traces li .services {
- width: 79.7%;
+ width: 80%;
}
ul.traces li .timestamp {
- width: 20%;
+ width: 19%;
text-align: right;
}
View
5 zipkin-finatra/src/main/resources/public/js/application-index.js
@@ -227,9 +227,9 @@ Zipkin.Application.Index = (function() {
data.sort(function(a, b) {
if (sortOrder == ORDER_TIMESTAMP_ASC) {
- return new Date(a.start_time) - new Date(b.start_time);
+ return new Date(a.startTimestamp) - new Date(b.startTimestamp);
} else if (sortOrder == ORDER_TIMESTAMP_DESC) {
- return new Date(b.start_time) - new Date(a.start_time);
+ return new Date(b.startTimestamp) - new Date(a.startTimestamp);
} else if (sortOrder == ORDER_DURATION_ASC) {
return a.duration - b.duration;
} else {
@@ -419,6 +419,7 @@ Zipkin.Application.Index = (function() {
return { name: key, count: count };
});
e.url = root_url + "show/" + e.traceId;
+ e.startTime = Zipkin.Util.timeAgoInWords(e.startTimestamp / 1000);
return e;
});
traces = updateFilteredServices(traces);
View
2  zipkin-finatra/src/main/resources/public/templates/query.mustache
@@ -12,7 +12,7 @@
</span>
{{/serviceCounts}}
</div>
- <div class="trace-details timestamp">
+ <div class="trace-details timestamp pull-right">
<span class="label">{{startTime}}</span>
</div>
</a>
View
36 zipkin-finatra/src/main/resources/templates/layouts/application.mustache
@@ -1,38 +1,14 @@
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head>
- <link href="https://ajax.googleapis.com/ajax/libs/jqueryui/1.8/themes/ui-lightness/jquery-ui.css" media="screen" rel="stylesheet" />
- <link href="/public/css/bootstrap.css" media="screen" rel="stylesheet" />
- <link href="/public/css/bootstrap-responsive.css" media="screen" rel="stylesheet" />
- <link href="/public/css/datepicker.css" media="screen" rel="stylesheet" />
- <link href="/public/css/application.css" media="screen" rel="stylesheet" />
+ {{#stylesheets}}
+ <link href="{{.}}" media="screen" rel="stylesheet" />
+ {{/stylesheets}}
- <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js"></script>
- <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jqueryui/1.8.18/jquery-ui.min.js"></script>
+ {{#javascripts}}
+ <script type="text/javascript" src="{{.}}"></script>
+ {{/javascripts}}
- <script type="text/javascript" src="/public/js/bootstrap.js"></script>
- <script type="text/javascript" src="/public/js/datepicker.js"></script>
- <script type="text/javascript" src="/public/js/d3-2.9.1.js"></script>
- <script type="text/javascript" src="/public/js/hogan-2.0.0.js"></script>
-
- <script type="text/javascript" src="/public/js/zipkin.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-node.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-span.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-tree.js"></script>
-
- <script type="text/javascript" src="/public/js/zipkin-annotation.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-config.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-filter-span.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-kv-annotation.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-lazy-tree.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-onebox.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-trace-dependency.js"></script>
- <script type="text/javascript" src="/public/js/zipkin-trace-summary.js"></script>
-
- <script type="text/javascript" src="/public/js/application.js"></script>
- <script type="text/javascript" src="/public/js/application-index.js"></script>
- <script type="text/javascript" src="/public/js/application-show.js"></script>
- <script type="text/javascript" src="/public/js/application-static.js"></script>
<script>
var root_url="{{rootUrl}}";
</script>
View
31 zipkin-finatra/src/main/scala/com/twitter/zipkin/config/CssConfig.scala
@@ -0,0 +1,31 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package com.twitter.zipkin.config
+
+class CssConfig extends StaticResourceConfig {
+ val resourceType = "css"
+
+ val remoteResources = Seq(
+ "https://ajax.googleapis.com/ajax/libs/jqueryui/1.8/themes/ui-lightness/jquery-ui.css"
+ )
+
+ val localResources = Seq(
+ "bootstrap.css",
+ "bootstrap-responsive.css",
+ "datepicker.css",
+ "application.css"
+ )
+}
View
50 zipkin-finatra/src/main/scala/com/twitter/zipkin/config/JsConfig.scala
@@ -0,0 +1,50 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package com.twitter.zipkin.config
+
+class JsConfig extends StaticResourceConfig {
+ val resourceType = "js"
+
+ lazy val remoteResources = Seq(
+ "https://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js",
+ "https://ajax.googleapis.com/ajax/libs/jqueryui/1.8.18/jquery-ui.min.js"
+ )
+
+ lazy val localResources = Seq(
+ "bootstrap.js",
+ "datepicker.js",
+ "d3-2.9.1.js",
+ "hogan-2.0.0.js",
+
+ "zipkin.js",
+ "zipkin-node.js",
+ "zipkin-span.js",
+ "zipkin-tree.js",
+ "zipkin-annotation.js",
+ "zipkin-config.js",
+ "zipkin-filter-span.js",
+ "zipkin-kv-annotation.js",
+ "zipkin-lazy-tree.js",
+ "zipkin-onebox.js",
+ "zipkin-trace-dependency.js",
+ "zipkin-trace-summary.js",
+
+ "application.js",
+ "application-index.js",
+ "application-show.js",
+ "application-static.js"
+ )
+}
View
31 zipkin-finatra/src/main/scala/com/twitter/zipkin/config/StaticResourceConfig.scala
@@ -0,0 +1,31 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package com.twitter.zipkin.config
+
+trait StaticResourceConfig {
+ val pathPrefix: String = "/public"
+
+ val resourceType: String
+
+ val remoteResources: Seq[String]
+
+ val localResources: Seq[String]
+
+ lazy val resources = remoteResources ++
+ localResources.map { r =>
+ "%s/%s/%s".format(pathPrefix, resourceType, r)
+ }
+}
View
3  zipkin-finatra/src/main/scala/com/twitter/zipkin/config/ZipkinWebConfig.scala
@@ -29,6 +29,9 @@ trait ZipkinWebConfig extends ZipkinConfig[ZipkinWeb] {
"templates" -> "text/plain"
)
+ var jsConfig = new JsConfig
+ var cssConfig = new CssConfig
+
def zkConfig: ZooKeeperConfig
def zkClientConfig = new ZooKeeperClientConfig {
View
8 zipkin-finatra/src/main/scala/com/twitter/zipkin/web/App.scala
@@ -19,7 +19,7 @@ package com.twitter.zipkin.web
import com.twitter.finatra.{Response, Controller, View, Request}
import com.twitter.logging.Logger
import com.twitter.util.Future
-import com.twitter.zipkin.adapter.{JsonQueryAdapter, JsonAdapter, ThriftQueryAdapter, ThriftAdapter}
+import com.twitter.zipkin.adapter.{JsonQueryAdapter, ThriftQueryAdapter}
import com.twitter.zipkin.gen
import com.twitter.zipkin.config.ZipkinWebConfig
import java.nio.ByteBuffer
@@ -31,7 +31,7 @@ import java.util.Calendar
* @param config ZipkinWebConfig
* @param client Thrift client to ZipkinQuery
*/
-class App(config: ZipkinWebConfig, client: gen.ZipkinQuery.FinagledClient) extends Controller {
+class App(config: ZipkinWebConfig, client: gen.ZipkinQuery.FinagledClient) extends Controller(config.statsReceiver) {
val log = Logger.get()
val dateFormat = new SimpleDateFormat("MM-dd-yyyy")
@@ -100,7 +100,7 @@ class App(config: ZipkinWebConfig, client: gen.ZipkinQuery.FinagledClient) exten
}
}
}
- }.map(render.json(_))
+ }.flatten.map(render.json(_))
}
/**
@@ -262,6 +262,8 @@ class App(config: ZipkinWebConfig, client: gen.ZipkinQuery.FinagledClient) exten
val template = "templates/layouts/application.mustache"
val rootUrl = config.rootUrl
val innerView: View = v
+ val javascripts = config.jsConfig.resources
+ val stylesheets = config.cssConfig.resources
lazy val body = innerView.render
}
}
View
25 zipkin-finatra/src/main/scala/com/twitter/zipkin/web/ZipkinWeb.scala
@@ -5,19 +5,28 @@ import com.twitter.finagle.http.Http
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder, Server}
import com.twitter.finagle.thrift.ThriftClientFramedCodec
import com.twitter.finagle.zookeeper.ZookeeperServerSetCluster
-import com.twitter.finatra.{AppService, Controller, FinatraServer}
-import com.twitter.ostrich.admin.{ServiceTracker, Service}
+import com.twitter.finatra_core.{AbstractFinatraController, ControllerCollection}
+import com.twitter.finatra._
+import com.twitter.ostrich.admin.ServiceTracker
+import com.twitter.ostrich.admin
import com.twitter.logging.Logger
import com.twitter.io.{Files, TempFile}
import com.twitter.zipkin.config.ZipkinWebConfig
import com.twitter.zipkin.gen
+import com.twitter.util.Future
import java.net.InetSocketAddress
+import org.jboss.netty.handler.codec.http.HttpResponse
+import scala.Left
+import scala.Right
+import scala.Some
-class ZipkinWeb(config: ZipkinWebConfig) extends Service {
+class ZipkinWeb(config: ZipkinWebConfig) extends admin.Service {
val log = Logger.get()
var server: Option[Server] = None
+ val controllers = new ControllerCollection[Request, Future[Response], Future[HttpResponse]]
+
def start() {
val clientBuilder = ClientBuilder()
.codec(ThriftClientFramedCodec())
@@ -43,10 +52,10 @@ class ZipkinWeb(config: ZipkinWebConfig) extends Service {
val resource = config.resource
val app = config.appConfig(client)
- FinatraServer.register(resource)
- FinatraServer.register(app)
+ register(resource)
+ register(app)
- val finatraService = new AppService
+ val finatraService = new AppService(controllers)
val service = finatraService
server = Some {
@@ -63,6 +72,10 @@ class ZipkinWeb(config: ZipkinWebConfig) extends Service {
def shutdown() {
server.foreach { _.close() }
}
+
+ def register(app: AbstractFinatraController[Request, Future[Response], Future[HttpResponse]]) {
+ controllers.add(app)
+ }
}
class Resource(resourceDirs: Map[String, String]) extends Controller {
View
3  zipkin-finatra/src/scripts/finatra.sh
@@ -0,0 +1,3 @@
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+VERSION=@VERSION@
+java -cp "$DIR/../libs/*" -jar $DIR/../zipkin-finatra_2.9.1-$VERSION.jar $*
View
2  zipkin-gems/zipkin-query/lib/zipkin-query/version.rb
@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
module ZipkinQuery
- VERSION = "0.0.3"
+ VERSION = "0.1.0"
end
View
61 zipkin-gems/zipkin-query/vendor/gen-rb/zipkin-query/zipkin_query.rb
@@ -60,6 +60,22 @@ def recv_getTraceIdsByAnnotation()
raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'getTraceIdsByAnnotation failed: unknown result')
end
+ def tracesExist(trace_ids)
+ send_tracesExist(trace_ids)
+ return recv_tracesExist()
+ end
+
+ def send_tracesExist(trace_ids)
+ send_message('tracesExist', TracesExist_args, :trace_ids => trace_ids)
+ end
+
+ def recv_tracesExist()
+ result = receive_message(TracesExist_result)
+ return result.success unless result.success.nil?
+ raise result.qe unless result.qe.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'tracesExist failed: unknown result')
+ end
+
def getTracesByIds(trace_ids, adjust)
send_getTracesByIds(trace_ids, adjust)
return recv_getTracesByIds()
@@ -273,6 +289,17 @@ def process_getTraceIdsByAnnotation(seqid, iprot, oprot)
write_result(result, oprot, 'getTraceIdsByAnnotation', seqid)
end
+ def process_tracesExist(seqid, iprot, oprot)
+ args = read_args(iprot, TracesExist_args)
+ result = TracesExist_result.new()
+ begin
+ result.success = @handler.tracesExist(args.trace_ids)
+ rescue Zipkin::QueryException => qe
+ result.qe = qe
+ end
+ write_result(result, oprot, 'tracesExist', seqid)
+ end
+
def process_getTracesByIds(seqid, iprot, oprot)
args = read_args(iprot, GetTracesByIds_args)
result = GetTracesByIds_result.new()
@@ -533,6 +560,40 @@ def validate
::Thrift::Struct.generate_accessors self
end
+ class TracesExist_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ TRACE_IDS = 1
+
+ FIELDS = {
+ TRACE_IDS => {:type => ::Thrift::Types::LIST, :name => 'trace_ids', :element => {:type => ::Thrift::Types::I64}}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class TracesExist_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+ QE = 1
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::SET, :name => 'success', :element => {:type => ::Thrift::Types::I64}},
+ QE => {:type => ::Thrift::Types::STRUCT, :name => 'qe', :class => Zipkin::QueryException}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
class GetTracesByIds_args
include ::Thrift::Struct, ::Thrift::Struct_Union
TRACE_IDS = 1
View
2  zipkin-gems/zipkin-tracer/lib/zipkin-tracer.rb
@@ -53,7 +53,7 @@ def initialize(app)
end
def call(env)
- id = ::Trace::TraceId.new(::Trace.generate_id, nil, ::Trace.generate_id, true)
+ id = ::Trace::TraceId.new(::Trace.generate_id, nil, ::Trace.generate_id, true, ::Trace::Flags::EMPTY)
::Trace.default_endpoint = ::Trace.default_endpoint.with_service_name(@service_name).with_port(@service_port)
::Trace.sample_rate=(@sample_rate)
tracing_filter(id, env) { @app.call(env) }
View
2  zipkin-gems/zipkin-tracer/lib/zipkin-tracer/version.rb
@@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
module ZipkinTracer
- VERSION = "0.0.1"
+ VERSION = "0.1.0"
end
View
2  zipkin-gems/zipkin-tracer/zipkin-tracer.gemspec
@@ -31,6 +31,6 @@ Gem::Specification.new do |s|
s.files = Dir.glob("{bin,lib}/**/*")
s.require_path = 'lib'
- s.add_dependency "finagle-thrift", "~> 1.2.0"
+ s.add_dependency "finagle-thrift", "~> 1.3.0"
s.add_dependency "scribe", "~> 0.2.4"
end
View
53 zipkin-hadoop-job-runner/src/main/resources/email.mustache
@@ -0,0 +1,53 @@
+{{#html}}
+{{#header}}
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
+"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html>
+ <head>
+ <h2>Service Report for {{serviceName}}</h2>
+ </head>
+ {{/header}}
+
+ {{#body}}
+ <body>
+ {{#oneLineResults}}
+ <p>{{result}}</p> <br />
+ {{/oneLineResults}}
+
+ {{#tableResults}}
+ <h4></p>{{tableResultHeader}}</p></h4>
+ <table border = 1 cellpadding=3 cellspacing=1 rules=groups frame=box>
+ <thead>
+ {{#tableHeader}}
+ <tr>
+ {{#tableHeaderTokens}}
+ <td><strong>{{tableHeaderToken}}</strong></td>
+ {{/tableHeaderTokens}}
+ </tr>
+ {{/tableHeader}}
+ </thead>
+
+ {{#tableRows}}
+ <tr>
+ {{#tableRowTokens}}
+ <td>{{tableRowToken}}</td>
+ {{/tableRowTokens}}
+ </tr>
+ {{/tableRows}}
+
+ {{#tableUrlRows}}
+ <tr>
+ <td><a href="{{urlToken1}}">{{urlToken2}}</td>
+ {{#tableUrlRowTokens}}
+ <td>{{tableUrlRowToken}}</td>
+ {{/tableUrlRowTokens}}
+ </tr>
+ {{/tableUrlRows}}
+ </table>
+ {{/tableResults}}
+ </body>
+ {{/body}}
+</html>
+{{/html}}
+
+
View
104 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/HadoopJobClient.scala
@@ -0,0 +1,104 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import collection.mutable
+import collection.immutable.HashMap
+import java.util.Scanner
+import java.io.File
+import com.twitter.zipkin.hadoop.sources._
+
+/**
+ * Basic client for postprocessing hadoop jobs
+ * @param combineSimilarNames whether or not we should differentiate between similar names
+ */
+
+abstract class HadoopJobClient(val combineSimilarNames: Boolean) {
+
+ /**
+ * Process a key and its value
+ * @param s the key passed
+ * @param lines values associated with the key
+ */
+ def processKey(s: String, lines: List[LineResult])
+
+ /**
+ * Starts the postprocessing for the client
+ * @param filename the input filename
+ * @param output the output filename
+ */
+ def start(filename : String, output : String)
+
+ def getServiceName(service: String) = {
+ if (combineSimilarNames) HadoopJobClient.serviceNames(service) else service
+ }
+
+ def getLineResult(line: List[String]): LineResult = {
+ new PerServiceLineResult(line)
+ }
+
+ /**
+ * Processes a single directory, with data files expected in TSV format, with key being the first value on each row
+ * @param file a file representing a directory where the data is stored
+ */
+ def processDir(file: File) {
+ var serviceToValues = new mutable.HashMap[String, List[LineResult]]()
+ Util.traverseFileTree(file)({ f: File =>
+ val s = new Scanner(f)
+ while (s.hasNextLine()) {
+ val line = getLineResult(s.nextLine.split("\t").toList.map({_.trim()}))
+ val serviceName = line.getKey()
+ if (serviceToValues.contains(serviceName)) {
+ serviceToValues(serviceName) ++= List(line)
+ } else {
+ serviceToValues += serviceName -> List(line)
+ }
+ }
+ })
+ for (t <- serviceToValues) {
+ val (service, values) = t
+ processKey(service, values)
+ }
+ }
+}
+
+object HadoopJobClient {
+
+ val DELIMITER = ":"
+ var serviceNames = new HashMap[String, String]()
+
+ /**
+ * Given a directory of files formatted in TSV format with each line being of the form
+ * servicename standardizedservicename
+ * reads that information into a map
+ * @param dirname the name of a directory containing all the service name information
+ */
+ def populateServiceNames(dirname: String) = {
+ Util.traverseFileTree(new File(dirname))({f: File =>
+ val s = new Scanner(f)
+ while (s.hasNextLine()) {
+ val line = new Scanner(s.nextLine())
+ val serviceName = Util.toSafeHtmlName(line.next())
+ val standardized = if (line.hasNext) line.next else serviceName
+ if (!serviceNames.contains(serviceName)) {
+ serviceNames += serviceName -> standardized
+ }
+ }
+ })
+ }
+
+}
View
106 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/LineResult.scala
@@ -0,0 +1,106 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+/**
+ * This class represents a single line generated by a Hadoop job
+ * @param line the single line of data
+ */
+abstract class LineResult(var line: List[String]) {
+
+ checkIsValid()
+
+ /**
+ * Throws an exception of the line of data is malformed
+ */
+ def checkIsValid()
+
+ /**
+ * Gets the key of the line of data
+ * @return the key
+ */
+ def getKey(): String
+
+ /**
+ * Gets the value for the line of data
+ * @return the value associated to this line of data, as a list of strings tokenized by tabs
+ */
+ def getValue(): List[String]
+
+ /**
+ * Returns the value of the line as a String
+ * @param sep the separator for the line
+ * @return a String representation of the line
+ */
+ def getValueAsString(sep: String): String = {
+ getValue().mkString(sep)
+ }
+
+ /**
+ * Returns the value of the line as a String with elements separated by tabs
+ * @return the value of the line
+ */
+ def getValueAsString(): String = {
+ getValueAsString("\t")
+ }
+
+ override def toString(): String = {
+ "(" + getKey() + ", [" + getValueAsString(", ") + "])"
+ }
+
+}
+
+/**
+ * A line result for Hadoop jobs whose keys are a single service
+ * @param line the single line of data
+ */
+class PerServiceLineResult(line: List[String]) extends LineResult(line) {
+
+ def checkIsValid() {
+ if (line == null || line.isEmpty) {
+ throw new IllegalArgumentException("Invalid input list: " + (if (line == null) "null" else line.mkString(", ")))
+ }
+ }
+
+ def getKey() = {
+ line.head
+ }
+
+ def getValue() = {
+ line.tail
+ }
+}
+
+/**
+ * A line result for Hadoop jobs whose keys are pairs of services
+ * @param line the single line of data
+ */
+class PerServicePairLineResult(line: List[String]) extends LineResult(line) {
+ def checkIsValid() {
+ if (line == null || line.length < 2) {
+ throw new IllegalArgumentException("Invalid input list: " + (if (line == null) "null" else line.mkString(", ")))
+ }
+ }
+
+ def getKey() = {
+ line.head + HadoopJobClient.DELIMITER + line.tail.head
+ }
+
+ def getValue() = {
+ line.tail.tail
+ }
+}
View
54 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServiceClient.scala
@@ -0,0 +1,54 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import scala.collection.JavaConverters._
+
+/**
+ * Client which writes to the server, and which writes per service
+ * @param portNumber the port number to write to
+ */
+
+abstract class PerServiceClient(combineSimilarNames: Boolean, portNumber: Int) extends
+ WriteToServerClient(combineSimilarNames, portNumber) {
+}
+
+/**
+ * Connects to the Zipkin Collector, then processes data from PopularAnnotations and sends it there. This powers the
+ * typeahead functionality for annotations
+ */
+
+class PopularAnnotationsClient(portNumber: Int) extends PerServiceClient(false, portNumber) {
+
+ def processKey(service: String, lines: List[LineResult]) {
+ client.storeTopAnnotations(service, lines.map(line => line.getValueAsString()).asJava)
+ }
+
+}
+
+/**
+ * Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the
+ * typeahead functionality for annotations
+ */
+
+class PopularKeyValuesClient(portNumber: Int) extends PerServiceClient(false, portNumber) {
+
+ def processKey(service: String, values: List[LineResult]) {
+ client.storeTopKeyValueAnnotations(service, values.map(line => line.getValueAsString()).asJava)
+ }
+
+}
View
34 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServicePairClient.scala
@@ -0,0 +1,34 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import java.util.Scanner
+import com.twitter.zipkin.gen
+import sources.Util
+
+/**
+ * Client which writes to a server, and per service pair
+ * @param combineSimilarNames
+ * @param portNumber
+ */
+abstract class PerServicePairClient(combineSimilarNames: Boolean, portNumber: Int) extends
+ WriteToServerClient(combineSimilarNames, portNumber) {
+
+ override def getLineResult(line: List[String]) = {
+ new PerServicePairLineResult(line)
+ }
+}
View
128 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/Postprocess.scala
@@ -0,0 +1,128 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import com.twitter.zipkin.hadoop.sources.Util
+import email.EmailContent
+
+/**
+ * Runs all the jobs which write to file on the input. The arguments are expected to be inputdirname outputdirname servicenamefile
+ */
+object PostprocessWriteToFile {
+
+ val jobList = List(("WorstRuntimesPerTrace", new WorstRuntimesPerTraceClient(Util.ZIPKIN_TRACE_URL)),
+ ("Timeouts", new TimeoutsClient()),
+ ("Retries", new RetriesClient()),
+ ("MemcacheRequest", new MemcacheRequestClient()),
+ ("ExpensiveEndpoints", new ExpensiveEndpointsClient()))
+
+
+ def main(args: Array[String]) {
+ val input = args(0)
+ val output = args(1)
+ val serviceNames = args(2)
+
+ HadoopJobClient.populateServiceNames(serviceNames)
+ for (jobTuple <- jobList) {
+ val (jobName, jobClient) = jobTuple
+ jobClient.start(input + "/" + jobName, output)
+ }
+ EmailContent.writeAll()
+ }
+}
+
+//TODO: Replace (or supplement) this with one main method that runs all jobs
+
+/**
+ * Runs the PopularKeysClient on the input
+ */
+object ProcessPopularKeys {
+ def main(args : Array[String]) {
+ val portNumber = augmentString(args(2)).toInt
+ HadoopJobClient.populateServiceNames(args(0))
+ val c = new PopularKeyValuesClient(portNumber)
+ c.start(args(0), args(1))
+ }
+}
+
+/**
+ * Runs the PopularAnnotationsClient on the input
+ */
+
+object ProcessPopularAnnotations {
+ def main(args : Array[String]) {
+ val portNumber = augmentString(args(2)).toInt
+ HadoopJobClient.populateServiceNames(args(0))
+ val c = new PopularAnnotationsClient(portNumber)
+ c.start(args(0), args(1))
+ }
+}
+
+
+/**
+ * Runs the MemcacheRequestClient on the input
+ */
+
+object ProcessMemcacheRequest {
+ def main(args : Array[String]) {
+ HadoopJobClient.populateServiceNames(args(0))
+ val c = new MemcacheRequestClient()
+ c.start(args(0), args(1))
+ EmailContent.writeAll()
+ }
+}
+
+
+/**
+ * Runs the TimeoutsClient on the input
+ */
+
+object ProcessTimeouts {
+ def main(args : Array[String]) {
+ HadoopJobClient.populateServiceNames(args(0))
+ val c = new TimeoutsClient()
+ c.start(args(0), args(1))
+ EmailContent.writeAll()
+ }
+}
+
+
+/**
+ * Runs the ExpensiveEndpointsClient on the input
+ */
+
+object ProcessExpensiveEndpoints {
+
+ def main(args: Array[String]) {
+ HadoopJobClient.populateServiceNames(args(0))
+ val c = new ExpensiveEndpointsClient()
+ c.start(args(0), args(1))
+ EmailContent.writeAll()
+ }
+
+}
+
+object ProcessWorstRuntimesPerTrace {
+
+ def main(args: Array[String]) {
+ HadoopJobClient.populateServiceNames(args(0))
+ val c = new WorstRuntimesPerTraceClient(Util.ZIPKIN_TRACE_URL)
+ c.start(args(0), args(1))
+ EmailContent.writeAll()
+ }
+
+}
View
105 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala
@@ -1,105 +0,0 @@
-/*
-* Copyright 2012 Twitter Inc.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package com.twitter.zipkin.hadoop
-
-import org.apache.thrift.TException
-import org.apache.thrift.protocol.TBinaryProtocol
-import scala.collection.JavaConverters._
-import org.apache.thrift.transport.{TFramedTransport, TSocket, TTransport, TTransportException}
-import java.io.{FileNotFoundException, File}
-import com.twitter.zipkin.gen
-import java.net.SocketException
-import java.util.{Arrays, Scanner}
-
-/**
- * Runs the PopularKeysClient on the input
- */
-object ProcessPopularKeys {
- def main(args : Array[String]) {
- val c = new PopularKeysClient()
- val portNumber = augmentString(args(2)).toInt
- val isKeyData = augmentString(args(3)).toBoolean
- c.start(args(0), args(1), portNumber, isKeyData)
- }
-}
-
-/**
- * Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the
- * typeahead functionality for annotations
- */
-class PopularKeysClient {
- /**
- * Given a file name, the server name and port number, connects to the server, then writes to it
- * the top 100 key values per service name given the data in filename
- * @param filename
- * @param serverName
- * @param portNumber
- */
- def start(filename : String, serverName : String, portNumber : Int, isKeyData : Boolean) {
- var transport : TTransport = null
- try {
- // establish connection to the server
- transport = new TFramedTransport(new TSocket(serverName, portNumber))
- val protocol = new TBinaryProtocol(transport)
- val client = new gen.ZipkinCollector.Client(protocol)
- transport.open()
- // Read file
- val s = new Scanner(new File(filename))
- var line : Scanner = new Scanner(s.nextLine())
- if (!s.hasNextLine()) return
- var oldService : String = line.next()
- var keys : List[String] = List(line.next())
- while (s.hasNextLine()) {
- line = new Scanner(s.nextLine())
- val currentString = line.next()
- var value = ""
- if (line.hasNext()) value = line.next()
- while (line.hasNext()) {
- value += " " + line.next()
- }
- // Keep adding the keys to the current service's list until we are done with that service
- if (oldService != currentString) {
- // when we are, write that list to the server
- if (isKeyData)
- client.storeTopKeyValueAnnotations(oldService, keys.asJava)
- else
- client.storeTopAnnotations(oldService, keys.asJava)
- println("Writing " + keys.toString + " to service " + oldService)
- // and start processing the new one
- keys = List(value)
- oldService = currentString
- } else {
- keys = keys ::: List(value)
- }
- }
- // Write the last service in the file and its keys as well
- if (isKeyData)
- client.storeTopKeyValueAnnotations(oldService, keys.asJava)
- else
- client.storeTopAnnotations(oldService, keys.asJava)
- } catch {
- case se: SocketException => se.printStackTrace()
- case tte : TTransportException => tte.printStackTrace()
- case te : TException => te.printStackTrace()
- case e : Exception => e.printStackTrace()
- } finally {
- if (transport != null)
- transport.close()
- }
- }
-}
-
View
169 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToFileClient.scala
@@ -0,0 +1,169 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import java.io._
+import collection.mutable.HashMap
+import com.twitter.zipkin.hadoop.sources.Util
+import email.EmailContent
+
+/**
+ * A client which writes to a file. This is intended for use mainly to format emails
+ * @param combineSimilarNames
+ * @param jobname
+ */
+
+abstract class WriteToFileClient(combineSimilarNames: Boolean, jobname: String) extends HadoopJobClient(combineSimilarNames) {
+
+ protected var outputDir = ""
+
+ def toHtmlName(s: String) = outputDir + "/" + Util.toSafeHtmlName(s) + ".html"
+
+ def start(input: String, outputDir: String) {
+ this.outputDir = outputDir
+ processDir(new File(input))
+ }
+}
+
+/**
+ * A client which writes MemcacheRequest data to the file specified
+ */
+
+class MemcacheRequestClient extends WriteToFileClient(true, "MemcacheRequest") {
+
+ def processKey(service: String, lines: List[LineResult]) {
+ val numberMemcacheRequests = {
+ val valuesToInt = lines.map({ line: LineResult => augmentString(line.getValueAsString()).toInt })
+ valuesToInt.foldLeft(0) ((left: Int, right: Int) => left + right )
+ }
+ val mt = EmailContent.getTemplate(service, toHtmlName(service))
+ mt.addOneLineResult("Service " + service + " made " + numberMemcacheRequests + " redundant memcache requests")
+ }
+
+}
+
+/**
+ * A client which writes to a file, per each service pair
+ */
+
+abstract class WriteToTableClient(jobname: String) extends WriteToFileClient(false, jobname) {
+
+ def getTableResultHeader(service: String): String
+
+ def getTableHeader(): List[String]
+
+ def addTable(service: String, lines: List[LineResult], mt: EmailContent) = {
+ mt.addTableResult(getTableResultHeader(service), getTableHeader(), lines)
+ }
+
+ def processKey(service: String, lines: List[LineResult]) {
+ val mt = EmailContent.getTemplate(service, toHtmlName(service))
+ addTable(service, lines, mt)
+ }
+}
+
+
+/**
+ * A client which writes Timeouts data to the file specified
+ */
+
+class TimeoutsClient extends WriteToTableClient("Timeouts") {
+
+ def getTableResultHeader(service: String) = {
+ service + " timed out in calls to the following services"
+ }
+
+ def getTableHeader() = {
+ List("Service Called", "# of Timeouts")
+ }
+}
+
+
+/**
+ * A client which writes Retries data to the file specified
+ */
+
+class RetriesClient extends WriteToTableClient("Retries") {
+
+ def getTableResultHeader(service: String) = {
+ service + " retried in calls to the following services:"
+ }
+
+ def getTableHeader() = {
+ List("Service Called", "# of Retries")
+ }
+}
+
+
+/**
+ * A client which writes WorstRuntimes data to the file specified
+ */
+
+class WorstRuntimesClient extends WriteToTableClient("WorstRuntimes") {
+
+ def getTableResultHeader(service: String) = {
+ "Service " + service + " took the longest for these spans:"
+ }
+
+ def getTableHeader() = {
+ List("Span ID", "Duration")
+ }
+}
+
+
+/**
+ * A client which writes WorstRuntimesPerTrace data to the file specified. Formats it as a HTML url
+ */
+
+class WorstRuntimesPerTraceClient(zipkinUrl: String) extends WriteToTableClient("WorstRuntimesPerTrace") {
+
+ def getTableResultHeader(service: String) = {
+ "Service " + service + " took the longest for these traces:"
+ }
+
+ def getTableHeader() = {
+ List("Trace ID", "Duration")
+ }
+
+ override def addTable(service: String, lines: List[LineResult], mt: EmailContent) = {
+ val formattedAsUrl = lines.map {line =>
+ if (line.getValue().length < 2) {
+ throw new IllegalArgumentException("Malformed line: " + line)
+ }
+ val hypertext = line.getValue().head
+ (Util.ZIPKIN_TRACE_URL + hypertext, hypertext, line)
+ }
+ mt.addUrlTableResult(getTableResultHeader(service), getTableHeader(), formattedAsUrl)
+ }
+
+}
+
+
+/**
+ * A client which writes ExpensiveEndpoints data to the file specified
+ */
+
+class ExpensiveEndpointsClient extends WriteToTableClient("ExpensiveEndpoints") {
+
+ def getTableResultHeader(service: String) = {
+ "The most expensive calls for " + service + " were:"
+ }
+
+ def getTableHeader() = {
+ List("Service Called", "Duration")
+ }
+}
View
46 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToServerClient.scala
@@ -0,0 +1,46 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import org.apache.thrift.protocol.TBinaryProtocol
+import com.twitter.zipkin.gen
+import java.io.File
+import org.apache.thrift.transport.{TSocket, TFramedTransport, TTransport}
+
+abstract class WriteToServerClient(combineSimilarNames: Boolean, portNumber: Int) extends HadoopJobClient(combineSimilarNames) {
+
+ protected var client : gen.ZipkinCollector.Client = null
+
+ def start(dirname: String, serverName: String) {
+ var transport : TTransport = null
+ try {
+ // establish connection to the server
+ transport = new TFramedTransport(new TSocket(serverName, portNumber))
+ val protocol = new TBinaryProtocol(transport)
+ client = new gen.ZipkinCollector.Client(protocol)
+ transport.open()
+ // Read file
+ processDir(new File(dirname))
+ } catch {
+ // TODO: Investigate using logging
+ case t: Throwable => t.printStackTrace()
+ } finally {
+ if (transport != null)
+ transport.close()
+ }
+ }
+}
View
165 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/email/EmailContent.scala
@@ -0,0 +1,165 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop.email
+
+import com.github.mustachejava._
+import scala.collection.JavaConverters._
+import collection.immutable.HashMap
+import java.io.{FileOutputStream, PrintWriter}
+import com.twitter.zipkin.hadoop.LineResult
+
+/**
+ * A basic mustache template for formatting zipkin service reports as emails
+ * @param serviceName the name of a service
+ */
+class EmailContent(serviceName: String) {
+
+ private var tableResults = List[TableResults]()
+ private var oneLineResults = List[OneLineResults]()
+ private var body = List[Body]()
+ private var header = List(new Header(serviceName))
+ private var document = List[Html]()
+
+ def html() = {
+ document.asJava
+ }
+
+ case class Html(var header: java.util.List[Header], var body: java.util.List[Body] ) {}
+
+ case class Header(var serviceName: String) {}
+
+ case class Body(var oneLineResults: java.util.List[OneLineResults], var tableResults: java.util.List[TableResults]){}
+
+ case class OneLineResults(var result: String) {}
+
+ case class TableResults(var tableResultHeader: String, var tableHeader: TableHeader,
+ var tableRows: java.util.List[TableRow], var tableUrlRows: java.util.List[TableUrlRow]) {}
+
+ case class TableHeader(var tableHeaderTokens: java.util.List[TableHeaderToken]) {}
+
+ case class TableRow(var tableRowTokens: java.util.List[TableRowToken]) {}
+
+ case class TableHeaderToken(var tableHeaderToken: String) {}
+
+ case class TableRowToken(var tableRowToken: String) {}
+
+ case class TableUrlRow(var urlToken1: String, var urlToken2: String, var tableUrlRowTokens: java.util.List[TableUrlRowToken]) {}
+
+ case class TableUrlRowToken(var tableUrlRowToken: String) {}
+
+ /**
+ * Adds a single line format result
+ * @param result a single line result
+ */
+
+ def addOneLineResult(result: String) {
+ oneLineResults ::= new OneLineResults(result)
+ }
+
+ /**
+ * Adds a result formatted as a table
+ * @param tableResultHeader the header displayed above the table
+ * @param tableHeader the header of the table
+ * @param tableRows the rows of the table
+ */
+ def addTableResult(tableResultHeader: String, tableHeader: List[String], tableRows: List[LineResult]) {
+ val header = new TableHeader(tableHeader.map(s => new TableHeaderToken(s)).asJava)
+ val rowList = tableRows.map ( line => {
+ val values = line.getValue().map(token => new TableRowToken(token))
+ new TableRow(values.asJava)
+ }).asJava
+ tableResults ::= new TableResults(tableResultHeader, header, rowList, null)
+ }
+
+ /**
+ * Adds a URL result formatted as a table
+ * @param tableResultHeader the header displayed above the table
+ * @param tableHeader the header of the table
+ * @param tableUrlRows the rows of the table, where the first element is a URL
+ */
+ def addUrlTableResult(tableResultHeader: String, tableHeader: List[String], tableUrlRows: List[(String, String, LineResult)]) {
+ val header = new TableHeader(tableHeader.map(s => new TableHeaderToken(s)).asJava)
+ val rowUrlList = tableUrlRows.map ({ row =>
+ val (url, hypertext, line) = row
+ if (line.getValue().length < 2) {
+ throw new IllegalArgumentException("Malformed line: " + line)
+ }
+ new TableUrlRow(url, hypertext, line.getValue().tail.map(token => new TableUrlRowToken(token)).asJava)
+ }).asJava
+ tableResults ::= new TableResults(tableResultHeader, header, null, rowUrlList)
+ }
+
+ /**
+ * Apply all changes made to the body of the HTML document
+ */
+ def apply() {
+ body = List(new Body(oneLineResults.asJava, tableResults.asJava))
+ document = List(new Html(header.asJava, body.asJava))
+ }
+
+ /**
+ * Write the formatted file to the specified PrintWriter
+ * @param pw a PrintWriter
+ */
+ def write(pw: PrintWriter) {
+ apply()
+ val mf = new DefaultMustacheFactory()
+ val mustache = mf.compile("email.mustache")
+ mustache.execute(pw, this).flush()
+ }
+
+}
+
+object EmailContent {
+
+ // Ensure that we never make a different EmailContent for the same service
+ private var templates = Map[String, EmailContent]()
+ private var serviceToHtml = Map[String, String]()
+
+ /**
+ * Gets a EmailContent for a service. If another such template already exists, we use that one.
+ * The user also specifes the name of the html file he/she wants to write to. If the template already exists,
+ * we don't modify the html file.
+ *
+ * @param service Service name
+ * @param html the html file you want to write to
+ * @return
+ */
+ def getTemplate(service: String, html: String) = {
+ if (templates.contains(service)) {
+ templates(service)
+ } else {
+ serviceToHtml += service -> html
+ val s = new EmailContent(service)
+ templates += service -> s
+ s
+ }
+ }
+
+ /**
+ * Get all the services for which we've templated
+ * @return an Iterator over all the services we have templates for
+ */
+ def services() = templates.keys
+
+ def writeAll() = {
+ for (service <- services()) {
+ val pw = new PrintWriter(new FileOutputStream(serviceToHtml(service), true))
+ templates(service).write(pw)
+ }
+ }
+}
View
64 zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala
@@ -0,0 +1,64 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop.sources
+
+import java.io.File
+
+/**
+ * A collection of useful methods
+ */
+
+object Util {
+
+ val ZIPKIN_TRACE_URL = "your.zipkin.url"
+
+ /**
+ * Returns whether or not a directory will contain data. A directory contains data if its first character is not '_'
+ * @param f a File
+ * @return if the directory the file represents contains data
+ */
+ def isDataDir(f: File) = {
+ f.getName.charAt(0) != '_'
+ }
+
+ /**
+ * Traverses a directory and applies a function to each file in the directory
+ * @param func a function which takes a File and returns a Unit
+ * @param f a File representing a directory and which we want to apply func on
+ */
+ def traverseFileTree(f: File)(func: File => Unit): Unit = {
+ if (isDataDir(f)) {
+ if (f.isDirectory) {
+ val children = f.listFiles()
+ for (child <- children) {
+ traverseFileTree(child)(func)
+ }
+ } else {
+ func(f)
+ }
+ }
+ }
+
+ /**
+ * Converts the string to service name in HTML format (adds the .html tail)
+ * @param s a service name
+ * @return the string as a service name in HTML
+ */
+ def toSafeHtmlName(s: String) = {
+ s.trim().replace("/", "-")
+ }
+}
View
31 zipkin-hadoop-job-runner/src/scripts/find_data.sh
@@ -1,31 +0,0 @@
-#!/bin/bash
-# Copyright 2012 Twitter Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-#Usage : find_data.sh hostname input_file_name server_name port_number is_key_data
-
-HOSTNAME=$1
-HDFSFILENAME=$2
-SERVERNAME=$3
-PORTNUMBER=$4
-ISKEYDATA=$5
-
-#Get the file location for the jar file with all the dependencies and moves it to where the job is being run
-DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-DIR="$(dirname "$DIR")"
-DIR="$(dirname "$DIR")"
-scp $DIR/target/zipkin-hadoop-job-runner-assembly-0.2.0-SNAPSHOT.jar $HOSTNAME:.
-
-#Reads the input into the server
-ssh -C $HOSTNAME "java -cp zipkin-hadoop-job-runner-assembly-0.2.0-SNAPSHOT.jar com.twitter.zipkin.hadoop.ProcessPopularKeys "$HDFSFILENAME" "$SERVERNAME" "$PORTNUMBER" "$ISKEYDATA
View
118 zipkin-hadoop-job-runner/src/scripts/has-sampled.rb
@@ -0,0 +1,118 @@
+#!/usr/bin/env ruby
+# Copyright 2012 Twitter Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'rubygems'
+require 'zipkin-query'
+require 'thrift_client'
+require 'finagle-thrift'
+require 'zookeeper'
+require 'optparse'
+require 'ostruct'
+require 'set'
+
+class OptparseHasSampledArguments
+
+ #
+ # Return a structure describing the options.
+ #
+ def self.parse(args)
+ # The options specified on the command line will be collected in *options*.
+ # We set default values here.
+ options = OpenStruct.new
+ options.input = nil
+ options.output = nil
+ options.preprocessor = false
+
+ opts = OptionParser.new do |opts|
+ opts.banner = "Usage: run_job.rb -i INPUT -o OUTPUT"
+
+ opts.separator ""
+ opts.separator "Specific options:"
+
+ opts.on("-i", "--input INPUT",
+ "The INPUT file to read from") do |input|
+ options.input = input
+ end
+
+ opts.on("-o", "--output OUTPUT",
+ "The OUTPUT file to write to") do |output|
+ options.output = output
+ end
+
+ opts.separator ""
+ opts.separator "Common options:"
+ opts.on_tail("-h", "--help", "Show this message") do
+ puts opts
+ exit
+ end
+ end
+ opts.parse!(args)
+ options
+ end
+end
+
+options = OptparseHasSampledArguments.parse(ARGV)
+
+$config = {
+ :zipkin_query_host => "localhost", #whatever the collector is
+ :zipkin_query_port => 9411,
+ :skip_zookeeper => true
+}
+
+def sampled_traces(trace_ids)
+ result = false
+ traces = nil
+ ZipkinQuery::Client.with_transport($config) do |client|
+ traces = client.tracesExist(trace_ids)
+ end
+ return traces
+end
+
+def get_trace_id(line)
+ return line.split("\t")[1].to_i
+end
+
+File.open(options.output, 'w') do |out_file|
+ trace_list = []
+ File.open(options.input, 'r').each do |line|
+ trace_list = trace_list << get_trace_id(line)
+ end
+ sampled = sampled_traces(trace_list)
+ File.open(options.input, 'r').each do |line|
+ if (sampled.include?(get_trace_id(line)))
+ out_file.print line
+ puts line
+ end
+ end
+end
+
+=begin
+h = Hash.new
+
+File.open(options.input, 'r').each do |line|
+ ary = line.split("\t")
+ if h[ary[0]] == nil
+ h[ary[0]] = Array.new(1, ary[1].to_i)
+ else
+ h[ary[0]] = h[ary[0]] << ary[1].to_i
+ end
+end
+
+ary = Array.new()
+
+h.each do |service, traces|
+ p sampled_traces(traces)
+end
+=end
View
1  zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala
@@ -38,6 +38,7 @@ class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
+ .map('name_1 -> 'name_1){ s: String => if (s == null) Util.UNKNOWN_SERVICE_NAME else s }
.groupBy('service, 'name_1){ _.size('count) }
.groupBy('service){ _.sortBy('count) }
.write(Tsv(args("output")))
View
2  zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala
@@ -57,6 +57,8 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id_1, idName)
+ .map('name_1 -> 'name_1){ s: String => if (s == null) Util.UNKNOWN_SERVICE_NAME else s }
.groupBy('name_1, 'service){ _.average('duration) }
+ .groupBy('name_1, 'service){ _.sortBy('duration).reverse}
.write(Tsv(args("output")))
}
View
57 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twitter.zipkin.hadoop
+
+import com.twitter.zipkin.gen.{BinaryAnnotation, Constants, SpanServiceName, Annotation}
+import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
+import com.twitter.zipkin.hadoop.sources.{Util, PreprocessedSpanSource}
+import java.nio.ByteBuffer
+
+
+/**
+ * Finds traces with duplicate trace IDs
+ */
+
+class FindDuplicateTraces(args: Args) extends Job(args) with DefaultDateRangeJob {
+
+ val maxDuration = augmentString(args.required("maximum_duration")).toInt
+
+ val result = PreprocessedSpanSource()
+ .read
+ .mapTo(0 ->('trace_id, 'annotations)) { s: SpanServiceName =>
+ (s.trace_id, s.annotations.toList)
+ }.flatMap('annotations -> 'first_and_last_timestamps ) {al : List[Annotation] =>
+ var first : Long = if (al.length > 0) al(0).timestamp else Int.MaxValue
+ var last : Long = if (al.length > 0) al(0).timestamp else -1
+ al.foreach { a : Annotation =>
+ val timestamp = a.timestamp
+ if (timestamp < first) first = timestamp
+ else if (timestamp > last) last = timestamp
+ }
+ if (first < Int.MaxValue && last > -1) Some(List(first, last)) else None
+ }.groupBy('trace_id){ _.reduce('first_and_last_timestamps -> 'first_and_last_timestamps) { (left : List[Long], right : List[Long]) =>
+ val first = if (left(0) > right(0)) right(0) else left(0)
+ val last = if (left(1) > right(1)) left(1) else right(1)
+ List(first, last)
+ }
+ }
+ .filter('first_and_last_timestamps) { timestamps : List[Long] =>
+ val durationInSeconds = (timestamps(1) - timestamps(0)) / 1000000
+ durationInSeconds >= maxDuration
+ }.project('trace_id)
+ .write(Tsv(args("output")))
+}
View
35 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/GrepByAnnotation.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.zipkin.hadoop
+
+import com.twitter.zipkin.gen.{Annotation, Span}
+import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
+import sources.SpanSource
+
+class GrepByAnnotation(args: Args) extends Job(args) with DefaultDateRangeJob {
+
+ val grepByWord = args.required("word")
+
+ val preprocessed =
+ SpanSource()
+ .read
+ .mapTo(0 -> ('traceid, 'annotations)) { s: Span => (s.trace_id, s.annotations.toList) }
+ .filter('annotations) { annotations: List[Annotation] =>
+ !annotations.filter(p => p.value.toLowerCase().contains(grepByWord)).isEmpty
+ }
+ .project('traceid)
+ .write(Tsv(args("output")))
+}
View
36 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala
@@ -19,8 +19,8 @@ package com.twitter.zipkin.hadoop
import com.twitter.scalding._
import java.nio.ByteBuffer
import java.util.Arrays
-import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Constants, Annotation}
-import com.twitter.zipkin.hadoop.sources.{PrepNoNamesSpanSource, Util}
+import sources.{PreprocessedSpanSource, PrepNoNamesSpanSource, Util}
+import com.twitter.zipkin.gen._
/**
* Find out how often each service does memcache accesses
@@ -29,26 +29,24 @@ class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob {
val preprocessed = PrepNoNamesSpanSource()