### Writer Monad for collecting logs in Spark

First, we import the libraries we need.

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.3` 
import $ivy.`org.apache.spark::spark-core:2.4.3` 
import $ivy.`org.typelevel::cats-core:2.3.0` 

import $ivy.$

import $ivy.$

import $ivy.$

We're going to compare the schema of a DataFrame against a list of expected columns. We want to check that every column of the DataFrame appears in the list. As a second check, we also require that at least one column in the list is marked as mandatory (this is only here to show how you can use more than one Writer together).

We define two case classes for our validation method: SchemaColumn describes an expected column, and ValidationLog holds the result of validating the schema of a DataFrame.

In [2]:
case class SchemaColumn(
  name: String,
  mandatory: Boolean = true
)
case class ValidationLog(
  log: String,
  validation: Boolean
)

defined class SchemaColumn
defined class ValidationLog

In [3]:
import org.apache.spark.sql.DataFrame 

import org.apache.spark.sql.DataFrame

First, let's see how we could do it without FP, using side effects and mutable variables:

In [4]:
def validation(df: DataFrame, schema: List[SchemaColumn]): Boolean = {
  // Mutable accumulator for the log messages (the side effect we want to avoid later).
  var message = ""
  val schemaNames = schema.map(_.name.toUpperCase)
  // Note: forall short-circuits, so only the first missing column gets logged.
  val checkColumnsNames = df.schema.names.map(_.toUpperCase).forall { name =>
    val check = schemaNames.contains(name)
    if (!check) message = message + s" Column: $name not in the registration. "
    check
  }
  val checkMandatory = schema.exists(_.mandatory)
  if (!checkMandatory) message = message + " No mandatory Columns. "

  println(message)
  checkColumnsNames && checkMandatory
}

defined function validation

Now let's test it:

In [6]:
import org.apache.spark.sql._
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

import org.apache.spark.sql._

spark: SparkSession = org.apache.spark.sql.SparkSession@60ba48d4

In [11]:
import spark.implicits._
val someDF = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")

val schemaColumns = List(SchemaColumn("column_that_fail",false), SchemaColumn("word", false))

import spark.implicits._

someDF: DataFrame = [number: int, word: string]
schemaColumns: List[SchemaColumn] = List(
  SchemaColumn("column_that_fail", false),
  SchemaColumn("word", false)
)

In [12]:
val valid = validation(someDF, schemaColumns)
println(valid)

 Column: NUMBER not in the registration.  No mandatory Columns. 
false


valid: Boolean = false

We can see that the validation failed: the message reports the number column because it is not in the schema list, and it also reports that there are no mandatory columns.

I know it would be cleaner with a ListBuffer or something similar.
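
As a rough sketch of that ListBuffer variant: the version below collects each message in a `scala.collection.mutable.ListBuffer` instead of concatenating strings. So that it runs without a Spark session, it takes the DataFrame column names as a plain `Seq[String]` (you could pass `df.schema.names.toSeq`). The names `validationBuffered` and `ColumnSpec` (a stand-in for SchemaColumn), the shortened messages and the returned tuple are assumptions made for illustration only.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for SchemaColumn so the sketch runs on its own.
case class ColumnSpec(name: String, mandatory: Boolean = true)

// Takes the DataFrame column names directly instead of a DataFrame.
def validationBuffered(dfColumns: Seq[String], schema: List[ColumnSpec]): (List[String], Boolean) = {
  val messages = ListBuffer.empty[String]
  val schemaNames = schema.map(_.name.toUpperCase)

  // filterNot visits every column, so every missing name is reported.
  val missing = dfColumns.map(_.toUpperCase).filterNot(name => schemaNames.contains(name))
  missing.foreach(name => messages += s"Column: $name not in the registration.")

  val hasMandatory = schema.exists(_.mandatory)
  if (!hasMandatory) messages += "No mandatory Columns."

  (messages.toList, missing.isEmpty && hasMandatory)
}

val (bufferedLog, bufferedOk) = validationBuffered(
  Seq("number", "word"),
  List(ColumnSpec("column_that_fail", mandatory = false), ColumnSpec("word", mandatory = false))
)
```

This is still imperative: the buffer is mutated in place, which is exactly what the Writer version avoids.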

In [13]:
import cats.data._
import cats.implicits.catsKernelStdMonoidForString
import cats.instances._

import cats.data._

import cats.implicits.catsKernelStdMonoidForString

import cats.instances._

Now we're going to do something similar with the Writer monad from the cats library, without side effects or mutable variables (which is good for parallel and concurrent computation).

In [14]:
def validationF(df: DataFrame, schema: List[SchemaColumn]): ValidationLog = {
  val schemaNames = schema.map(_.name.toUpperCase)

  // Fold over the DataFrame columns, starting from an empty log and a passing result.
  val checkColumns = df.schema.names.map(_.toUpperCase).foldLeft(Writer("", true)) { (wr, name) =>
    if (schemaNames.contains(name)) wr
    else wr.tell(s" Column: $name not in the registration. ").map(_ => false)
  }

  // A second, independent Writer for the mandatory-column check.
  val checkMandatory =
    if (schema.exists(_.mandatory)) Writer("", true)
    else Writer("No mandatory Columns in the registration", false)

  // flatMap (via the for-comprehension) concatenates both logs through the String Monoid.
  val check = (for {
    columns   <- checkColumns
    mandatory <- checkMandatory
  } yield columns && mandatory).run

  // run returns the (log, value) pair, which matches ValidationLog's fields.
  ValidationLog.tupled(check)
}

defined function validationF
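
To make the mechanics less magical, here is a minimal hand-rolled sketch of what a Writer does. `MiniWriter` is a hypothetical simplification written only for this explanation, not the cats implementation (the cats `Writer` is an alias for `WriterT[Id, L, V]` and accepts any log type with a `Semigroup`, while this sketch hard-codes a `String` log). It shows that `flatMap` is where the two logs get concatenated, which is why the for-comprehension in validationF ends up with both messages.

```scala
// Hypothetical, simplified Writer with a String log (not the cats implementation).
final case class MiniWriter[A](log: String, value: A) {
  // Transform the value and keep the log untouched.
  def map[B](f: A => B): MiniWriter[B] = MiniWriter(log, f(value))

  // Run the next step and append its log to the current one.
  def flatMap[B](f: A => MiniWriter[B]): MiniWriter[B] = {
    val next = f(value)
    MiniWriter(log + next.log, next.value)
  }

  // Append a message without changing the value.
  def tell(message: String): MiniWriter[A] = MiniWriter(log + message, value)
}

val columnsCheck =
  MiniWriter("", true).tell("Column: NUMBER not in the registration. ").map(_ => false)
val mandatoryCheck =
  MiniWriter("No mandatory Columns in the registration", false)

// Desugars to columnsCheck.flatMap(c => mandatoryCheck.map(m => c && m)).
val combined = for {
  columns   <- columnsCheck
  mandatory <- mandatoryCheck
} yield columns && mandatory
```

No variable is reassigned anywhere: each step returns a new value that carries its own log, and combining steps combines logs.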

In [15]:
val valid = validationF(someDF, schemaColumns)

valid: ValidationLog = ValidationLog(
  " Column: NUMBER not in the registration. No mandatory Columns in the registration",
  false
)

This way, the log produced by the method is kept on the left side of the Writer monad, and the Boolean result is on the right side.

In [16]:
println(valid.log)

 Column: NUMBER not in the registration. No mandatory Columns in the registration
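
Besides `run`, which returns the `(log, value)` pair used in validationF, a cats `Writer` also lets you read each side separately with `written` and `value`. A small sketch, assuming cats-core is on the classpath as in the first cell; the names `step`, `logOnly` and `valueOnly` and the messages are made up for illustration:

```scala
import cats.data.Writer
// Same Monoid[String] import the notebook uses.
import cats.implicits.catsKernelStdMonoidForString

// Start with an empty log, append two messages, then set the value to false.
val step: Writer[String, Boolean] =
  Writer("", true).tell("first message. ").tell("second message.").map(_ => false)

val logOnly: String = step.written   // left side: the accumulated log
val valueOnly: Boolean = step.value  // right side: the computed value
val both: (String, Boolean) = step.run
```

Since ValidationLog has the same two fields, `written` and `value` would be an alternative to `tupled` when only one side is needed.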
