Skip to content

Commit

Permalink
Support SQL in views folder
Browse files Browse the repository at this point in the history
  • Loading branch information
hayssams committed Dec 7, 2021
1 parent d202a62 commit 9a7b0f2
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 59 deletions.
45 changes: 28 additions & 17 deletions src/main/scala/ai/starlake/job/transform/AutoTaskJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@

package ai.starlake.job.transform

import ai.starlake.job.index.bqload.{BigQueryJobResult, BigQueryLoadConfig}
import ai.starlake.schema.handlers.{SchemaHandler, StorageHandler}
import ai.starlake.schema.model.Stage.UNIT
import ai.starlake.schema.model.{AutoTaskDesc, BigQuerySink, Stage, Views}
import ai.starlake.config.{Settings, StorageArea}
import ai.starlake.job.index.bqload.{BigQueryJobResult, BigQueryLoadConfig, BigQueryNativeJob}
import ai.starlake.job.ingest.{AuditLog, Step}
import ai.starlake.job.metrics.AssertionJob
import ai.starlake.schema.handlers.{SchemaHandler, StorageHandler}
import ai.starlake.schema.model.Stage.UNIT
import ai.starlake.schema.model._
import ai.starlake.utils.Formatter._
import ai.starlake.utils.{JobResult, SparkJob, SparkJobResult, Utils}
import ai.starlake.utils._
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode

Expand Down Expand Up @@ -128,18 +125,30 @@ case class AutoTaskJob(
(queryName, viewValue)
}

