
Fixed runtime issue. #1

Merged: 1 commit, May 22, 2015

30 changes: 23 additions & 7 deletions build.sbt
@@ -1,8 +1,9 @@
-name := "cassandra-connector-failure-demo"
+name := "cassandra-connector-success-demo"

-version := "1.0"
+version := "1.1"

-scalaVersion := "2.10.4"
+scalaVersion := "2.10.5"

Contributor Author: This helped; it's the version of Scala we build 1.2.1 against.

 resolvers ++= Seq(
   "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases",
@@ -11,14 +12,29 @@ resolvers ++= Seq(

 resolvers += "spray" at "http://repo.spray.io/"

-val sparkVersion = "1.3.1"
+// won't work unless building right from master currently. release coming that supports 1.3.1
+//val sparkVersion = "1.3.1"
+
+val sparkVersion = "1.2.1"

-libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % sparkVersion % "provided"
+libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"

-libraryDependencies += "org.apache.spark" % "spark-core_2.10" % sparkVersion % "provided"
+libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"

-libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.2.0"
+libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.1"
+
+run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
+
+parallelExecution in assembly := false
+
+assemblyOption in assembly ~= { _.copy(includeScala = false) }
+
+mergeStrategy in assembly <<= (mergeStrategy in assembly) {
+  (old) => {
+    case PathList("com", "google", xs @ _*) => MergeStrategy.last
+    case x => old(x)
+  }
+}

 //assemblyMergeStrategy in assembly := {
 //  case PathList("META-INF", xs@_*) => MergeStrategy.discard
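A note on the switch from "spark-core_2.10" to %% "spark-core" above (standard sbt behavior, not specific to this PR): %% appends the project's Scala binary version to the artifact name, so the dependency follows scalaVersion instead of hard-coding the suffix. A minimal sketch:

scalaVersion := "2.10.5"

// These two lines resolve the same artifact; "%%" derives the "_2.10"
// suffix from scalaVersion automatically.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.1" % "provided"
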
3 changes: 3 additions & 0 deletions cql/schema.cql
@@ -1,3 +1,6 @@
+// start clean:
+DROP KEYSPACE IF EXISTS fake_data;
+
 CREATE KEYSPACE fake_data
 WITH replication = {
   'class' : 'SimpleStrategy',
2 changes: 1 addition & 1 deletion launch.sh
@@ -1 +1 @@
-spark-submit --master local[6] target/scala-2.10/cassandra-connector-failure-demo-assembly-1.0.jar
+spark-submit --master local[6] target/scala-2.10/cassandra-connector-success-demo-assembly-1.1.jar

Contributor Author: That's right, I went there...

30 changes: 30 additions & 0 deletions src/main/resources/log4j.properties
@@ -0,0 +1,30 @@

Contributor Author: Just added for SBT run, to see logging aside from the log hailstorm in spark-submit.

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# for production, you should probably set pattern to %c instead of %l.
# (%l is slower.)

# output messages into a rolling log file as well as stdout
log4j.rootLogger=WARN,stdout

# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%F:%L] : %m%n

log4j.logger.fake=DEBUG
log4j.logger.com.datastax.spark.connector=WARN
log4j.logger.org.apache=WARN
50 changes: 0 additions & 50 deletions src/main/scala/FakeDataStreamer.scala

This file was deleted.

17 changes: 0 additions & 17 deletions src/main/scala/FakeMessage.scala

This file was deleted.

src/main/scala/FakeDataReceiver.scala

@@ -1,4 +1,4 @@
-import scala.collection.mutable._
+package fake

 import org.apache.spark.Logging
 import org.apache.spark.streaming.receiver.Receiver
@@ -9,7 +9,7 @@ import scala.util.Random
 class FakeDataReceiver( )
   extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

-  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
+  println(" [*] Waiting for messages. To exit press CTRL+C")

   def onStart() {
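For reference, a minimal Spark Streaming 1.2.x custom receiver has roughly this shape (a sketch only; the real onStart body is collapsed in the diff above, and the generator logic here is invented for illustration):

package fake

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.util.Random

// Hypothetical stand-in for the collapsed parts of FakeDataReceiver.
class SketchReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart(): Unit = {
    // onStart must return quickly, so push records from a worker thread.
    new Thread("fake-data-generator") {
      override def run(): Unit = {
        while (!isStopped()) {
          // store() hands one record to Spark Streaming for batching
          store(s"id${Random.nextInt(1000)},message number ${Random.nextInt(1000)}")
          Thread.sleep(500)
        }
      }
    }.start()
  }

  def onStop(): Unit = () // the generator thread exits once isStopped() is true
}
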
46 changes: 46 additions & 0 deletions src/main/scala/fake/FakeDataStreamer.scala
@@ -0,0 +1,46 @@
package fake

import com.datastax.spark.connector._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.{SparkConf, SparkContext}

object FakeDataStreamer {

  var failureHandler = (a: String, b: String) => {1}

  def main(args: Array[String]) {
    if (args.length > 0 && args(0) == "d") {
      println("-------------Attach debugger now!--------------")
      Thread.sleep(8000)
    }

    val config = new SparkConf()
      .setAppName("Fake Data Stream")
      .set("spark.cassandra.connection.host", "127.0.0.1") // get from args

    val streamContext = new StreamingContext(config, Seconds(3))

    generateReceiverStream(streamContext)

    streamContext.start()
    streamContext.awaitTermination()
  }

  def generateReceiverStream(ssc: StreamingContext): Unit = {

    val customReceiverStream = ssc.receiverStream[String](new FakeDataReceiver())

    val vsStream = customReceiverStream.filter(_.nonEmpty).map(FakeMessage(_))

    // your life would be much easier if you didn't camelCase your table field names in cql but instead just did
    // message_id, message_content, then you would not need to specify SomeColumns with fieldname as x etc.
    // That said, I did create a ticket for us to handle those that do, so you don't have to specify 'as' :-)
    val columnsForCamelCaseAlias = SomeColumns("messageId" as "messageId", "messageContent" as "messageContent", "timestamp" as "timestamp")

Contributor Author: This is the main issue, sadly.

    vsStream.saveToCassandra("fake_data", "messages", columnsForCamelCaseAlias)
    vsStream.saveToCassandra("fake_data", "latest_message", columnsForCamelCaseAlias)

    vsStream.print
  }
}
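Regarding the camelCase comment above: with snake_case column names, the connector's default column mapper would line up with the case class fields on its own, so the SomeColumns aliases could be dropped entirely. A sketch of that alternative, assuming a hypothetical snake_case variant of the schema:

// Hypothetical snake_case schema (CQL):
//   CREATE TABLE fake_data.messages (
//     message_id text PRIMARY KEY,
//     message_content text,
//     timestamp timestamp
//   );

// The connector maps case class fields to columns by converting
// camelCase to snake_case (messageId -> message_id), so no aliases
// are needed and the save call shrinks to:
vsStream.saveToCassandra("fake_data", "messages")
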
16 changes: 16 additions & 0 deletions src/main/scala/fake/FakeMessage.scala
@@ -0,0 +1,16 @@
package fake

import java.util.Date

/**
 * Created by cwheeler on 5/18/15.
 */
case class FakeMessage(messageId: String, messageContent: String, timestamp: Date) extends Serializable

Contributor Author: Cleaned up your 'class' and made it a Scala case class with the appropriate 'apply' function.

object FakeMessage {

  def apply(record: String): FakeMessage = {
    val arr = record.split(",")
    FakeMessage(arr(0), arr(1), new Date())
  }
}
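
For illustration, the companion apply above can be exercised like this (the "id,content" record format is inferred from the split on ','):

// The String factory parses a raw record and stamps the current time:
val msg = FakeMessage("42,hello cassandra")
// msg.messageId == "42", msg.messageContent == "hello cassandra"

// The compiler-generated case class apply is still available as usual:
val explicit = FakeMessage("42", "hello cassandra", new java.util.Date())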