## 1. Kensu initialisation

To use the library, you need to add the Kensu jar to the Spark client.

Initialize the client with the `properties` file that defines its `Context`:
- `process_name`: the application name
- `project_names`: the project(s) under which the application runs
- `environment`: the environment in which the application runs
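To catch a missing setting before starting the session, the properties file can be checked with plain `java.util.Properties`. The sketch below assumes the keys are literally named `process_name`, `project_names` and `environment`. The real Kensu configuration may use other names (the notebook itself reads namespaced keys such as `dam.activity.user`), so `KensuConfCheck` and its members are hypothetical helpers, not part of the Kensu SDK.

```scala
import java.io.Reader
import java.util.Properties

object KensuConfCheck {
  // ASSUMPTION: key names taken literally from the list above; the actual
  // Kensu configuration may name these settings differently.
  val requiredKeys: Seq[String] = Seq("process_name", "project_names", "environment")

  // Same file-name pattern as the notebook: ./conf/application<app>-week<week>.properties
  def confPath(app: Int, week: Int): String =
    s"./conf/application${app}-week${week}.properties"

  // Hypothetical helper: loads the properties and lists the required keys that are missing.
  def loadAndCheck(reader: Reader): (Properties, Seq[String]) = {
    val props = new Properties()
    props.load(reader)
    (props, requiredKeys.filterNot(k => props.containsKey(k)))
  }
}
```

In the notebook this would be called as `KensuConfCheck.loadAndCheck(new java.io.FileReader(KensuConfCheck.confPath(app, week)))`; an empty second element means all three settings are present.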

In [1]:
%%init_spark
launcher.jars = ["libs/kensu-dam-spark-collector-0.17.2_spark-3.0.1.jar","libs/sdk_2.12.jar"]
launcher.conf.set("spark.sql.shuffle.partitions", "1")

In [2]:
val app = 1
val week = 1

Intitializing Scala interpreter ...

Spark Web UI available at http://jupyter-vidma-2ddodd-2dworkshop-2dqzjhyg5g:4040
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1643981983817)
SparkSession available as 'spark'


app: Int = 1
week: Int = 1


In [3]:
implicit val ch = new io.kensu.dodd.sdk.ConnectHelper(s"./conf/application${app}-week${week}.properties")
val dataFolder = s"data_${ch.properties("dam.activity.user")}"
io.kensu.third.integration.TimeUtils.setMockedTime(ch.properties("mocked.timestamp").asInstanceOf[Long], false)
io.kensu.dam.lineage.spark.lineage.Implicits.SparkSessionDAMWrapper(spark).track(ch.properties.get("dam.ingestion.url").map(_.toString), None)(ch.properties.toList:_*)

maybeDamFileDebugLevel: Some(INFO)
Activating DAM debug logging to file: debug.log
Done activating DAM debug logging to a file
Creating a new DAM scala client actor-system
Done creating a new DAM scala client actor-system.


ch: io.kensu.dodd.sdk.ConnectHelper = io.kensu.dodd.sdk.ConnectHelper@2629dfdf
dataFolder: String = data_someones-name
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@165419af


## 2. Execution of the pipeline - App 1: Join

This application joins the `customer` and `transaction` data sources.


In [4]:
val df_customer = spark.read.option("inferSchema","true").option("header","true").load(s"./data_${ch.properties("dam.activity.user")}/week${week}/custinfo")

df_customer: org.apache.spark.sql.DataFrame = [id: int, first_name: string ... 3 more fields]


In [5]:
val df_transaction = spark.read.option("inferSchema","true").option("header","true").load(s"./data_${ch.properties("dam.activity.user")}/week${week}/transactions")

df_transaction: org.apache.spark.sql.DataFrame = [id: int, date: double ... 3 more fields]


In [6]:
val df_join = df_customer.join(df_transaction, df_customer("id") === df_transaction("id"),"inner").drop(df_transaction("id"))

df_join: org.apache.spark.sql.DataFrame = [id: int, first_name: string ... 7 more fields]


In [7]:
df_join.count()

In [None]:
df_join.write.mode("overwrite").save(s"./data_${ch.properties("dam.activity.user")}/week${week}/joined_data")

## 3. How lineage is extracted from the Spark plan


The Spark plan is obtained by registering a Spark listener, which notifies the Kensu collector whenever a Spark query is run.
Note that, given a plan, one could also reconstruct a `DataFrame`.
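The listener registered by the Kensu collector is not shown in this notebook. As a sketch of the general mechanism only, Spark's public `QueryExecutionListener` (Spark 3.0 API) is called after each successful or failed action with the action name and its `QueryExecution`, from which all the plans below can be read. `PlanRecorder` is a hypothetical class, not Kensu's implementation.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical listener: keeps (action name, analyzed logical plan) for each successful action.
// Callbacks may arrive on another thread than the one running the action, hence a thread-safe queue.
class PlanRecorder extends QueryExecutionListener {
  val plans = new ConcurrentLinkedQueue[(String, String)]()

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    plans.add((funcName, qe.analyzed.treeString))

  // Failed actions are ignored in this sketch.
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
```

Registering it with `spark.listenerManager.register(new PlanRecorder)` before running an action such as `df_join.count()` should make that action's plan appear in `plans` shortly afterwards.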

Below is a quick overview of the main data available in Spark query plans, together with some debugging output; this is close to where the lineage information is obtained.

In [8]:
df_customer.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)



In [21]:
// print all query plans (parsed logical, analyzed logical, optimized logical and physical)
println(df_customer.queryExecution.toString)

== Parsed Logical Plan ==
Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet

== Analyzed Logical Plan ==
id: int, first_name: string, last_name: string, email: string, gender: string
Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet

== Optimized Logical Plan ==
Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [id#0,first_name#1,last_name#2,email#3,gender#4] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/jovyan/spark/data_someones-name/week1/custinfo], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,first_name:string,last_name:string,email:string,gender:string>
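The combined string above can also be taken apart: in Spark 3.0 each phase is a separate field of `QueryExecution` (`logical`, `analyzed`, `optimizedPlan`, `executedPlan`), which is handy when only one phase matters. `PlanPhases` is a hypothetical helper; `df.explain(true)` prints the same four phases to the console.

```scala
import org.apache.spark.sql.DataFrame

object PlanPhases {
  // Tree string of each planning phase, in the order Spark produces them.
  def phases(df: DataFrame): Seq[(String, String)] = {
    val qe = df.queryExecution
    Seq(
      "parsed"    -> qe.logical.treeString,       // as written by the user
      "analyzed"  -> qe.analyzed.treeString,      // names resolved to attributes
      "optimized" -> qe.optimizedPlan.treeString, // after optimizer rules
      "physical"  -> qe.executedPlan.treeString   // what actually runs
    )
  }
}
```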



In [20]:
println(df_join.queryExecution.toString)

== Parsed Logical Plan ==
Project [id#0, first_name#1, last_name#2, email#3, gender#4, date#11, product#12, price#13, quantity#14]
+- Join Inner, (id#0 = id#10)
   :- Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet
   +- Relation[id#10,date#11,product#12,price#13,quantity#14] parquet

== Analyzed Logical Plan ==
id: int, first_name: string, last_name: string, email: string, gender: string, date: double, product: string, price: double, quantity: int
Project [id#0, first_name#1, last_name#2, email#3, gender#4, date#11, product#12, price#13, quantity#14]
+- Join Inner, (id#0 = id#10)
   :- Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet
   +- Relation[id#10,date#11,product#12,price#13,quantity#14] parquet

== Optimized Logical Plan ==
Project [id#0, first_name#1, last_name#2, email#3, gender#4, date#11, product#12, price#13, quantity#14]
+- Join Inner, (id#0 = id#10)
   :- Filter isnotnull(id#0)
   :  +- Relation[id#0,first_name#1,last_name#2,email#3,gend

### Spark plan tree parsing and interpretation

How a *Spark plan tree* is parsed and interpreted depends on the operations it contains, but in the most generic case each node provides:

- `node.children`: the child nodes whose output is needed to compute this node's output
- `node.output`: the output attributes, each identified by an expression id (`name#id`)
- `node.references`: the input attributes used by this node's own expressions (for a `Join`, the attributes in its condition)
- fields specific to the node type (`Join`, `Project`, etc.); these are sometimes private, in which case reflection is needed to read them
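To make these members concrete, here is a deliberately simplified, Spark-free model of a plan tree with the same `children`, `output` and `references`. All `Toy*` classes are hypothetical stand-ins, not Catalyst's API, and `ToyProject` only passes attributes through (real projections can compute new expressions). `sources` walks `children` down to the leaves (dataset-level lineage); `columnLineage` maps each output attribute to the source column with the same expression id; `allReferences` collects attributes a plan depends on without necessarily outputting them, such as join keys.

```scala
// Simplified stand-in for a Catalyst attribute: a name plus a unique expression id (name#id).
final case class ToyAttr(name: String, id: Long) {
  override def toString: String = s"$name#$id"
}

sealed trait ToyNode {
  def children: Seq[ToyNode]
  def output: Seq[ToyAttr]
  def references: Set[ToyAttr] // attributes used by this node's own expressions
}

final case class ToyRelation(path: String, output: Seq[ToyAttr]) extends ToyNode {
  val children: Seq[ToyNode] = Nil
  val references: Set[ToyAttr] = Set.empty
}

// Inner equi-join: output is left columns followed by right columns.
final case class ToyJoin(left: ToyNode, right: ToyNode, condition: Seq[(ToyAttr, ToyAttr)]) extends ToyNode {
  val children: Seq[ToyNode] = Seq(left, right)
  def output: Seq[ToyAttr] = left.output ++ right.output
  def references: Set[ToyAttr] = condition.flatMap { case (l, r) => Seq(l, r) }.toSet
}

final case class ToyProject(projectList: Seq[ToyAttr], child: ToyNode) extends ToyNode {
  val children: Seq[ToyNode] = Seq(child)
  def output: Seq[ToyAttr] = projectList
  def references: Set[ToyAttr] = projectList.toSet
}

object ToyLineage {
  // Dataset-level lineage: every leaf relation reachable through `children`.
  def sources(node: ToyNode): Seq[ToyRelation] = node match {
    case r: ToyRelation => Seq(r)
    case other          => other.children.flatMap(sources)
  }

  // Column-level lineage: attributes keep their id while passing through,
  // so an output attribute comes from the leaf whose output contains it.
  def columnLineage(node: ToyNode): Map[ToyAttr, String] = {
    val rels = sources(node)
    node.output.flatMap { a =>
      rels.find(_.output.contains(a)).map(r => a -> s"${r.path}.${a.name}")
    }.toMap
  }

  // Everything referenced anywhere in the tree, including attributes dropped from the output.
  def allReferences(node: ToyNode): Set[ToyAttr] =
    node.references ++ node.children.flatMap(allReferences)
}
```

Modelling `df_join` this way, the transaction `id` does not appear in `columnLineage` (it was dropped) but still shows up in `allReferences` through the join condition, which is exactly the kind of indirect dependency lineage has to capture.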

In [30]:
println(df_join.queryExecution.logical)

Project [id#0, first_name#1, last_name#2, email#3, gender#4, date#11, product#12, price#13, quantity#14]
+- Join Inner, (id#0 = id#10)
   :- Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet
   +- Relation[id#10,date#11,product#12,price#13,quantity#14] parquet



In [42]:
// take the root node, a Project
val treeNode = df_join.queryExecution.logical
// output attributes (name#exprId)
println(treeNode.output)

List(id#0, first_name#1, last_name#2, email#3, gender#4, date#11, product#12, price#13, quantity#14)


treeNode: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [id#0, first_name#1, last_name#2, email#3, gender#4, date#11, product#12, price#13, quantity#14]
+- Join Inner, (id#0 = id#10)
   :- Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet
   +- Relation[id#10,date#11,product#12,price#13,quantity#14] parquet


In [43]:
// children: the nodes whose output is needed to compute the output of this Project node
println(treeNode.children)

List(Join Inner, (id#0 = id#10)
:- Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet
+- Relation[id#10,date#11,product#12,price#13,quantity#14] parquet
)
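Following `children` recursively down to the leaves gives the datasets a query reads from. A minimal sketch against the Spark 3.0 Catalyst classes (internal APIs that may change between Spark versions): file-based sources appear as `LogicalRelation` leaves wrapping a `HadoopFsRelation`, whose `location.rootPaths` are the input paths. `PlanSources.inputPaths` is a hypothetical helper; for a quick check without touching internals, the public `Dataset.inputFiles` method lists the files a `DataFrame` reads.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

object PlanSources {
  // `collect` visits every node of the tree (recursing through `children`)
  // and keeps the relation behind each LogicalRelation leaf.
  def inputPaths(plan: LogicalPlan): Seq[String] =
    plan.collect { case lr: LogicalRelation => lr.relation }
      .collect { case fs: HadoopFsRelation => fs.location.rootPaths.map(_.toString) }
      .flatten
}
```

Applied to `df_join.queryExecution.logical`, this would be expected to return the `custinfo` and `transactions` folders read earlier.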


In [44]:
println(treeNode.references)

In [48]:
val joinTreeNode = df_customer.join(df_transaction, df_customer("id") === df_transaction("id"), "inner")
  .queryExecution.logical

println(joinTreeNode)
// the Join node only references the attributes used in its condition (id#0 and id#10),
// while its output contains every column of both children, including both `id` columns
println("references:")
println(joinTreeNode.references)
println("output:")
println(joinTreeNode.output)

Join Inner, (id#0 = id#10)
:- Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet
+- Relation[id#10,date#11,product#12,price#13,quantity#14] parquet

references:
{id#0, id#10}
output:
List(id#0, first_name#1, last_name#2, email#3, gender#4, id#10, date#11, product#12, price#13, quantity#14)


joinTreeNode: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#10)
:- Relation[id#0,first_name#1,last_name#2,email#3,gender#4] parquet
+- Relation[id#10,date#11,product#12,price#13,quantity#14] parquet
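Because attributes are identified by expression id rather than by name, the two `id` columns (`id#0` and `id#10`) stay distinguishable, and each output attribute of a join can be traced to the child that produces it. A minimal sketch on the Spark 3.0 Catalyst API, where `JoinLineage.joinOrigins` is a hypothetical helper that only inspects a top-level `Join` node and does not recurse further.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

object JoinLineage {
  private def key(a: Attribute): String = s"${a.name}#${a.exprId.id}"

  // For a Join node: (output attribute -> "left"/"right" side, attributes referenced by the condition).
  def joinOrigins(plan: LogicalPlan): Option[(Seq[(String, String)], Set[String])] = plan match {
    case j: Join =>
      val origins = j.output.map { a =>
        (key(a), if (j.left.outputSet.contains(a)) "left" else "right")
      }
      Some((origins, j.references.toSeq.map(key).toSet))
    case _ => None
  }
}
```

Applied to `joinTreeNode` above, the sides should follow the printed output order: the five customer columns come from the left child and the five transaction columns from the right, while the referenced set holds only the two `id` attributes.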