val subSelects = withViews.map { case (queryName, queryExpr) =>
val selectExpr =
if (queryExpr.toLowerCase.startsWith("select "))
queryExpr
else {
val allColumns = "*"
s"SELECT $allColumns FROM $queryExpr"
}
queryName + " AS (" + selectExpr + ")"
val mainTaskSQL =
CommentParser.stripComments(task.getSql().richFormat(sqlParameters).trim) match {
case Right(s) => s
case Left(error) =>
throw new Exception(
s"ERROR: Could not strip comments from SQL Request ${task.getSql()}\n $error"
)
}
val sql = if (mainTaskSQL.toLowerCase().startsWith("with ")) {
mainTaskSQL
} else {
val subSelects = withViews.map { case (queryName, queryExpr) =>
val selectExpr =
if (queryExpr.toLowerCase.startsWith("select "))
queryExpr
else {
val allColumns = "*"
s"SELECT $allColumns FROM $queryExpr"
}
queryName + " AS (" + selectExpr + ")"
}
val subSelectsString = if (subSelects.nonEmpty) subSelects.mkString("WITH ", ",", " ") else ""
"(" + subSelectsString + mainTaskSQL + ")"
}
val subSelectsString = if (subSelects.nonEmpty) subSelects.mkString("WITH ", ",", " ") else ""
val sql = subSelectsString + task.getSql().richFormat(sqlParameters)
val preSql = task.presql.getOrElse(Nil).map { sql => sql.richFormat(sqlParameters) }
val postSql = task.postsql.getOrElse(Nil).map { sql => sql.richFormat(sqlParameters) }
(preSql, sql, postSql)
Expand All @@ -149,7 +158,9 @@ case class AutoTaskJob(
val start = Timestamp.from(Instant.now())
val config = createConfig()
val (preSql, mainSql, postSql) = buildQueryBQ()
def bqNativeJob(sql: String) = new BigQueryNativeJob(config, sql, udf)

// We add the extra parentheses required by BQ when using the "WITH" keyword
def bqNativeJob(sql: String) = new BigQueryNativeJob(config, "(" + sql + ")", udf)

val presqlResult: Try[Iterable[BigQueryJobResult]] = Try {
preSql.map { sql =>
Expand Down
40 changes: 35 additions & 5 deletions src/main/scala/ai/starlake/schema/handlers/SchemaHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class SchemaHandler(storage: StorageHandler)(implicit settings: Settings) extend
defaultAssertions ++ assertions ++ resAssertions
}

def loadViews(path: String): Views = {
@deprecated("Views are directly stored in sql files", "0.2.8")
private def loadViews(path: String): Views = {
val viewsPath = DatasetArea.views(path)
if (storage.exists(viewsPath)) {
val rootNode = mapper.readTree(storage.read(viewsPath))
Expand All @@ -127,10 +128,39 @@ class SchemaHandler(storage: StorageHandler)(implicit settings: Settings) extend
}
}

def views(name: String): Views =
Views.merge(
("default.comet.yml" :: "views.comet.yml" :: (name + ".comet.yml") :: Nil).map(loadViews)
)
/** Reads one SQL file and derives a dotted logical name from its path.
  *
  * @param sqlFile path of the ".sql" file to read
  * @param rootLen length of the root folder prefix to strip from the path
  *                (default 0: only the leading separator is stripped)
  * @return (logical name, file content) where the name is the path relative to
  *         the root, without the ".sql" extension, with '/' replaced by '.'
  */
private def loadSqlFile(sqlFile: Path, rootLen: Int = 0): (String, String) = {
  val sqlExpr = storage.read(sqlFile)
  // Drop the full ".sql" extension (4 chars). The previous dropRight(3) removed
  // only "sql", leaving a trailing '.' in the view name, which broke the
  // lastIndexOf('.') based naming check in loadSqlViews.
  val sqlFilename =
    sqlFile.toString.dropRight(".sql".length).substring(rootLen + 1).replaceAll("/", ".")
  sqlFilename -> sqlExpr
}

/** Loads every ".sql" file found (recursively) under `path` as a
  * logical-name -> SQL-content map.
  */
private def loadSqlFiles(path: Path): Map[String, String] = {
  val rootLen = path.toString.length
  storage
    .list(path, extension = ".sql", recursive = true)
    .map(loadSqlFile(_, rootLen))
    .toMap
}

/** Loads all SQL views below `viewsPath`, logging a warning for any view whose
  * file name does not follow the CTE_/TBL_/TABLE_/VIEW_ naming convention.
  *
  * @param viewsPath root folder scanned for ".sql" view files
  * @return map of view name -> view SQL text (warnings do not filter entries)
  */
private def loadSqlViews(viewsPath: Path): Map[String, String] = {
  // Hoisted out of the loop: the prefix set is invariant across views.
  val standardPrefixes = Set("CTE", "TBL", "TABLE", "VIEW")
  val sqlViews = loadSqlFiles(viewsPath)
  sqlViews.keys.foreach { viewName =>
    // Last dotted segment is the bare file name (path components become dots).
    val sqlName = viewName.substring(viewName.lastIndexOf('.') + 1)
    val isStandardViewName =
      standardPrefixes.exists(prefix => sqlName.toUpperCase().startsWith(prefix + "_"))
    if (!isStandardViewName)
      logger.warn(s"$viewName does not start with one of $standardPrefixes")
  }
  sqlViews
}

/** Resolves the SQL views stored in the views area for `viewName`. */
def views(viewName: String): Views =
  Views(loadSqlViews(DatasetArea.views(viewName)))

@throws[Exception]
lazy val activeEnv: Map[String, String] = {
Expand Down
23 changes: 23 additions & 0 deletions src/main/scala/ai/starlake/utils/CommentParser.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ai.starlake.utils

import scala.util.parsing.combinator.JavaTokenParsers

// From https://users.scala-lang.org/t/solved-parser-combinator-removing-comments/6635
// Strips `//` line comments and `/* ... */` block comments from a string,
// returning only the remaining non-comment text.
// NOTE(review): SQL line comments are usually `--`, but this parser strips
// C-style `//` — confirm this matches the comment style expected in the
// project's .sql files.
// NOTE(review): JavaTokenParsers skips whitespace between tokens by default,
// which affects how whitespace around comments is preserved — the expected
// behavior is pinned by CommentParserSpec.
object CommentParser extends JavaTokenParsers {

// A `//` comment running to end of line, replaced by the empty string.
def singleLine: Parser[String] = "//.*".r ^^ (_ => "")
// A `/* ... */` comment, replaced by the empty string. The greedy `.*` lets a
// one-line nested comment (`/* a /* b */ */`) be swallowed whole; `.` does not
// match line terminators, so the comment must sit on a single line.
def multiLine: Parser[String] = """/\*.*\*/""".r ^^^ ""
// Zero or more consecutive comments of either kind.
def comments: Parser[Seq[String]] = (singleLine | multiLine).*
// Plain text (characters that cannot start a comment) with optional comments
// on either side; yields only the text.
def commentedText: Parser[String] = comments ~> "[^\\/*]*".r <~ comments
// Fallback: consume the remainder of the input unchanged.
def empty: Parser[Seq[String]] = ".*$".r ^^ { e => Seq(e) }
// Whole input: leading text followed by either the raw remainder or further
// comment-delimited text, concatenated back together without the comments.
def expression: Parser[String] = commentedText ~ (empty | commentedText.*) ^^ {
case (a: String) ~ (b: Seq[String]) => a + b.mkString("")
}

// Returns Right(text with comments removed) on success, or Left(parser error
// message) when the input cannot be fully parsed.
def stripComments(str: String): Either[String, String] = {
parseAll(expression, str) match {
case Success(result, _) => Right(result)
case failedOrIncomplete => Left(failedOrIncomplete.toString)
}
}
}
3 changes: 1 addition & 2 deletions src/test/resources/job.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* comet_presql */
select count(*) from PRP.LA_BONNE_ADRESSE
where x = '${actor}'
(((select * from `starlake-325712`.hr.locations)))

/* comet_sql */

Expand Down
35 changes: 0 additions & 35 deletions src/test/scala/ai/starlake/job/gcp/BigQueryNativeJobTest.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package ai.starlake.job.index.bqload

import ai.starlake.TestHelper
import ai.starlake.config.Settings
import ai.starlake.schema.handlers.{SchemaHandler, SimpleLauncher}
import ai.starlake.schema.model._
import ai.starlake.workflow.{IngestionWorkflow, TransformConfig}
import com.google.cloud.bigquery.TableId
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll

/** Integration spec for native BigQuery ingestion and transform jobs.
  * Talks to a real BigQuery project, so every GCP call is gated behind the
  * COMET_GCP_TEST environment flag (previously only the first test was
  * guarded: beforeAll/afterAll and the AutoJob test ran unconditionally and
  * failed on CI machines without GCP credentials).
  */
class BigQueryNativeJobSpec extends TestHelper with BeforeAndAfterAll {
  // Single source of truth for the GCP gate, shared by setup/teardown and tests.
  private def gcpTestEnabled: Boolean =
    sys.env.getOrElse("COMET_GCP_TEST", "false").toBoolean

  override def beforeAll(): Unit = {
    if (gcpTestEnabled)
      BigQueryJobBase.bigquery.delete(TableId.of("bqtest", "account"))
  }
  override def afterAll(): Unit = {
    if (gcpTestEnabled)
      BigQueryJobBase.bigquery.delete(TableId.of("bqtest", "account"))
  }

  new WithSettings() {
    "Ingest to BigQuery" should "should be ingest and store table in BigQuery" in {
      if (gcpTestEnabled) {
        import org.slf4j.impl.StaticLoggerBinder
        val binder = StaticLoggerBinder.getSingleton
        logger.debug(binder.getLoggerFactory.toString)
        logger.debug(binder.getLoggerFactoryClassStr)

        new WithSettings() {
          new SpecTrait(
            domainOrJobFilename = "bqtest.comet.yml",
            sourceDomainOrJobPathname = "/sample/position/bqtest.comet.yml",
            datasetDomainName = "bqtest",
            sourceDatasetPathName = "/sample/position/XPOSTBL"
          ) {
            cleanMetadata
            cleanDatasets

            logger.info(settings.comet.datasets)
            loadPending
          }
        }
        val tableFound =
          Option(BigQueryJobBase.bigquery.getTable(TableId.of("bqtest", "account"))).isDefined
        tableFound should be(true)

      }
    }
    "Native BigQuery AutoJob" should "succeed" in {
      // Guarded like the ingestion test above: requires live GCP credentials.
      if (gcpTestEnabled) {
        val businessTask1 = AutoTaskDesc(
          None,
          Some("select * from bqtest.account"),
          "bqtest",
          "jobresult",
          WriteMode.OVERWRITE,
          sink = Some(BigQuerySink(name = Some("sinktest"), location = Some("EU"))),
          engine = Some(Engine.BQ)
        )
        val businessJob =
          AutoJobDesc("user", List(businessTask1), None, None, None, engine = Some(Engine.BQ))
        val schemaHandler = new SchemaHandler(metadataStorageHandler)

        val businessJobDef = mapper
          .writer()
          .withAttribute(classOf[Settings], settings)
          .writeValueAsString(businessJob)
        lazy val pathBusiness = new Path(cometMetadataPath + "/jobs/bqjobtest.comet.yml")

        val workflow =
          new IngestionWorkflow(storageHandler, schemaHandler, new SimpleLauncher())
        storageHandler.write(businessJobDef, pathBusiness)
        workflow.autoJob(TransformConfig("bqjobtest")) should be(true)
      }
    }
  }
}
34 changes: 34 additions & 0 deletions src/test/scala/ai/starlake/utils/CommentParserSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package ai.starlake.utils

import com.typesafe.scalalogging.StrictLogging
import org.apache.spark.sql.DatasetLogging
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

/** Unit spec pinning CommentParser.stripComments behavior on line comments,
  * block comments, one-line nested block comments, and surrounding text.
  */
class CommentParserSpec
    extends AnyFlatSpec
    with Matchers
    with BeforeAndAfterAll
    with StrictLogging
    with DatasetLogging {
  // Description fixed: the previous "Taxonomy should list files by
  // modification_time and name" was copy-pasted from an unrelated spec and did
  // not describe what this test exercises.
  "CommentParser" should "strip single-line, multi-line and nested comments" in {
    val r1 = CommentParser.stripComments("/* a comment */")
    Right("") should equal(r1)

    val r2 = CommentParser.stripComments("// a comment")
    Right("") should equal(r2)

    val r3 = CommentParser.stripComments("/* level1 /* level 2 */ */")
    Right("") should equal(r3)

    val r4 = CommentParser.stripComments("Text Before./* level1 /* level 2 */ */")
    Right("Text Before.") should equal(r4)

    val r5 = CommentParser.stripComments("/* level1 /* level 2 */ */Text after.")
    Right("Text after.") should equal(r5)

    val r6 = CommentParser.stripComments("Text Before./* level1 /* level 2 */ */Text after.")
    Right("Text Before.Text after.") should equal(r6)
  }
}

0 comments on commit 9a7b0f2

Please sign in to comment.