Skip to content

Commit

Permalink
Add a more complex example and update README
Browse files Browse the repository at this point in the history
  • Loading branch information
leobenkel committed Dec 2, 2019
1 parent 33c9ad1 commit 02a5c7e
Show file tree
Hide file tree
Showing 19 changed files with 533 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object SparkModule {
lazy private val sparkBuilder: SparkSession.Builder = SparkSession.builder
lazy private val sparkBuilderWithName: SparkSession.Builder = sparkBuilder.appName(appName)
protected def appName: String
protected def updateConfig[R](
protected def updateConfig(
sparkBuilder: SparkSession.Builder,
arguments: C
): SparkSession.Builder
Expand Down
24 changes: 24 additions & 0 deletions Library/src/main/scala/com/leobenkel/zparkio/implicits.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.leobenkel.zparkio

import com.leobenkel.zparkio.Services.SparkModule
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import zio.ZIO

// scalastyle:off object.name
/**
 * Type aliases and constructors for ZIO effects that produce Spark collections.
 */
object implicits {

  /** Effect that needs `R` plus a [[SparkModule]], can fail with a `Throwable`, yields a `Dataset[A]`. */
  type ZDS_R[R, A] = ZIO[R with SparkModule, Throwable, Dataset[A]]

  /** A [[ZDS_R]] whose only requirement is the [[SparkModule]]. */
  type ZDS[A] = ZDS_R[Any, A]

  /** Effect that needs `R`, can fail with a `Throwable`, yields an `RDD[A]`. No [[SparkModule]] is added. */
  type ZRDD[R, A] = ZIO[R, Throwable, RDD[A]]

  object ZDS {

    /**
     * Builds a dataset from the `SparkSession` obtained through `SparkModule()`.
     *
     * `f` is evaluated inside `ZIO.effect`, so an exception thrown while building the dataset
     * fails the returned effect with that `Throwable` instead of escaping the error channel.
     *
     * @param f function creating the dataset from the session
     * @tparam A element type of the dataset
     * @return an effect yielding the dataset produced by `f`
     */
    def apply[A](f: SparkSession => Dataset[A]): ZDS[A] =
      SparkModule().flatMap(spark => ZIO.effect(f(spark)))
  }
}
// scalastyle:on
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait Application extends ZparkioApp[Arguments, RuntimeEnv, String] {
RuntimeEnv(cliService, sparkService)
}

override def makeSparkBuilder: SparkModule.Builder[Arguments] = new SparkBuilder {}
override def makeSparkBuilder: SparkModule.Builder[Arguments] = SparkBuilder

override def makeCliBuilder: CommandLineArguments.Builder[Arguments] =
new CommandLineArguments.Builder[Arguments] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package com.leobenkel.zparkioProjectExample
import com.leobenkel.zparkio.Services.SparkModule
import org.apache.spark.sql.SparkSession

trait SparkBuilder extends SparkModule.Builder[Arguments] {
object SparkBuilder extends SparkModule.Builder[Arguments] {
override protected def appName: String = "Zparkio_test"

override protected def updateConfig[R](
override protected def updateConfig(
sparkBuilder: SparkSession.Builder,
arguments: Arguments
): SparkSession.Builder = {
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.leobenkel.zparkioProfileExampleMoreComplex

import com.leobenkel.zparkio.Services._
import com.leobenkel.zparkio.ZparkioApp
import com.leobenkel.zparkioProfileExampleMoreComplex.Services.SparkBuilder
import com.leobenkel.zparkioProfileExampleMoreComplex.Transformations.UserTransformations
import org.apache.spark.SparkException
import zio.ZIO

/**
 * Application definition of the example: wires the command-line arguments, the Spark builder
 * and the runtime environment into a `ZparkioApp`, and logs the number of authors.
 */
trait Application extends ZparkioApp[Arguments, RuntimeEnv.APP_ENV, Unit] {

  /**
   * Logs a start marker, loads the authors and logs how many there are.
   *
   * @return an effect that completes with `Unit` or fails with a `Throwable`
   */
  override def runApp(): ZIO[RuntimeEnv.APP_ENV, Throwable, Unit] =
    for {
      _       <- Logger.info("--Start--")
      authors <- UserTransformations.getAuthors
      // `count()` is evaluated inside `ZIO.effect` so that an exception it throws fails the
      // effect with that `Throwable` instead of being thrown from a `for` continuation.
      authorCount <- ZIO.effect(authors.count())
      _           <- Logger.info(s"There are $authorCount authors")
    } yield ()

  /**
   * Prints the error and its stack trace to standard output, then maps it to a code.
   *
   * @param f the error that ended the application
   * @return `Some(10)` for a `SparkException`, `Some(0)` for an `InterruptedException`,
   *         `Some(1)` for anything else (presumably used as the process exit code by
   *         `ZparkioApp` — TODO confirm)
   */
  override def processErrors(f: Throwable): Option[Int] = {
    println(f)
    f.printStackTrace(System.out)

    f match {
      case _: SparkException       => Some(10)
      case _: InterruptedException => Some(0)
      case _                       => Some(1)
    }
  }

  /** Builds the runtime environment from the parsed arguments and the Spark service. */
  override def makeEnvironment(
    cliService: Arguments,
    sparkService: SparkModule.Service
  ): RuntimeEnv.APP_ENV =
    RuntimeEnv(cliService, sparkService)

  /** Uses the project's `SparkBuilder` object to configure Spark. */
  override def makeSparkBuilder: SparkModule.Builder[Arguments] = SparkBuilder

  /** Builder that turns the raw argument list into an `Arguments` instance. */
  override def makeCliBuilder: CommandLineArguments.Builder[Arguments] =
    new CommandLineArguments.Builder[Arguments] {
      override protected def createCli(args: List[String]): Arguments = Arguments(args)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.leobenkel.zparkioProfileExampleMoreComplex

import com.leobenkel.zparkio.Services.CommandLineArguments
import org.rogach.scallop.{ScallopConf, ScallopOption}
import zio.ZIO

/**
 * Command-line arguments of the example application, declared with Scallop.
 *
 * Every option below has a default value and is declared with `required = false`.
 *
 * NOTE(review): Scallop presumably derives each flag name from the name of the `val` that
 * holds the option — confirm before renaming any of them.
 *
 * @param input the raw command-line arguments
 */
case class Arguments(input: List[String])
extends ScallopConf(input) with CommandLineArguments.Service {

// User name for the database; defaults to "admin".
val databaseUsername: ScallopOption[String] = opt[String](
default = Some("admin"),
required = false,
noshort = true
)

/*
* Keep in mind that secrets should be provided through a vault or an environment variable.
* Please do not pass passwords as command-line arguments!
*/
val databasePassword: ScallopOption[String] = opt[String](
default = Some("123456"),
required = false,
noshort = true
)

// Database location; the default is a placeholder value.
val databaseHost: ScallopOption[String] = opt[String](
default = Some("database://host.com/database"),
required = false,
noshort = true
)

// Size of the generated sample data (see `descr`); defaults to 100.
val generatedInputSize: ScallopOption[Int] = opt[Int](
default = Some(100),
required = false,
noshort = true,
descr = "The size of the sample data generated"
)

// Free-form Spark configuration value; defaults to "foo".
val sparkConfig: ScallopOption[String] = opt[String](
default = Some("foo"),
required = false,
noshort = true
)

// Must stay after every option declaration above.
// NOTE(review): presumably this is where Scallop parses and validates `input` — confirm.
verify()
}

object Arguments {

  /**
   * Reads a value out of the `Arguments` held by the environment.
   *
   * Delegates to `CommandLineArguments.get`.
   *
   * @param f projection applied to the arguments
   * @tparam A type of the extracted value
   * @return an effect requiring `CommandLineArguments[Arguments]` and yielding the value
   */
  def apply[A](f: Arguments => A): ZIO[CommandLineArguments[Arguments], Throwable, A] =
    CommandLineArguments.get[Arguments](f)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.leobenkel.zparkioProfileExampleMoreComplex.Items

/**
 * A post record.
 *
 * @param postId   identifier of the post
 * @param authorId identifier of the post's author (presumably a `User.userId`; not enforced here)
 * @param title    title of the post
 * @param content  body of the post
 */
case class Post(postId: Int, authorId: Int, title: String, content: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.leobenkel.zparkioProfileExampleMoreComplex.Items

/**
 * A user record.
 *
 * @param userId identifier of the user
 * @param name   name of the user
 * @param age    age of the user
 * @param active whether the user is flagged as active
 */
case class User(userId: Int, name: String, age: Int, active: Boolean)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.leobenkel.zparkioProfileExampleMoreComplex

/**
 * Concrete object for the `Application` trait; it adds no members of its own.
 *
 * NOTE(review): presumably the runnable entry point, with `main` inherited through
 * `ZparkioApp` — not visible from this file.
 */
object Main extends Application
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.leobenkel.zparkioProfileExampleMoreComplex

import com.leobenkel.zparkio.Services._
import com.leobenkel.zparkioProfileExampleMoreComplex.Services.{Database, FileIO}
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.Console
import zio.random.Random
import zio.system.System
import zio.{ZIO, console}

/**
 * Environment given to the application.
 *
 * Mixes the live ZIO `System`, `Console`, `Clock`, `Random` and `Blocking` modules with the
 * project's command-line, logging, file, Spark and database modules.
 *
 * @param cliService   parsed command-line arguments, exposed as `cli`
 * @param sparkService Spark service, exposed as `spark`
 */
case class RuntimeEnv(
  cliService: Arguments,
  sparkService: SparkModule.Service
) extends System.Live
    with Console.Live
    with Clock.Live
    with Random.Live
    with Blocking.Live
    with CommandLineArguments[Arguments]
    with Logger
    with FileIO.Live
    with SparkModule
    with Database.Live {

  final override lazy val cli: Arguments = cliService

  final override lazy val spark: SparkModule.Service = sparkService

  final override lazy val log: Logger.Service = new Log()

  // Credentials are taken from the command-line options.
  final override protected lazy val getDatabaseCredentials: Database.Credentials =
    Database.Credentials(
      user = cli.databaseUsername(),
      psw = cli.databasePassword(),
      host = cli.databaseHost()
    )
}

object RuntimeEnv {

  /** Every module the application may require from its environment. */
  type APP_ENV =
    Any
      with System
      with Console
      with Clock
      with Random
      with Blocking
      with CommandLineArguments[Arguments]
      with Logger
      with FileIO
      with SparkModule
      with Database
}

/**
 * `Logger.Service` that writes every message to the console, prefixed with its level.
 */
class Log extends Logger.Service {

  /** Writes one already-formatted line to the console. */
  private def putLine(line: String): ZIO[Any with Console, Nothing, Unit] =
    console.putStrLn(line)

  /** Writes `txt` prefixed with `INFO: `. */
  override def info(txt: String): ZIO[Any with Console, Nothing, Unit] = {
    putLine(s"INFO: $txt")
  }

  /** Writes `txt` prefixed with `ERROR: `. */
  override def error(txt: String): ZIO[Any with Console, Nothing, Unit] = {
    putLine(s"ERROR: $txt")
  }

  /** Writes `txt` prefixed with `DEBUG: `. */
  override def debug(txt: String): ZIO[Any with Console, Nothing, Unit] = {
    putLine(s"DEBUG: $txt")
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.leobenkel.zparkioProfileExampleMoreComplex.Services

import com.leobenkel.zparkio.Services.SparkModule
import com.leobenkel.zparkio.implicits._
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import zio.{Task, ZIO}

/** Environment module giving access to a `Database.Service`. */
trait Database {

  /** The service used to run queries. */
  def database: Database.Service
}

/**
 * Database module: credentials, the query service, its placeholder live implementation and
 * an accessor that runs a query against the service found in the environment.
 */
object Database {

  /**
   * Connection settings handed to the live service.
   *
   * @param user user name
   * @param psw  password
   * @param host database location
   */
  case class Credentials(
    user: String,
    psw: String,
    host: String
  )

  trait Service {

    /**
     * Runs `q` with the `SparkSession` obtained through `SparkModule()`.
     *
     * The call to the underlying implementation is wrapped in `Task`, so an exception it
     * throws fails the returned effect.
     *
     * @param q the query to run
     * @tparam A row type of the result; an `Encoder` for it is required
     */
    final def query[A: Encoder](q: String): ZDS[A] =
      SparkModule().flatMap(session => Task(query[A](session, q)))

    /** Implementation hook: produces the dataset for `query` using `spark`. */
    protected def query[A: Encoder](
      spark: SparkSession,
      query: String
    ): Dataset[A]
  }

  private trait LiveService extends Database.Service {

    /**
     * Placeholder implementation: ignores `query` and returns an empty dataset.
     */
    override protected def query[A: Encoder](
      spark: SparkSession,
      query: String
    ): Dataset[A] = {
      // This is where `spark.read.option("", "").load()` would go.
      spark.emptyDataset[A]
    }

    /** Credentials for the connection; not read by the placeholder `query` above. */
    protected def getCredentials: Credentials
  }

  trait Live extends Database {

    /** Supplies the credentials passed on to the live service. */
    protected def getDatabaseCredentials: Credentials

    /** Creates a live service backed by `getDatabaseCredentials`; a new one on each call. */
    override def database: Database.Service = new Database.LiveService {
      override protected def getCredentials: Credentials = getDatabaseCredentials
    }
  }

  /**
   * Runs `query` against the `Database` service found in the environment.
   *
   * @param query the query to run
   * @tparam A row type of the result; an `Encoder` for it is required
   */
  def apply[A: Encoder](query: String): ZDS_R[Database, A] =
    ZIO.environment[Database].flatMap(_.database.query[A](query))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.leobenkel.zparkioProfileExampleMoreComplex.Services

import zio.{Task, ZIO}

import scala.io.Source

/** Environment module giving access to a `FileIO.Service`. */
trait FileIO {

  /** The service used to read files. */
  def fileIO: FileIO.Service
}

/**
 * File-reading module: the service, its live implementation and an accessor that reads a file
 * through the service found in the environment.
 */
object FileIO {

  trait Service {

    /** Implementation hook: returns the lines of the file at `path`. May throw. */
    protected def readFileContent(path: String): Seq[String]

    /**
     * Reads the file at `path`.
     *
     * The read is wrapped in `Task`, so an exception thrown by `readFileContent` fails the
     * returned effect.
     *
     * @param path location of the file
     * @return an effect yielding the lines of the file
     */
    final def getFileContent(path: String): ZIO[Any, Throwable, Seq[String]] =
      Task(readFileContent(path))
  }

  private trait LiveService extends FileIO.Service {

    /**
     * Reads every line of the file at `path` using `scala.io.Source`.
     *
     * The source is closed in a `finally` block so it is released even when reading fails.
     */
    override protected def readFileContent(path: String): Seq[String] = {
      val file = Source.fromFile(path)
      try {
        // Materialise all lines before the source is closed.
        file.getLines().toVector
      } finally {
        file.close()
      }
    }
  }

  trait Live extends FileIO {

    /** Creates a live service; a new one on each call. */
    override def fileIO: Service = new LiveService {}
  }

  /**
   * Reads the file at `path` through the `FileIO` service found in the environment.
   *
   * @param path location of the file
   */
  def apply(path: String): ZIO[FileIO, Throwable, Seq[String]] =
    ZIO.accessM[FileIO](_.fileIO.getFileContent(path))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.leobenkel.zparkioProfileExampleMoreComplex.Services

import com.leobenkel.zparkio.Services.SparkModule
import com.leobenkel.zparkioProfileExampleMoreComplex.Arguments
import org.apache.spark.sql.SparkSession

/**
 * Spark builder of the example: sets the application name, one configuration entry taken
 * from the command-line arguments, and the service that creates the session.
 */
object SparkBuilder extends SparkModule.Builder[Arguments] {

  override protected def appName: String = "Zparkio_test"

  /** Sets `spark.foo.bar` to the value of the `sparkConfig` argument. */
  override protected def updateConfig(
    sparkBuilder: SparkSession.Builder,
    arguments: Arguments
  ): SparkSession.Builder = {
    val fooBar: String = arguments.sparkConfig()
    sparkBuilder.config("spark.foo.bar", fooBar)
  }

  /** Wraps the builder in a service whose session is created lazily, on first access. */
  override protected def makeSparkService(
    sparkBuilder: SparkSession.Builder
  ): SparkModule.Service =
    new SparkModule.Service {
      final override lazy val spark: SparkSession = sparkBuilder.getOrCreate()
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.leobenkel.zparkioProfileExampleMoreComplex.Sources

import com.leobenkel.zparkio.Services.SparkModule
import com.leobenkel.zparkio.implicits.ZDS_R
import com.leobenkel.zparkioProfileExampleMoreComplex.Items.{Post, User}
import com.leobenkel.zparkioProfileExampleMoreComplex.Services.Database

/**
 * Datasets read from the database through the `Database` module.
 */
object DatabaseSource {

  /** Runs `SELECT * FROM users` and decodes the rows as `User`. */
  def getUsers: ZDS_R[Database, User] =
    SparkModule().flatMap { spark =>
      // Brings the `Encoder[User]` required by `Database.apply` into scope.
      import spark.implicits._
      Database[User]("SELECT * FROM users")
    }

  /** Runs `SELECT * FROM posts` and decodes the rows as `Post`. */
  def getPosts: ZDS_R[Database, Post] =
    SparkModule().flatMap { spark =>
      // Brings the `Encoder[Post]` required by `Database.apply` into scope.
      import spark.implicits._
      Database[Post]("SELECT * FROM posts")
    }
}
Loading

0 comments on commit 02a5c7e

Please sign in to comment.