Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

case class StringLengthCheck(
column: String,
minLength: Option[Json],
maxLength: Option[Json],
threshold: Option[String]
) extends RowBased {
column: String,
minLength: Option[Json],
maxLength: Option[Json],
threshold: Option[String]
) extends RowBased {

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {

val ret = StringLengthCheck(
getVarSub(column, "column", dict),
minLength.map(getVarSubJson(_, "minLength", dict)),
maxLength.map(getVarSubJson(_, "maxLength", dict)),
threshold.map(getVarSub(_, "threshold", dict))
)
getVarSub(column, "column", dict),
minLength.map(getVarSubJson(_, "minLength", dict)),
maxLength.map(getVarSubJson(_, "maxLength", dict)),
threshold.map(getVarSub(_, "threshold", dict))
)
getEvents.foreach(ret.addEvent)
ret
}

private def cmpExpr(colExpr: Expression,
value: Option[Json],
cmp: (Expression, Expression) => Expression
): Option[Expression] = {
value: Option[Json],
cmp: (Expression, Expression) => Expression
): Option[Expression] = {
value.map { v => cmp(colExpr, createLiteralOrUnresolvedAttribute(IntegerType, v)) }
}

Expand All @@ -57,20 +57,20 @@ case class StringLengthCheck(
private def checkMinLessThanOrEqualToMax(values: List[Json]): Unit = {

if (values.forall(_.isNumber)) {
values.flatMap(_.asNumber) match {
case mv :: xv :: Nil if mv.toDouble > xv.toDouble =>
addEvent(ValidatorError(s"min: ${minLength.get} must be less than or equal to max: ${maxLength.get}"))
case _ =>
}
values.flatMap(_.asNumber) match {
case mv :: xv :: Nil if mv.toDouble > xv.toDouble =>
addEvent(ValidatorError(s"min: ${minLength.get} must be less than or equal to max: ${maxLength.get}"))
case _ =>
}
} else if (values.forall(_.isString)) {
values.flatMap(_.asString) match {
case mv :: xv :: Nil if mv == xv =>
addEvent(ValidatorError(s"Min[String]: $mv must be less than max[String]: $xv"))
case _ =>
}
values.flatMap(_.asString) match {
case mv :: xv :: Nil if mv == xv =>
addEvent(ValidatorError(s"Min[String]: $mv must be less than max[String]: $xv"))
case _ =>
}
} else {
// Not Strings or Numbers
addEvent(ValidatorError(s"Unsupported type in ${values.map(debugJson).mkString(", ")}"))
// Not Strings or Numbers
addEvent(ValidatorError(s"Unsupported type in ${values.map(debugJson).mkString(", ")}"))
}
}

Expand All @@ -89,7 +89,7 @@ case class StringLengthCheck(
val colType = findColumnInDataFrame(df, column)
if (colType.isDefined) {
val dataType = colType.get.dataType
if (!(dataType.isInstanceOf[StringType])) {
if (!dataType.isInstanceOf[StringType]) {
addEvent(ValidatorError(s"Data type of column '$column' must be String, but was found to be $dataType"))
}
}
Expand Down Expand Up @@ -123,8 +123,6 @@ object StringLengthCheck extends LazyLogging {
logger.debug(s"minLength: $minLengthJ type: ${minLengthJ.getClass.getCanonicalName}")
logger.debug(s"maxLength: $maxLengthJ type: ${maxLengthJ.getClass.getCanonicalName}")
logger.debug(s"threshold: $threshold type: ${threshold.getClass.getCanonicalName}")

c.focus.foreach {f => logger.info(s"StringLengthCheckJson: ${f.spaces2}")}
scala.util.Right(StringLengthCheck(column, minLengthJ, maxLengthJ, threshold))
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.target.data_validator.validator

import com.target.data_validator.{JsonEncoders, ValidatorError, VarSubstitution}
import com.target.data_validator.JsonUtils.debugJson
import com.target.data_validator.validator.ValidatorBase._
import com.typesafe.scalalogging.LazyLogging
import io.circe.{DecodingFailure, HCursor, Json}
Expand All @@ -12,10 +11,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}

case class StringRegexCheck(
column: String,
regex: Option[Json],
threshold: Option[String]
) extends RowBased {
column: String,
regex: Option[Json],
threshold: Option[String]
) extends RowBased {

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {

Expand All @@ -35,13 +34,13 @@ case class StringRegexCheck(
val regexExpression = regex.map { r => RLike(colExp, createLiteralOrUnresolvedAttribute(StringType, r)) }

val ret = regexExpression match {
/*
RLike returns false if the column value is null.
To avoid counting null values as validation failures (like other validations),
an explicit non null check on the column value is required.
*/
case Some(x) => And(Not(x), IsNotNull(colExp))
case _ => throw new RuntimeException("Must define a regex.")
/*
RLike returns false if the column value is null.
To avoid counting null values as validation failures (like other validations),
an explicit non null check on the column value is required.
*/
case Some(x) => And(Not(x), IsNotNull(colExp))
case _ => throw new RuntimeException("Must define a regex.")
}
logger.debug(s"Expr: $ret")
ret
Expand All @@ -59,7 +58,7 @@ case class StringRegexCheck(
val colType = findColumnInDataFrame(df, column)
if (colType.isDefined) {
val dataType = colType.get.dataType
if (!(dataType.isInstanceOf[StringType])) {
if (!dataType.isInstanceOf[StringType]) {
addEvent(ValidatorError(s"Data type of column '$column' must be String, but was found to be $dataType"))
}
}
Expand Down Expand Up @@ -90,8 +89,6 @@ object StringRegexCheck extends LazyLogging {
logger.debug(s"column: $column")
logger.debug(s"regex: $regex type: ${regex.getClass.getCanonicalName}")
logger.debug(s"threshold: $threshold type: ${threshold.getClass.getCanonicalName}")

c.focus.foreach {f => logger.info(s"StringRegexCheckJson: ${f.spaces2}")}
scala.util.Right(StringRegexCheck(column, regex, threshold))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class StringLengthCheckSpec extends FunSpec with Matchers with TestingSparkSessi
assert(config.quickChecks(spark, dict))
assert(sut.failed)
assert(sut.getEvents contains
ValidatorCheckEvent(failure = true, "StringLengthCheck on column 'item'", 4, 2))
ValidatorCheckEvent(failure = true, "StringLengthCheck on column 'item'", 4, 2)) // scalastyle:ignore

assert(sut.getEvents contains
ValidatorQuickCheckError(("item", "Item1") :: Nil, "Item1",
Expand Down Expand Up @@ -428,4 +428,4 @@ class StringLengthCheckSpec extends FunSpec with Matchers with TestingSparkSessi
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.target.data_validator.validator
import com.target.TestingSparkSession
import com.target.data_validator._
import io.circe.Json
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
Expand All @@ -22,8 +22,8 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSessio
Row("Item1", 2.99),
Row("Item23", 5.35),
Row("I", 1.00),
Row(null, 1.00),
Row(null, 2.00)
Row(null, 1.00), // scalastyle:ignore
Row(null, 2.00) // scalastyle:ignore
)

describe("StringRegexCheck") {
Expand All @@ -40,7 +40,7 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSessio

it("error if column is not found in df") {
val df = mkDataFrame(spark, defData, schema)
val sut = StringRegexCheck( "bad_column_name", Some(Json.fromString("I%")), None)
val sut = StringRegexCheck("bad_column_name", Some(Json.fromString("I%")), None)
assert(sut.configCheck(df))
assert(sut.getEvents contains ValidatorError("Column: 'bad_column_name' not found in schema."))
assert(sut.failed)
Expand All @@ -50,7 +50,10 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSessio
val df = mkDataFrame(spark, defData, schema)
val sut = StringRegexCheck("baseprice", Some(Json.fromString("I%")), None)
assert(sut.configCheck(df))
assert(sut.getEvents contains ValidatorError("Data type of column 'baseprice' must be String, but was found to be DoubleType"))
assert(
sut.getEvents contains
ValidatorError("Data type of column 'baseprice' must be String, but was found to be DoubleType")
)
assert(sut.failed)
}
}
Expand All @@ -71,7 +74,7 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSessio
}

it("substitute with threshold") {
val dict = mkParams(List(("column", "item"), ("regex", "I%"), ("threshold", Json.fromInt(100))))
val dict = mkParams(List(("column", "item"), ("regex", "I%"), ("threshold", Json.fromInt(100)))) // scalastyle:ignore
val sut = StringRegexCheck("$column", Some(Json.fromString("${regex}")), Some("${threshold}"))
assert(sut.substituteVariables(dict) == StringRegexCheck("item", Some(Json.fromString("I%")), Some("100")))
assert(!sut.failed)
Expand All @@ -82,8 +85,10 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSessio

it("regex pattern ab%") {
val sut = StringRegexCheck("item", Some(Json.fromString("ab%")), None)
assert(sut.colTest(schema, mkParams()).sql ==
And(Not(RLike(UnresolvedAttribute("item"), Literal.create("ab%", StringType))), IsNotNull(UnresolvedAttribute("item"))).sql)
assert(sut.colTest(schema, mkParams()).sql == And(
Not(RLike(UnresolvedAttribute("item"), Literal.create("ab%", StringType))),
IsNotNull(UnresolvedAttribute("item"))).sql
)
}
}

Expand Down