Commit 0f46be9
Changes to SparkContext, libraries, and compiler settings.
mvrpl committed Mar 26, 2017
1 parent 9875c23 commit 0f46be9
Showing 5 changed files with 17 additions and 10 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -11,3 +11,4 @@ libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
  libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0" % "provided"
  libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.6.0" % "provided"
  libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.5.0"
+ libraryDependencies += "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"
3 changes: 2 additions & 1 deletion compiler.sh
@@ -73,11 +73,12 @@ libraryDependencies += "org.apache.spark" %% "spark-core" % "$sparkVer" % "provided"
  libraryDependencies += "org.apache.spark" %% "spark-sql" % "$sparkVer" % "provided"
  libraryDependencies += "org.apache.spark" %% "spark-hive" % "$sparkVer" % "provided"
  libraryDependencies += "com.databricks" % "spark-csv_${scalaVer%.*}" % "1.5.0"
+ libraryDependencies += "org.scalatest" % "scalatest_${scalaVer%.*}" % "1.9.1" % "test"
  EOF

echo "JAR making..."

sbt assembly
sbt 'set test in assembly := {}' clean assembly
if [ $? -eq 0 ];then
echo -e "\nJAR compiled.\nRun example: spark-submit --class main.Shipper target/scala-${scalaVer%.*}/BigShipper-assembly-${bigShipperVer}.jar -c /path/config.json --loglevel error\n"
else
Expand Down
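
Setting the test task in the assembly scope to a no-op means packaging no longer runs the test suite first. The same skip could be made permanent in build.sbt instead of per invocation; a sketch, assuming the standard sbt-assembly keys are in scope:

// build.sbt sketch: skip tests during assembly permanently,
// so a plain `sbt clean assembly` behaves like the command above.
test in assembly := {}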
10 changes: 8 additions & 2 deletions src/main/scala/main.scala
@@ -6,18 +6,24 @@ import java.lang.Boolean
  import shipper.Loader
  import utils.{Utils, Logs}
  import scala.collection.JavaConversions._
+ import org.apache.spark.SparkContext
+ import org.apache.spark.SparkConf

  case class Args(config: String = "", loglevel: String = "info")

  object Shipper extends App with Logs {

    def execution(fileName:String) : Boolean = {
+     val conf = new SparkConf().setAppName("Big Shipper")
+     val sc = new SparkContext(conf)
+     val loader = new Loader(sc)
      try {
        val json_project = scala.io.Source.fromFile(fileName).getLines.mkString
        val json_configs = Zeison.parse(json_project)
        if (json_configs.SOURCE.TYPE.toStr == "delimitedfile"){
-         new Loader delimitedFiles(json_configs)
+         loader.delimitedFiles(json_configs)
        } else if (json_configs.SOURCE.TYPE.toStr == "json"){
-         new Loader jsonFiles(json_configs)
+         loader.jsonFiles(json_configs)
        } else if (json_configs.SOURCE.TYPE.toStr == "rdbms"){
          info("Source type not implemented")
        } else {
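
The net effect of the added lines: exactly one SparkContext is created, owned by the entry point, and injected into Loader, where previously each new Loader pulled in a Spark helper that built its own context (Spark permits only one active context per JVM by default). A self-contained sketch of the resulting ownership pattern, with a stop() call that this commit itself does not add:

import org.apache.spark.{SparkConf, SparkContext}
import shipper.Loader

// Sketch only: Shipper's actual dispatch on SOURCE.TYPE is shown above.
object OwnershipSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Big Shipper")
    val sc = new SparkContext(conf)   // created exactly once
    try {
      val loader = new Loader(sc)     // injected, not re-created per branch
      // loader.delimitedFiles(...) / loader.jsonFiles(...)
    } finally {
      sc.stop()                       // hypothetical cleanup, not in this commit
    }
  }
}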
5 changes: 3 additions & 2 deletions src/main/scala/shipper.scala
@@ -4,11 +4,12 @@ import org.tsers.zeison.Zeison
  import utils.{Utils, Spark, Logs}
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._
+ import org.apache.spark.SparkContext

- class Loader extends Logs {
+ class Loader(sc: SparkContext) extends Logs {

    val utils = new Utils
-   val spark = new Spark
+   val spark = new Spark(sc)

    def delimitedFiles(configs: Zeison.JValue) : Boolean = {
      val fields = configs.SOURCE.FIELDS.map(f => "%s:%s".format(f.NAME.toStr, f.TYPE.toStr.toLowerCase)).mkString(",")
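
Taking the context through the constructor is what makes the new test dependency practical: a suite can hand Loader a throwaway local-mode context. An illustrative fixture, where the master URL, app name, and log level are assumptions, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}
import shipper.Loader

// Illustrative test fixture; utils.Spark reads -Dloglevel at construction,
// so the property must be set before Loader (and thus Spark) is built.
object LoaderFixture {
  System.setProperty("loglevel", "error")
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("loader-test"))
  val loader = new Loader(sc) // possible only because of the new constructor
}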
8 changes: 3 additions & 5 deletions src/main/scala/utils.scala
@@ -4,9 +4,8 @@ import org.apache.log4j.{BasicConfigurator, PatternLayout, ConsoleAppender, Roll
  import org.apache.spark.sql.types._
  import org.apache.spark.sql._
  import org.apache.spark.rdd._
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkConf
  import org.tsers.zeison.Zeison
+ import org.apache.spark.SparkContext

  trait Logs {

@@ -90,10 +89,9 @@ class Utils extends Logs {
    }
  }

- class Spark extends Logs {
+ class Spark(sparkC: SparkContext) extends Logs {

-   val conf = new SparkConf().setAppName("Big Shipper")
-   val sc = new SparkContext(conf)
+   val sc = sparkC
    val sparkVer = "^[0-9]{1,2}\\.[0-9]{1,2}".r.findFirstIn(sc.version).get.toDouble
    sc.setLogLevel(System.getProperty("loglevel"))
    val hiveContext = new hive.HiveContext(sc)
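
For reference, the version parsing retained on the sparkVer line: findFirstIn returns the first regex match as an Option[String]. A worked example against a literal version string:

// REPL-style sketch: the regex matches the leading major.minor of a version,
// so "1.6.0" yields Some("1.6"), and .get.toDouble yields 1.6.
val sparkVer = "^[0-9]{1,2}\\.[0-9]{1,2}".r.findFirstIn("1.6.0").get.toDouble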
