Added support for Spark 3.4
Summary:
Updated all Spark versions.
Added SingleStore 8.1 to testing.

Test Plan: https://webapp.io/memsql/commits?query=repo%3Asinglestore-spark-connector+id%3A410

Reviewers: pmishchenko-ua, vtkachuk-ua

Reviewed By: pmishchenko-ua

Subscribers: engineering-list

JIRA Issues: PLAT-6642

Differential Revision: https://grizzly.internal.memcompute.com/D63058
AdalbertMemSQL committed Jun 22, 2023
1 parent 1e82f0f commit 6c65288
Showing 17 changed files with 585 additions and 56 deletions.
35 changes: 22 additions & 13 deletions .circleci/config.yml
@@ -49,17 +49,20 @@ jobs:
          name: Run tests
          command: |
            export SINGLESTORE_HOST=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' singlestore-integration)
-           if [ << parameters.spark_version >> == '3.0.0' ]
+           if [ << parameters.spark_version >> == '3.0.3' ]
            then
              sbt ++2.12.12 "testOnly -- -l ExcludeFromSpark30" -Dspark.version=<< parameters.spark_version >>
            elif [ << parameters.spark_version >> == '3.1.3' ]
            then
              sbt ++2.12.12 "testOnly -- -l ExcludeFromSpark31" -Dspark.version=<< parameters.spark_version >>
-           elif [ << parameters.spark_version >> == '3.2.1' ]
+           elif [ << parameters.spark_version >> == '3.2.4' ]
            then
              sbt ++2.12.12 "testOnly -- -l ExcludeFromSpark32" -Dspark.version=<< parameters.spark_version >>
-           else
+           elif [ << parameters.spark_version >> == '3.3.2' ]
+           then
              sbt ++2.12.12 "testOnly -- -l ExcludeFromSpark33" -Dspark.version=<< parameters.spark_version >>
+           else
+             sbt ++2.12.12 "testOnly -- -l ExcludeFromSpark34" -Dspark.version=<< parameters.spark_version >>
            fi
  publish:
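The -l flag passed to testOnly excludes tests that carry the named ScalaTest tag; that is how each CI job skips tests known not to pass on its Spark version. A minimal sketch of how such a tag is declared and applied, reusing the ExcludeFromSpark34 name from the filters above (the suite and test are illustrative, not from this repository):

import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// A tag object whose name matches the string passed to `-l`.
object ExcludeFromSpark34 extends Tag("ExcludeFromSpark34")

class ExamplePushdownSuite extends AnyFunSuite {
  // Runs everywhere except when the runner is invoked with
  // `testOnly -- -l ExcludeFromSpark34`.
  test("behavior that differs on Spark 3.4", ExcludeFromSpark34) {
    assert(1 + 1 == 2)
  }
}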
@@ -76,21 +79,25 @@ jobs:
            openssl enc -d -aes-256-cbc -K ${ENCRYPTION_KEY} -iv ${ENCRYPTION_IV} -in ci/secring.asc.enc -out ci/secring.asc
            gpg --import ci/secring.asc
      - run:
-         name: Publish Spark 3.0.0
+         name: Publish Spark 3.0.3
          command: |
-           sbt ++2.12.12 -Dspark.version=3.0.0 clean publishSigned sonatypeBundleRelease
+           sbt ++2.12.12 -Dspark.version=3.0.3 clean publishSigned sonatypeBundleRelease
      - run:
          name: Publish Spark 3.1.3
          command: |
            sbt ++2.12.12 -Dspark.version=3.1.3 clean publishSigned sonatypeBundleRelease
      - run:
-         name: Publish Spark 3.2.1
+         name: Publish Spark 3.2.4
          command: |
-           sbt ++2.12.12 -Dspark.version=3.2.1 clean publishSigned sonatypeBundleRelease
+           sbt ++2.12.12 -Dspark.version=3.2.4 clean publishSigned sonatypeBundleRelease
+     - run:
+         name: Publish Spark 3.3.2
+         command: |
+           sbt ++2.12.12 -Dspark.version=3.3.2 clean publishSigned sonatypeBundleRelease
      - run:
-         name: Publish Spark 3.3.0
+         name: Publish Spark 3.4.0
          command: |
-           sbt ++2.12.12 -Dspark.version=3.3.0 clean publishSigned sonatypeBundleRelease
+           sbt ++2.12.12 -Dspark.version=3.4.0 clean publishSigned sonatypeBundleRelease
workflows:
  test:
@@ -104,15 +111,17 @@ workflows:
      matrix:
        parameters:
          spark_version:
-           - 3.0.0
+           - 3.0.3
            - 3.1.3
-           - 3.2.1
-           - 3.3.0
+           - 3.2.4
+           - 3.3.2
+           - 3.4.0
          singlestore_image:
            - singlestore/cluster-in-a-box:centos-7.5.12-3112a491c2-4.0.0-1.12.5
            - singlestore/cluster-in-a-box:alma-7.6.14-6f67cb4355-4.0.4-1.13.6
            - singlestore/cluster-in-a-box:alma-7.8.19-4263b2d130-4.0.10-1.14.4
-           - singlestore/cluster-in-a-box:alma-8.0.15-0b9b66384f-4.0.11-1.15.2
+           - singlestore/cluster-in-a-box:alma-8.0.19-f48780d261-4.0.11-1.16.0
+           - singlestore/cluster-in-a-box:alma-8.1.6-74913b66cc-4.0.11-1.16.0
  publish:
    jobs:
      - approve-publish:
4 changes: 2 additions & 2 deletions Layerfile
@@ -39,9 +39,9 @@ MEMORY 8G
MEMORY 12G
MEMORY 16G

-# split to 17 states
+# split to 26 states
# each of them will run different version of the singlestore and spark
-SPLIT 17
+SPLIT 26

# copy the entire git repository
COPY . .
9 changes: 5 additions & 4 deletions build.sbt
@@ -4,7 +4,7 @@ import xerial.sbt.Sonatype._
To run tests or publish with a specific spark version use this java option:
 -Dspark.version=3.0.0
*/
-val sparkVersion = sys.props.get("spark.version").getOrElse("3.3.0")
+val sparkVersion = sys.props.get("spark.version").getOrElse("3.4.0")
val scalaVersionStr = "2.12.12"
val scalaVersionPrefix = scalaVersionStr.substring(0, 4)

@@ -16,10 +16,11 @@ lazy val root = project
    organization := "com.singlestore",
    scalaVersion := scalaVersionStr,
    Compile / unmanagedSourceDirectories += (Compile / sourceDirectory).value / (sparkVersion match {
-      case "3.0.0" => "scala-sparkv3.0"
+      case "3.0.3" => "scala-sparkv3.0"
      case "3.1.3" => "scala-sparkv3.1"
-      case "3.2.1" => "scala-sparkv3.2"
-      case "3.3.0" => "scala-sparkv3.3"
+      case "3.2.4" => "scala-sparkv3.2"
+      case "3.3.2" => "scala-sparkv3.3"
+      case "3.4.0" => "scala-sparkv3.4"
    }),
version := s"4.1.3-spark-${sparkVersion}",
licenses += "Apache-2.0" -> url(
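One hedged aside on the match above: it is non-exhaustive, so building with a -Dspark.version outside the five listed patch releases now fails at project load with a MatchError instead of silently picking a source tree. A minimal sketch of the failure mode (illustrative values, not from the build):

// A non-exhaustive match like the one in build.sbt throws scala.MatchError
// for any version string it does not list, e.g. the previous default "3.3.0":
val sourceDir = "3.3.0" match {
  case "3.3.2" => "scala-sparkv3.3"
  case "3.4.0" => "scala-sparkv3.4"
}
// => scala.MatchError: 3.3.0 (of class java.lang.String)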
37 changes: 22 additions & 15 deletions scripts/define-layerci-matrix.sh
@@ -3,41 +3,48 @@ set -eu

TEST_NUM=${SPLIT:-"0"}

-if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '1' ] || [ "$TEST_NUM" == '2' ] || [ "$TEST_NUM" == '3' ]
+if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '1' ] || [ "$TEST_NUM" == '2' ] || [ "$TEST_NUM" == '3' ] || [ "$TEST_NUM" == '4' ]
then
-  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:centos-7.5.12-3112a491c2-4.0.0-1.12.5"'
-elif [ "$TEST_NUM" == '4' ] || [ "$TEST_NUM" == '5' ] || [ "$TEST_NUM" == '6' ] || [ "$TEST_NUM" == '7' ]
+  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:centos-7.5.12-3112a491c2-4.0.0-1.12.5"'
+elif [ "$TEST_NUM" == '5' ] || [ "$TEST_NUM" == '6' ] || [ "$TEST_NUM" == '7' ] || [ "$TEST_NUM" == '8' ] || [ "$TEST_NUM" == '9' ]
then
  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:alma-7.6.14-6f67cb4355-4.0.4-1.13.6"'
-elif [ "$TEST_NUM" == '8' ] || [ "$TEST_NUM" == '9' ] || [ "$TEST_NUM" == '10' ] || [ "$TEST_NUM" == '11' ]
-then
-  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:alma-7.8.19-4263b2d130-4.0.10-1.14.4"'
+elif [ "$TEST_NUM" == '10' ] || [ "$TEST_NUM" == '11' ] || [ "$TEST_NUM" == '12' ] || [ "$TEST_NUM" == '13' ] || [ "$TEST_NUM" == '14' ]
+then
+  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:alma-7.8.19-4263b2d130-4.0.10-1.14.4"'
+elif [ "$TEST_NUM" == '15' ] || [ "$TEST_NUM" == '16' ] || [ "$TEST_NUM" == '17' ] || [ "$TEST_NUM" == '18' ] || [ "$TEST_NUM" == '19' ]
+then
+  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:alma-8.0.19-f48780d261-4.0.11-1.16.0"'
else
-  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:alma-8.0.15-0b9b66384f-4.0.11-1.15.2"'
+  echo 'export SINGLESTORE_IMAGE="singlestore/cluster-in-a-box:alma-8.1.6-74913b66cc-4.0.11-1.16.0"'
fi

if [ "$TEST_NUM" == '16' ]
if [ "$TEST_NUM" == '25' ]
then
echo 'export FORCE_READ_FROM_LEAVES=TRUE'
else
echo 'export FORCE_READ_FROM_LEAVES=FALSE'
fi

if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '4' ] || [ "$TEST_NUM" == '8' ] || [ "$TEST_NUM" == '12' ]
if [ "$TEST_NUM" == '0' ] || [ "$TEST_NUM" == '5' ] || [ "$TEST_NUM" == '10' ] || [ "$TEST_NUM" == '15' ] || [ "$TEST_NUM" == '20' ]
then
echo 'export SPARK_VERSION="3.0.0"'
echo 'export SPARK_VERSION="3.0.3"'
echo 'export TEST_FILTER="testOnly -- -l ExcludeFromSpark30"'
elif [ "$TEST_NUM" == '1' ] || [ "$TEST_NUM" == '5' ] || [ "$TEST_NUM" == '9' ] || [ "$TEST_NUM" == '13' ]
elif [ "$TEST_NUM" == '1' ] || [ "$TEST_NUM" == '6' ] || [ "$TEST_NUM" == '11' ] || [ "$TEST_NUM" == '16' ] || [ "$TEST_NUM" == '21' ]
then
echo 'export SPARK_VERSION="3.1.3"'
echo 'export TEST_FILTER="testOnly -- -l ExcludeFromSpark31"'
elif [ "$TEST_NUM" == '2' ] || [ "$TEST_NUM" == '6' ] || [ "$TEST_NUM" == '10' ] || [ "$TEST_NUM" == '14' ]
elif [ "$TEST_NUM" == '2' ] || [ "$TEST_NUM" == '7' ] || [ "$TEST_NUM" == '12' ] || [ "$TEST_NUM" == '17' ] || [ "$TEST_NUM" == '22' ]
then
echo 'export SPARK_VERSION="3.2.1"'
echo 'export SPARK_VERSION="3.2.4"'
echo 'export TEST_FILTER="testOnly -- -l ExcludeFromSpark32"'
else
echo 'export SPARK_VERSION="3.3.0"'
elif [ "$TEST_NUM" == '3' ] || [ "$TEST_NUM" == '8' ] || [ "$TEST_NUM" == '13' ] || [ "$TEST_NUM" == '18' ] || [ "$TEST_NUM" == '23' ]
then
echo 'export SPARK_VERSION="3.3.2"'
echo 'export TEST_FILTER="testOnly -- -l ExcludeFromSpark33"'
else
echo 'export SPARK_VERSION="3.4.0"'
echo 'export TEST_FILTER="testOnly -- -l ExcludeFromSpark34"'
fi
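Worked example of the new numbering: the 26 Layerfile states cover the full 5 x 5 matrix of SingleStore images (five consecutive TEST_NUM values per image) crossed with Spark versions (one per value within each group of five), plus one extra state. For instance, SPLIT=25 selects the alma-8.1.6 image with Spark 3.4.0 and is the only state that sets FORCE_READ_FROM_LEAVES=TRUE.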


@@ -87,11 +87,11 @@ case class VersionSpecificAggregateExpressionExtractor(expressionExtractor: Expr
)

// First.scala
-case First(expressionExtractor(child), Literal(false, BooleanType)) =>
+case First(expressionExtractor(child), false) =>
Some(aggregateWithFilter("ANY_VALUE", child, filter))

// Last.scala
-case Last(expressionExtractor(child), Literal(false, BooleanType)) =>
+case Last(expressionExtractor(child), false) =>
Some(aggregateWithFilter("ANY_VALUE", child, filter))

// Sum.scala
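The First/Last change above tracks an upstream signature difference: in the Spark release this source tree targets, the ignoreNulls flag on First and Last is a plain Boolean constructor field rather than a Literal expression, so the extractor matches the value directly. A simplified sketch of the two shapes, using stand-in classes rather than the real Spark definitions:

sealed trait Expr
case class Lit(value: Any) extends Expr
case class FirstOld(child: Expr, ignoreNullsExpr: Expr)
case class FirstNew(child: Expr, ignoreNulls: Boolean)

// Old pattern: the flag arrives wrapped in a literal expression.
def oldShape(agg: FirstOld): Boolean = agg match {
  case FirstOld(_, Lit(false)) => true
  case _                       => false
}

// New pattern: the flag is a plain Boolean, matched directly.
def newShape(agg: FirstNew): Boolean = agg match {
  case FirstNew(_, false) => true
  case _                  => false
}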
@@ -193,6 +193,22 @@ case class VersionSpecificExpressionGen(expressionExtractor: ExpressionExtractor
case Base64(expressionExtractor(child)) => Some(f("TO_BASE64", child))
case UnBase64(expressionExtractor(child)) => Some(f("FROM_BASE64", child))

+case Round(expressionExtractor(child), expressionExtractor(scale)) => Some(f("ROUND", child, scale))
+case Unhex(expressionExtractor(child)) => Some(f("UNHEX", child))
+
+// ----------------------------------
+// Ternary Expressions
+// ----------------------------------
+
+// mathExpressions.scala
+case Conv(expressionExtractor(numExpr),
+          intFoldableExtractor(fromBase),
+          intFoldableExtractor(toBase))
+    // SingleStore supports bases only from [2, 36]
+    if fromBase >= 2 && fromBase <= 36 &&
+       toBase >= 2 && toBase <= 36 =>
+  Some(f("CONV", numExpr, IntVar(fromBase), IntVar(toBase)))
+
case _ => None
}
}
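The same three cases (Round, Unhex, and the guarded Conv) are added to each version-specific generator in this commit, so the new pushdowns behave identically across the supported Spark versions. A hedged illustration of what they enable, with hypothetical table and column names (the "singlestore" format name is the one the connector registers):

// Hypothetical source table with a VARCHAR column `hex_str` and a
// DECIMAL column `price`.
val df = spark.read.format("singlestore").load("test.example_table")

// These expressions can now be compiled into SingleStore SQL
// (CONV(...), UNHEX(...), ROUND(...)) instead of being evaluated in
// Spark after the rows are fetched.
val pushedDown = df.selectExpr(
  "conv(hex_str, 16, 10)", // both bases are foldable and within [2, 36]
  "unhex(hex_str)",
  "round(price, 2)"
)

// A base outside [2, 36] fails the guard, the extractor returns None,
// and Spark evaluates the expression locally instead.
val notPushedDown = df.selectExpr("conv(hex_str, 37, 10)")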
@@ -224,6 +224,22 @@ case class VersionSpecificExpressionGen(expressionExtractor: ExpressionExtractor
case Base64(expressionExtractor(child)) => Some(f("TO_BASE64", child))
case UnBase64(expressionExtractor(child)) => Some(f("FROM_BASE64", child))

+case Round(expressionExtractor(child), expressionExtractor(scale)) => Some(f("ROUND", child, scale))
+case Unhex(expressionExtractor(child)) => Some(f("UNHEX", child))
+
+// ----------------------------------
+// Ternary Expressions
+// ----------------------------------
+
+// mathExpressions.scala
+case Conv(expressionExtractor(numExpr),
+          intFoldableExtractor(fromBase),
+          intFoldableExtractor(toBase))
+    // SingleStore supports bases only from [2, 36]
+    if fromBase >= 2 && fromBase <= 36 &&
+       toBase >= 2 && toBase <= 36 =>
+  Some(f("CONV", numExpr, IntVar(fromBase), IntVar(toBase)))
+
case _ => None
}
}
@@ -286,6 +286,22 @@ case class VersionSpecificExpressionGen(expressionExtractor: ExpressionExtractor
case Base64(expressionExtractor(child)) => Some(f("TO_BASE64", child))
case UnBase64(expressionExtractor(child)) => Some(f("FROM_BASE64", child))

+case Round(expressionExtractor(child), expressionExtractor(scale)) => Some(f("ROUND", child, scale))
+case Unhex(expressionExtractor(child)) => Some(f("UNHEX", child))
+
+// ----------------------------------
+// Ternary Expressions
+// ----------------------------------
+
+// mathExpressions.scala
+case Conv(expressionExtractor(numExpr),
+          intFoldableExtractor(fromBase),
+          intFoldableExtractor(toBase))
+    // SingleStore supports bases only from [2, 36]
+    if fromBase >= 2 && fromBase <= 36 &&
+       toBase >= 2 && toBase <= 36 =>
+  Some(f("CONV", numExpr, IntVar(fromBase), IntVar(toBase)))
+
case _ => None
}
}
16 changes: 16 additions & 0 deletions src/main/scala-sparkv3.3/spark/VersionSpecificExpressionGen.scala
@@ -279,6 +279,22 @@ case class VersionSpecificExpressionGen(expressionExtractor: ExpressionExtractor
case Right(expressionExtractor(str), expressionExtractor(len)) =>
Some(f("RIGHT", str, len))

+case Round(expressionExtractor(child), expressionExtractor(scale)) => Some(f("ROUND", child, scale))
+case Unhex(expressionExtractor(child)) => Some(f("UNHEX", child))
+
+// ----------------------------------
+// Ternary Expressions
+// ----------------------------------
+
+// mathExpressions.scala
+case Conv(expressionExtractor(numExpr),
+          intFoldableExtractor(fromBase),
+          intFoldableExtractor(toBase))
+    // SingleStore supports bases only from [2, 36]
+    if fromBase >= 2 && fromBase <= 36 &&
+       toBase >= 2 && toBase <= 36 =>
+  Some(f("CONV", numExpr, IntVar(fromBase), IntVar(toBase)))
+
case _ => None
}
}
13 changes: 13 additions & 0 deletions src/main/scala-sparkv3.4/spark/MaxNumConcurrentTasks.scala
@@ -0,0 +1,13 @@
+package org.apache.spark.scheduler
+
+import org.apache.spark.rdd.RDD
+
+object MaxNumConcurrentTasks {
+  def get(rdd: RDD[_]): Int = {
+    val (_, resourceProfiles) =
+      rdd.sparkContext.dagScheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val resourceProfile =
+      rdd.sparkContext.dagScheduler.mergeResourceProfilesForStage(resourceProfiles)
+    rdd.sparkContext.maxNumConcurrentTasks(resourceProfile)
+  }
+}
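This new Spark 3.4 shim is declared inside the org.apache.spark.scheduler package so it can reach DAGScheduler internals (getShuffleDependenciesAndResourceProfiles, mergeResourceProfilesForStage) that Spark does not expose publicly; the other version-specific source trees presumably carry their own variants. A hypothetical call site, not part of this commit excerpt:

import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.MaxNumConcurrentTasks

// Ask Spark how many tasks could run concurrently for this RDD's stage,
// e.g. to bound the number of parallel reads the connector opens.
def maxParallelism(rdd: RDD[_]): Int = MaxNumConcurrentTasks.get(rdd)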
