Skip to content

Commit

Permalink
fixed the expecting the map input for dataproc jobs in airflow!
Browse files Browse the repository at this point in the history
  • Loading branch information
feroshjacob committed Oct 9, 2018
1 parent 6605627 commit 720f870
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
3 changes: 2 additions & 1 deletion src/main/resources/templates/run-pyspark.ssp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<%@ var localVariables:Map[String, com.recipegrace.bbc.grmr.Expressions.Expression] %>
<%@ var evalObject: com.recipegrace.bbc.codegen.ExpressionCreator%>
<%@ val sparkProperties:String= evalObject.evaluateVariable(sparkJob.sparkJob.props,localVariables).toString%>
<%@ val sparkPropertiesCalculated:List[(String,String)] = if(sparkProperties.isEmpty || !sparkProperties.contains("==")) List() else sparkProperties.split(",").flatMap(f=> f.split("==")).grouped(2).map(f=> f.head ->f(1)).toList%>
<%@ val hasArguments:Boolean = sparkJob.sparkJob.args.nonEmpty%>
<%@ var name: String%>

Expand All @@ -13,6 +14,6 @@
job_name='submit_spark_${sparkJob.name}',
pyfiles=[${ sparkJob.sparkJob.otherPyFiles.map(f=> "'"+evalObject.evaluateVariable(f,localVariables )+"'").mkString(",")}],
#if(hasArguments)arguments=[${sparkJob.sparkJob.args.get.map(f=> "'"+evalObject.evaluateVariable(f,localVariables )+"'").mkString(",")}],
#end#if(sparkProperties.nonEmpty)dataproc_spark_properties='${sparkProperties}',
#end#if(sparkProperties.nonEmpty)dataproc_spark_properties={${sparkPropertiesCalculated.map(f=> "'"+ f._1 +"':'"+f._2 +"'").mkString(", ")}},
#endcluster_name='${clusterName}'.lower()
)
3 changes: 2 additions & 1 deletion src/main/resources/templates/run-spark.ssp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<%@ var localVariables:Map[String, com.recipegrace.bbc.grmr.Expressions.Expression] %>
<%@ var evalObject: com.recipegrace.bbc.codegen.ExpressionCreator%>
<%@ val sparkProperties:String= evalObject.evaluateVariable(sparkJob.sparkJob.props,localVariables).toString%>
<%@ val sparkPropertiesCalculated:List[(String,String)] = if(sparkProperties.isEmpty || !sparkProperties.contains("==")) List() else sparkProperties.split(",").flatMap(f=> f.split("==")).grouped(2).map(f=> f.head ->f(1)).toList%>
<%@ val hasArguments:Boolean = sparkJob.sparkJob.args.nonEmpty%>
<%@ var name: String%>
<%@ val jarLocationCalculated: String = sparkJob.sparkJob.repository match {
Expand All @@ -20,6 +21,6 @@ case _ => evalObject.evaluateVariable(sparkJob.sparkJob.jarLocation.get, localVa
main_class='${sparkJob.sparkJob.mainClass}',
job_name='submit_spark_${sparkJob.name}',
#if(hasArguments)arguments=[${ sparkJob.sparkJob.args.get.map(f=> "'"+evalObject.evaluateVariable(f,localVariables )+"'").mkString(",")}],
#end#if(sparkProperties.nonEmpty)dataproc_spark_properties='${sparkProperties}',
#end#if(sparkProperties.nonEmpty)dataproc_spark_properties={${sparkPropertiesCalculated.map(f=> "'"+ f._1 +"':'"+f._2 +"'").mkString(", ")}},
#endcluster_name='${clusterName}'.lower()
)
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class TemplateTest extends BaseBBCGrammarTest with ExpressionCreator{
val localVariables = Map(): Map[String, Expression]
object evalObject extends ExpressionCreator
val mainPyFile = "samplel.py"
val sparkProps = "-x.ua y.ub"
val sparkProps = "ua==BX,ub==AX"
val programArguments = Array("arg1", "arg2")
val otherPyFiles = Array("sample1.py","sample2.py")
val jobName = "Zjob"
Expand Down Expand Up @@ -110,7 +110,7 @@ class TemplateTest extends BaseBBCGrammarTest with ExpressionCreator{
| job_name='submit_spark_${job._2.name}',
| pyfiles=[${otherPyFiles.map(f => "'" + f + "'").mkString(",")}],
| arguments=[${programArguments.map(f => "'" + f + "'").mkString(",")}],
| dataproc_spark_properties='${sparkProps}',
| dataproc_spark_properties={'ua':'BX', 'ub':'AX'},
| cluster_name='${clusterName}'.lower()
)""".stripMargin
}
Expand All @@ -120,7 +120,7 @@ class TemplateTest extends BaseBBCGrammarTest with ExpressionCreator{
val localVariables = Map():Map[String, Expression]
object evalObject extends ExpressionCreator
val className = "com.x.y.Z"
val sparkProps = "-x.ua y.ub"
val sparkProps = "a==b,b==c"
val programArguments = Array("arg1", "arg2")
val jarURI = "jar link"
val jobName = "Zjob"
Expand All @@ -147,7 +147,7 @@ class TemplateTest extends BaseBBCGrammarTest with ExpressionCreator{
| main_class='${job._2.sparkJob.mainClass}',
| job_name='submit_spark_${job._2.name}',
| arguments=[${programArguments.map(f=>"'"+f+"'").mkString(",")}],
| dataproc_spark_properties='${sparkProps}',
| dataproc_spark_properties={'a':'b', 'b':'c'},
| cluster_name='${clusterName}'.lower()
)""".stripMargin

Expand Down

0 comments on commit 720f870

Please sign in to comment.