Added support for Spark 3.2
Summary:
Fixed all places where Spark 3.2 introduced breaking changes
Added testing for SingleStore 7.6
Fixed flaky OutputMetricsTest

Test Plan: Added new webappio testing jobs

Reviewers: carl, pmishchenko-ua

Reviewed By: pmishchenko-ua

Subscribers: engineering-list

JIRA Issues: PLAT-5937

Differential Revision: https://grizzly.internal.memcompute.com/D53260
AdalbertMemSQL committed Dec 8, 2021
1 parent 6f03694 commit 2cb80b7
Showing 19 changed files with 727 additions and 164 deletions.
15 changes: 11 additions & 4 deletions .circleci/config.yml
@@ -49,9 +49,9 @@ jobs:
export SINGLESTORE_HOST=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' singlestore-integration)
if [ << parameters.spark_version >> == '3.0.0' ]
then
sbt ++2.12.12 test -Dspark.version=<< parameters.spark_version >>
sbt ++2.12.12 "testOnly -- -l OnlySpark31" -Dspark.version=<< parameters.spark_version >>
else
sbt ++2.11.11 "testOnly -- -l OnlySpark3" -Dspark.version=<< parameters.spark_version >>
sbt ++2.12.12 "testOnly -- -l OnlySpark30" -Dspark.version=<< parameters.spark_version >>
fi
publish:
@@ -75,6 +75,10 @@ jobs:
name: Publish Spark 3.1.0
command: |
sbt ++2.12.12 -Dspark.version=3.1.0 publish sonatypePrepare sonatypeBundleUpload sonatypeRelease
- run:
name: Publish Spark 3.2.0
command: |
sbt ++2.12.12 -Dspark.version=3.2.0 publish sonatypePrepare sonatypeBundleUpload sonatypeRelease
workflows:
test:
@@ -90,10 +94,13 @@ workflows:
spark_version:
- 3.0.0
- 3.1.0
- 3.2.0
singlestore_image:
- memsql/cluster-in-a-box:centos-7.0.15-619d118712-1.9.5-1.5.0
- memsql/cluster-in-a-box:centos-6.8.15-029542cbf3-1.9.3-1.4.1
- memsql/cluster-in-a-box:6.7.18-db1caffe94-1.6.1-1.1.1
- memsql/cluster-in-a-box:centos-7.1.13-11ddea2a3a-3.0.0-1.9.3
- memsql/cluster-in-a-box:centos-7.3.2-a364d4b31f-3.0.0-1.9.3
- memsql/cluster-in-a-box:centos-7.5.8-12c73130aa-3.2.11-1.11.11
- memsql/cluster-in-a-box:centos-7.6.5-018454f4e3-4.0.1-1.13.0
publish:
jobs:
- approve-publish:
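The testOnly -- -l <tag> arguments above use ScalaTest's tag-exclusion flag (-l): suites tagged OnlySpark31 are skipped when testing against Spark 3.0.0, and suites tagged OnlySpark30 are skipped on the newer versions. A minimal sketch of how such a tag is typically defined and applied (illustrative only; the connector's actual tag definitions are not shown in this diff):

import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical tag object; excluding it via "testOnly -- -l OnlySpark31"
// skips every test that carries the tag.
object OnlySpark31 extends Tag("OnlySpark31")

class VersionGatedSpec extends AnyFunSuite {
  // Skipped when sbt runs: testOnly -- -l OnlySpark31
  test("exercises a Spark 3.1-only API", OnlySpark31) {
    // version-specific assertions would go here
  }
}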
6 changes: 4 additions & 2 deletions Layerfile
@@ -34,10 +34,12 @@ SECRET ENV LICENSE_KEY
# increase the memory
MEMORY 4G
MEMORY 8G
MEMORY 12G
MEMORY 16G

# split to 9 states
# split to 16 states
# each of them will run different version of the singlestore and spark
SPLIT 9
SPLIT 16

# copy the entire git repository
COPY . .
4 changes: 3 additions & 1 deletion build.sbt
@@ -4,7 +4,7 @@ import xerial.sbt.Sonatype._
To run tests or publish with a specific spark version use this java option:
-Dspark.version=3.0.0
*/
val sparkVersion = sys.props.get("spark.version").getOrElse("3.0.0")
val sparkVersion = sys.props.get("spark.version").getOrElse("3.2.0")
val scalaVersionStr = "2.12.12"
val scalaVersionPrefix = scalaVersionStr.substring(0, 4)

@@ -18,6 +18,7 @@ lazy val root = project
Compile / unmanagedSourceDirectories += (Compile / sourceDirectory).value / (sparkVersion match {
case "3.0.0" => "scala-sparkv3.0"
case "3.1.0" => "scala-sparkv3.1"
case "3.2.0" => "scala-sparkv3.2"
}),
version := s"3.2.0-spark-${sparkVersion}",
licenses += "Apache-2.0" -> url(
@@ -33,6 +34,7 @@
"org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0",
"org.mariadb.jdbc" % "mariadb-java-client" % "2.+",
"io.spray" %% "spray-json" % "1.3.5",
"io.netty" % "netty-buffer" % "4.1.70.Final",
// test dependencies
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
"org.scalacheck" %% "scalacheck" % "1.14.1" % Test,
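Because version := s"3.2.0-spark-${sparkVersion}" bakes the Spark flavor into the artifact version, each supported Spark line publishes as its own artifact. A hypothetical consumer build picking the 3.2 flavor (group/artifact coordinates assumed, not shown in this diff):

// Hypothetical consumer build.sbt; coordinates assumed for illustration.
libraryDependencies += "com.singlestore" %% "singlestore-spark-connector" % "3.2.0-spark-3.2.0"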
23 changes: 16 additions & 7 deletions scripts/define-layerci-matrix.sh
@@ -3,35 +3,44 @@ set -eu

TEST_NUM=${SPLIT:-"0"}

if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '1' ]
if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '1' ] || [ "$TEST_NUM" == '2' ]
then
echo 'export SINGLESTORE_IMAGE="memsql/cluster-in-a-box:centos-7.0.15-619d118712-1.9.5-1.5.0"'
elif [ "$TEST_NUM" == '2' ] || [ "$TEST_NUM" == '3' ]
elif [ "$TEST_NUM" == '3' ] || [ "$TEST_NUM" == '4' ] || [ "$TEST_NUM" == '5' ]
then
echo 'export SINGLESTORE_IMAGE="memsql/cluster-in-a-box:centos-7.1.13-11ddea2a3a-3.0.0-1.9.3"'
echo 'export SINGLESTORE_PASSWORD="password"'
elif [ "$TEST_NUM" == '4' ] || [ "$TEST_NUM" == '5' ]
elif [ "$TEST_NUM" == '6' ] || [ "$TEST_NUM" == '7' ] || [ "$TEST_NUM" == '8' ]
then
echo 'export SINGLESTORE_IMAGE="memsql/cluster-in-a-box:centos-7.3.2-a364d4b31f-3.0.0-1.9.3"'
echo 'export SINGLESTORE_PASSWORD="password"'
else
elif [ "$TEST_NUM" == '9' ] || [ "$TEST_NUM" == '10' ] || [ "$TEST_NUM" == '11' ]
then
echo 'export SINGLESTORE_IMAGE="memsql/cluster-in-a-box:centos-7.5.8-12c73130aa-3.2.11-1.11.11"'
echo 'export SINGLESTORE_PASSWORD="password"'
else
echo 'export SINGLESTORE_IMAGE="memsql/cluster-in-a-box:centos-7.6.5-018454f4e3-4.0.1-1.13.0"'
echo 'export SINGLESTORE_PASSWORD="password"'
fi

if [ "$TEST_NUM" == '9' ]
if [ "$TEST_NUM" == '15' ]
then
echo 'export FORCE_READ_FROM_LEAVES=TRUE'
else
echo 'export FORCE_READ_FROM_LEAVES=FALSE'
fi

if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '2' ] || [ "$TEST_NUM" == '4' ] || [ "$TEST_NUM" == '6' ]
if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '3' ] || [ "$TEST_NUM" == '6' ] || [ "$TEST_NUM" == '9' ] || [ "$TEST_NUM" == '12' ]
then
echo 'export SPARK_VERSION="3.0.0"'
echo 'export TEST_FILTER="testOnly -- -l OnlySpark31"'
else
elif [ "$TEST_NUM" == '1' ] || [ "$TEST_NUM" == '4' ] || [ "$TEST_NUM" == '7' ] || [ "$TEST_NUM" == '10' ] || [ "$TEST_NUM" == '13' ]
then
echo 'export SPARK_VERSION="3.1.0"'
echo 'export TEST_FILTER="testOnly -- -l OnlySpark30"'
else
echo 'export SPARK_VERSION="3.2.0"'
echo 'export TEST_FILTER="testOnly -- -l OnlySpark30"'
fi

echo 'export SCALA_VERSION="2.12.12"'
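Read together with the Layerfile's SPLIT 16, the script assigns splits 0-14 to a 5 x 3 matrix (SingleStore image = split / 3, Spark version = split % 3) and reserves split 15 for an extra Spark 3.2.0 run with FORCE_READ_FROM_LEAVES=TRUE. A sketch of the same mapping (illustrative reconstruction, not part of the commit):

// Illustrative reconstruction of the split layout.
val sparkVersions = Seq("3.0.0", "3.1.0", "3.2.0")
val imageVersions = Seq("7.0.15", "7.1.13", "7.3.2", "7.5.8", "7.6.5")

def jobFor(split: Int): (String, String, Boolean) = {
  val image = imageVersions(math.min(split / 3, imageVersions.size - 1))
  val spark = if (split == 15) "3.2.0" else sparkVersions(split % 3)
  (image, spark, split == 15) // (image, Spark version, FORCE_READ_FROM_LEAVES)
}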
src/main/scala-sparkv3.0/com/singlestore/spark/VersionSpecificAggregateExpressionExtractor.scala
@@ -5,10 +5,12 @@ import com.singlestore.spark.ExpressionGen.aggregateWithFilter
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.aggregate.{
AggregateFunction,
Average,
First,
Last,
StddevPop,
StddevSamp,
Sum,
VariancePop,
VarianceSamp
}
@@ -37,6 +39,14 @@ case class VersionSpecificAggregateExpressionExtractor(expressionExtractor: Expr
case Last(expressionExtractor(child), Literal(false, BooleanType)) =>
Some(aggregateWithFilter("ANY_VALUE", child, filter))

// Sum.scala
case Sum(expressionExtractor(child)) =>
Some(aggregateWithFilter("SUM", child, filter))

// Average.scala
case Average(expressionExtractor(child)) =>
Some(aggregateWithFilter("AVG", child, filter))

case _ => None
}
}
src/main/scala-sparkv3.0/com/singlestore/spark/VersionSpecificExpressionGen.scala
@@ -1,9 +1,33 @@
package com.singlestore.spark

import com.singlestore.spark.ExpressionGen._
import com.singlestore.spark.SQLGen.{ExpressionExtractor, IntVar, Joinable, Raw, StringVar, block}
import com.singlestore.spark.SQLGen.{
ExpressionExtractor,
IntVar,
Joinable,
Raw,
StringVar,
block,
cast,
sqlMapValueCaseInsensitive
}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{CalendarIntervalType, DateType, TimestampType}
import org.apache.spark.sql.types.{
BinaryType,
BooleanType,
ByteType,
CalendarIntervalType,
DateType,
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
NullType,
ShortType,
StringType,
TimestampType
}
import org.apache.spark.unsafe.types.CalendarInterval

case class VersionSpecificExpressionGen(expressionExtractor: ExpressionExtractor) {
@@ -53,6 +77,7 @@ case class VersionSpecificExpressionGen(expressionExtractor: ExpressionExtractor
Some(op("/", left, right))
case Remainder(expressionExtractor(left), expressionExtractor(right)) =>
Some(op("%", left, right))
case Abs(expressionExtractor(child)) => Some(f("ABS", child))

case Pmod(expressionExtractor(left), expressionExtractor(right)) =>
Some(block(block(block(left + "%" + right) + "+" + right) + "%" + right))
@@ -88,6 +113,73 @@ case class VersionSpecificExpressionGen(expressionExtractor: ExpressionExtractor
// TODO PLAT-5759
case Rand(expressionExtractor(child)) => Some(f("RAND", child))

// Cast.scala
case Cast(e @ expressionExtractor(child), dataType, _) => {
dataType match {
case TimestampType => {
e.dataType match {
case _: BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | _: DecimalType =>
Some(cast(f("FROM_UNIXTIME", child), "DATETIME(6)"))
case _ => Some(cast(child, "DATETIME(6)"))
}
}
case DateType => Some(cast(child, "DATE"))

case StringType => Some(cast(child, "CHAR"))
case BinaryType => Some(cast(child, "BINARY"))

case _: BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | _: DecimalType =>
if (e.dataType == DateType) {
Some(StringVar(null))
} else {
val numeric_ch = if (e.dataType == TimestampType) {
f("UNIX_TIMESTAMP", child)
} else {
child
}
dataType match {
case BooleanType => Some(op("!=", numeric_ch, IntVar(0)))
case ByteType => Some(op("!:>", numeric_ch, "TINYINT"))
case ShortType => Some(op("!:>", numeric_ch, "SMALLINT"))
case IntegerType => Some(op("!:>", numeric_ch, "INT"))
case LongType => Some(op("!:>", numeric_ch, "BIGINT"))
case FloatType => Some(op("!:>", numeric_ch, "FLOAT"))
case DoubleType => Some(op("!:>", numeric_ch, "DOUBLE"))
case dt: DecimalType => Some(makeDecimal(numeric_ch, dt.precision, dt.scale))
}
}
// SingleStore doesn't know how to handle this cast, pass it through as is
case _ => Some(child)
}
}

case NextDay(expressionExtractor(startDate), utf8StringFoldableExtractor(dayOfWeek)) =>
Some(
computeNextDay(
startDate,
sqlMapValueCaseInsensitive(
StringVar(dayOfWeek.toString),
DAYS_OF_WEEK_OFFSET_MAP,
StringVar(null)
)
))

case NextDay(expressionExtractor(startDate), expressionExtractor(dayOfWeek)) =>
Some(
computeNextDay(startDate,
sqlMapValueCaseInsensitive(
dayOfWeek,
DAYS_OF_WEEK_OFFSET_MAP,
StringVar(null)
)))

case Lead(expressionExtractor(input), expressionExtractor(offset), Literal(null, NullType)) =>
Some(f("LEAD", input, offset))
case Lag(expressionExtractor(input), expressionExtractor(offset), Literal(null, NullType)) =>
Some(f("LAG", input, offset))

case _ => None
}
}
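The new Cast, NextDay, Lead, and Lag cases let more of a Spark plan compile into SingleStore SQL instead of being evaluated in Spark after the scan. A hypothetical read that benefits from the Cast rules above (the SparkSession, table, and column names are invented for illustration):

// Hypothetical usage: the CAST below can be pushed down to SingleStore by
// the rules above. "spark" is an existing SparkSession configured for the
// connector; the table and columns are invented.
val days = spark.read
  .format("singlestore")
  .load("db.events")
  .selectExpr("CAST(created_at AS DATE) AS day", "user_id")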
VersionSpecificUtil.scala (new file)
@@ -0,0 +1,8 @@
package com.singlestore.spark

import org.apache.spark.sql.types.{CalendarIntervalType, DataType}

object VersionSpecificUtil {
def isIntervalType(d: DataType): Boolean =
d.isInstanceOf[CalendarIntervalType]
}
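This flavor of the helper covers CalendarIntervalType only. Spark 3.2 introduced the ANSI interval types, so the scala-sparkv3.2 source tree presumably widens the check; a sketch under that assumption (the actual 3.2 file is not shown in this diff):

package com.singlestore.spark

import org.apache.spark.sql.types.{
  CalendarIntervalType,
  DataType,
  DayTimeIntervalType,
  YearMonthIntervalType
}

// Hypothetical scala-sparkv3.2 counterpart: DayTimeIntervalType and
// YearMonthIntervalType are new in Spark 3.2 and also represent intervals.
object VersionSpecificUtil {
  def isIntervalType(d: DataType): Boolean =
    d.isInstanceOf[CalendarIntervalType] ||
      d.isInstanceOf[DayTimeIntervalType] ||
      d.isInstanceOf[YearMonthIntervalType]
}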
src/main/scala-sparkv3.1/com/singlestore/spark/VersionSpecificAggregateExpressionExtractor.scala
@@ -4,10 +4,12 @@ import com.singlestore.spark.SQLGen.{ExpressionExtractor, SQLGenContext, Stateme
import com.singlestore.spark.ExpressionGen.aggregateWithFilter
import org.apache.spark.sql.catalyst.expressions.aggregate.{
AggregateFunction,
Average,
First,
Last,
StddevPop,
StddevSamp,
Sum,
VariancePop,
VarianceSamp
}
@@ -35,6 +37,14 @@ case class VersionSpecificAggregateExpressionExtractor(expressionExtractor: Expr
case Last(expressionExtractor(child), false) =>
Some(aggregateWithFilter("ANY_VALUE", child, filter))

// Sum.scala
case Sum(expressionExtractor(child)) =>
Some(aggregateWithFilter("SUM", child, filter))

// Average.scala
case Average(expressionExtractor(child)) =>
Some(aggregateWithFilter("AVG", child, filter))

case _ => None
}
}
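Both per-version aggregate extractors gain identical Sum and Average cases. Moving them out of the shared code is presumably required because Spark 3.2 changed the constructor shape of these aggregates (Sum, for example, gained an ANSI failOnError flag), so a single pattern match no longer compiles across all three source trees. A sketch of what the scala-sparkv3.2 counterpart might look like (assumed; that file is not shown in this diff):

// Hypothetical scala-sparkv3.2 match: the extra parameter is ignored here
// purely for illustration; the real file may handle it differently.
case Sum(expressionExtractor(child), _) =>
  Some(aggregateWithFilter("SUM", child, filter))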
(Remaining changed files not shown.)
