diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 42b5d41b7b526..8a0327984e195 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -701,7 +701,7 @@ object SparkSubmit { } /** Provides utility functions to be used inside SparkSubmit. */ -private[deploy] object SparkSubmitUtils { +private[spark] object SparkSubmitUtils { // Exposed for testing var printStream = SparkSubmit.printStream diff --git a/docs/building-spark.md b/docs/building-spark.md index ea79c5bc276d3..287fcd3c4034f 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -66,7 +66,6 @@ Because HDFS is not protocol-compatible across versions, if you want to read fro Hadoop versionProfile required - 0.23.xhadoop-0.23 1.x to 2.1.x(none) 2.2.xhadoop-2.2 2.3.xhadoop-2.3 @@ -82,9 +81,6 @@ mvn -Dhadoop.version=1.2.1 -DskipTests clean package # Cloudera CDH 4.2.0 with MapReduce v1 mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package - -# Apache Hadoop 0.23.x -mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package {% endhighlight %} You can enable the "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". Spark only supports YARN versions 2.2.0 and later. diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md index 87dcc58feb494..96bd69ca3b33b 100644 --- a/docs/hadoop-third-party-distributions.md +++ b/docs/hadoop-third-party-distributions.md @@ -29,9 +29,6 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors. ReleaseVersion code CDH 4.X.X (YARN mode)2.0.0-cdh4.X.X CDH 4.X.X2.0.0-mr1-cdh4.X.X - CDH 3u60.20.2-cdh3u6 - CDH 3u50.20.2-cdh3u5 - CDH 3u40.20.2-cdh3u4 diff --git a/make-distribution.sh b/make-distribution.sh index 92177e19fe6be..1bfa9acb1fe6e 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -58,7 +58,7 @@ while (( "$#" )); do --hadoop) echo "Error: '--hadoop' is no longer supported:" echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead." - echo "Error: Related profiles include hadoop-0.23, hdaoop-2.2, hadoop-2.3 and hadoop-2.4." + echo "Error: Related profiles include hadoop-2.2, hadoop-2.3 and hadoop-2.4." exit_with_usage ;; --with-yarn) diff --git a/pom.xml b/pom.xml index 4313f940036c8..de18741feae3a 100644 --- a/pom.xml +++ b/pom.xml @@ -1614,20 +1614,6 @@ http://hadoop.apache.org/docs/ra.b.c/hadoop-project-dist/hadoop-common/dependency-analysis.html --> - - hadoop-0.23 - - - - org.apache.avro - avro - - - - 0.23.10 - - - hadoop-2.2 diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py new file mode 100644 index 0000000000000..1773ab5bdcdb1 --- /dev/null +++ b/python/pyspark/ml/tuning.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import itertools + +__all__ = ['ParamGridBuilder'] + + +class ParamGridBuilder(object): + """ + Builder for a param grid used in grid search-based model selection. + + >>> from classification import LogisticRegression + >>> lr = LogisticRegression() + >>> output = ParamGridBuilder().baseOn({lr.labelCol: 'l'}) \ + .baseOn([lr.predictionCol, 'p']) \ + .addGrid(lr.regParam, [1.0, 2.0, 3.0]) \ + .addGrid(lr.maxIter, [1, 5]) \ + .addGrid(lr.featuresCol, ['f']) \ + .build() + >>> expected = [ \ +{lr.regParam: 1.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \ +{lr.regParam: 2.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \ +{lr.regParam: 3.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \ +{lr.regParam: 1.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, \ +{lr.regParam: 2.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, \ +{lr.regParam: 3.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}] + >>> len(output) == len(expected) + True + >>> all([m in expected for m in output]) + True + """ + + def __init__(self): + self._param_grid = {} + + def addGrid(self, param, values): + """ + Sets the given parameters in this grid to fixed values. + """ + self._param_grid[param] = values + + return self + + def baseOn(self, *args): + """ + Sets the given parameters in this grid to fixed values. + Accepts either a parameter dictionary or a list of (parameter, value) pairs. + """ + if isinstance(args[0], dict): + self.baseOn(*args[0].items()) + else: + for (param, value) in args: + self.addGrid(param, [value]) + + return self + + def build(self): + """ + Builds and returns all combinations of parameters specified + by the param grid. + """ + keys = self._param_grid.keys() + grid_values = self._param_grid.values() + return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)] + + +if __name__ == "__main__": + import doctest + doctest.testmod() diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 4d3587e69e43e..6fa322cfd3a58 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -875,6 +875,27 @@ def fillna(self, value, subset=None): return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx) + def corr(self, col1, col2, method=None): + """ + Calculates the correlation of two columns of a DataFrame as a double value. Currently only + supports the Pearson Correlation Coefficient. + :func:`DataFrame.corr` and :func:`DataFrameStatFunctions.corr` are aliases. + + :param col1: The name of the first column + :param col2: The name of the second column + :param method: The correlation method. Currently only supports "pearson" + """ + if not isinstance(col1, str): + raise ValueError("col1 should be a string.") + if not isinstance(col2, str): + raise ValueError("col2 should be a string.") + if not method: + method = "pearson" + if not method == "pearson": + raise ValueError("Currently only the calculation of the Pearson Correlation " + + "coefficient is supported.") + return self._jdf.stat().corr(col1, col2, method) + def cov(self, col1, col2): """ Calculate the sample covariance for the given columns, specified by their names, as a @@ -892,13 +913,15 @@ def cov(self, col1, col2): def crosstab(self, col1, col2): """ Computes a pair-wise frequency table of the given columns. Also known as a contingency - table. The number of distinct values for each column should be less than 1e5. The first + table. The number of distinct values for each column should be less than 1e4. The first column of each row will be the distinct values of `col1` and the column names will be the - distinct values of `col2` sorted in lexicographical order. + distinct values of `col2`. Pairs that have no occurrences will have `null` as their values. :func:`DataFrame.crosstab` and :func:`DataFrameStatFunctions.crosstab` are aliases. - :param col1: The name of the first column - :param col2: The name of the second column + :param col1: The name of the first column. Distinct items will make the first item of + each row. + :param col2: The name of the second column. Distinct items will make the column names + of the DataFrame. """ if not isinstance(col1, str): raise ValueError("col1 should be a string.") @@ -1376,6 +1399,11 @@ class DataFrameStatFunctions(object): def __init__(self, df): self.df = df + def corr(self, col1, col2, method=None): + return self.df.corr(col1, col2, method) + + corr.__doc__ = DataFrame.corr.__doc__ + def cov(self, col1, col2): return self.df.cov(col1, col2) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 714a87b79d5ea..d6cbd0a046d6b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -394,6 +394,12 @@ def test_aggregator(self): self.assertTrue(95 < g.agg(functions.approxCountDistinct(df.key)).first()[0]) self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0]) + def test_corr(self): + import math + df = self.sc.parallelize([Row(a=i, b=math.sqrt(i)) for i in range(10)]).toDF() + corr = df.stat.corr("a", "b") + self.assertTrue(abs(corr - 0.95734012) < 1e-6) + def test_cov(self): df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF() cov = df.stat.cov("a", "b") @@ -401,8 +407,9 @@ def test_cov(self): def test_crosstab(self): df = self.sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 7)]).toDF() - ct = df.stat.crosstab("a", "b") - for i, row in enumerate(ct.collect()): + ct = df.stat.crosstab("a", "b").collect() + ct = sorted(ct, lambda r: r[0]) + for i, row in enumerate(ct): self.assertEqual(row[0], str(i)) self.assertTrue(row[1], 1) self.assertTrue(row[2], 1) diff --git a/python/run-tests b/python/run-tests index 88b63b84fdc27..0e0eee3564e7c 100755 --- a/python/run-tests +++ b/python/run-tests @@ -98,6 +98,7 @@ function run_ml_tests() { echo "Run ml tests ..." run_test "pyspark/ml/feature.py" run_test "pyspark/ml/classification.py" + run_test "pyspark/ml/tuning.py" run_test "pyspark/ml/tests.py" } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index b2f8157a1a61f..18c24b651921a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery} */ class NoSuchTableException extends Exception +class NoSuchDatabaseException extends Exception + /** * An interface for looking up relations by name. Used by an [[Analyzer]]. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index c86214a2aa944..9d613a940ee86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -17,12 +17,31 @@ package org.apache.spark.sql.catalyst -import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File} +import java.io._ import org.apache.spark.util.Utils package object util { + /** Silences output to stderr or stdout for the duration of f */ + def quietly[A](f: => A): A = { + val origErr = System.err + val origOut = System.out + try { + System.setErr(new PrintStream(new OutputStream { + def write(b: Int) = {} + })) + System.setOut(new PrintStream(new OutputStream { + def write(b: Int) = {} + })) + + f + } finally { + System.setErr(origErr) + System.setOut(origOut) + } + } + def fileToString(file: File, encoding: String = "UTF-8"): String = { val inStream = new FileInputStream(file) val outStream = new ByteArrayOutputStream @@ -42,10 +61,9 @@ package object util { new String(outStream.toByteArray, encoding) } - def resourceToString( - resource:String, - encoding: String = "UTF-8", - classLoader: ClassLoader = Utils.getSparkClassLoader): String = { + def resourceToBytes( + resource: String, + classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = { val inStream = classLoader.getResourceAsStream(resource) val outStream = new ByteArrayOutputStream try { @@ -61,7 +79,14 @@ package object util { finally { inStream.close() } - new String(outStream.toByteArray, encoding) + outStream.toByteArray + } + + def resourceToString( + resource:String, + encoding: String = "UTF-8", + classLoader: ClassLoader = Utils.getSparkClassLoader): String = { + new String(resourceToBytes(resource, classLoader), encoding) } def stringToFile(file: File, str: String): File = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 910f28eefb796..67327ad5da8c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -37,14 +37,43 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { StatFunctions.calculateCov(df, Seq(col1, col2)) } + /* + * Calculates the correlation of two columns of a DataFrame. Currently only supports the Pearson + * Correlation Coefficient. For Spearman Correlation, consider using RDD methods found in + * MLlib's Statistics. + * + * @param col1 the name of the column + * @param col2 the name of the column to calculate the correlation against + * @return The Pearson Correlation Coefficient as a Double. + */ + def corr(col1: String, col2: String, method: String): Double = { + require(method == "pearson", "Currently only the calculation of the Pearson Correlation " + + "coefficient is supported.") + StatFunctions.pearsonCorrelation(df, Seq(col1, col2)) + } + + /** + * Calculates the Pearson Correlation Coefficient of two columns of a DataFrame. + * + * @param col1 the name of the column + * @param col2 the name of the column to calculate the correlation against + * @return The Pearson Correlation Coefficient as a Double. + */ + def corr(col1: String, col2: String): Double = { + corr(col1, col2, "pearson") + } + /** * Computes a pair-wise frequency table of the given columns. Also known as a contingency table. - * The number of distinct values for each column should be less than 1e5. The first + * The number of distinct values for each column should be less than 1e4. The first * column of each row will be the distinct values of `col1` and the column names will be the - * distinct values of `col2` sorted in lexicographical order. Counts will be returned as `Long`s. + * distinct values of `col2`. Counts will be returned as `Long`s. Pairs that have no occurrences + * will have `null` as their values. * - * @param col1 The name of the first column. - * @param col2 The name of the second column. + * @param col1 The name of the first column. Distinct items will make the first item of + * each row. + * @param col2 The name of the second column. Distinct items will make the column names + * of the DataFrame. * @return A Local DataFrame containing the table */ def crosstab(col1: String, col2: String): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index ac96f3deae229..8345f17dcd941 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -18,36 +18,50 @@ package org.apache.spark.sql.execution.stat import org.apache.spark.sql.{Column, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Cast} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ private[sql] object StatFunctions { + /** Calculate the Pearson Correlation Coefficient for the given columns */ + private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = { + val counts = collectStatisticalData(df, cols) + counts.Ck / math.sqrt(counts.MkX * counts.MkY) + } + /** Helper class to simplify tracking and merging counts. */ private class CovarianceCounter extends Serializable { - var xAvg = 0.0 - var yAvg = 0.0 - var Ck = 0.0 - var count = 0L + var xAvg = 0.0 // the mean of all examples seen so far in col1 + var yAvg = 0.0 // the mean of all examples seen so far in col2 + var Ck = 0.0 // the co-moment after k examples + var MkX = 0.0 // sum of squares of differences from the (current) mean for col1 + var MkY = 0.0 // sum of squares of differences from the (current) mean for col1 + var count = 0L // count of observed examples // add an example to the calculation def add(x: Double, y: Double): this.type = { - val oldX = xAvg + val deltaX = x - xAvg + val deltaY = y - yAvg count += 1 - xAvg += (x - xAvg) / count - yAvg += (y - yAvg) / count - Ck += (y - yAvg) * (x - oldX) + xAvg += deltaX / count + yAvg += deltaY / count + Ck += deltaX * (y - yAvg) + MkX += deltaX * (x - xAvg) + MkY += deltaY * (y - yAvg) this } // merge counters from other partitions. Formula can be found at: - // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance + // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance def merge(other: CovarianceCounter): this.type = { val totalCount = count + other.count - Ck += other.Ck + - (xAvg - other.xAvg) * (yAvg - other.yAvg) * count / totalCount * other.count + val deltaX = xAvg - other.xAvg + val deltaY = yAvg - other.yAvg + Ck += other.Ck + deltaX * deltaY * count / totalCount * other.count xAvg = (xAvg * count + other.xAvg * other.count) / totalCount yAvg = (yAvg * count + other.yAvg * other.count) / totalCount + MkX += other.MkX + deltaX * deltaX * count / totalCount * other.count + MkY += other.MkY + deltaY * deltaY * count / totalCount * other.count count = totalCount this } @@ -55,13 +69,7 @@ private[sql] object StatFunctions { def cov: Double = Ck / (count - 1) } - /** - * Calculate the covariance of two numerical columns of a DataFrame. - * @param df The DataFrame - * @param cols the column names - * @return the covariance of the two columns. - */ - private[sql] def calculateCov(df: DataFrame, cols: Seq[String]): Double = { + private def collectStatisticalData(df: DataFrame, cols: Seq[String]): CovarianceCounter = { require(cols.length == 2, "Currently cov supports calculating the covariance " + "between two columns.") cols.map(name => (name, df.schema.fields.find(_.name == name))).foreach { case (name, data) => @@ -70,34 +78,45 @@ private[sql] object StatFunctions { s"with dataType ${data.get.dataType} not supported.") } val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType))) - val counts = df.select(columns:_*).rdd.aggregate(new CovarianceCounter)( + df.select(columns: _*).rdd.aggregate(new CovarianceCounter)( seqOp = (counter, row) => { counter.add(row.getDouble(0), row.getDouble(1)) }, combOp = (baseCounter, other) => { baseCounter.merge(other) - }) + }) + } + + /** + * Calculate the covariance of two numerical columns of a DataFrame. + * @param df The DataFrame + * @param cols the column names + * @return the covariance of the two columns. + */ + private[sql] def calculateCov(df: DataFrame, cols: Seq[String]): Double = { + val counts = collectStatisticalData(df, cols) counts.cov } /** Generate a table of frequencies for the elements of two columns. */ private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = { val tableName = s"${col1}_$col2" - val counts = df.groupBy(col1, col2).agg(col(col1), col(col2), count("*")).collect() - // We need to sort the entries to pivot them properly - val sorted = counts.sortBy(r => (r.get(0).toString, r.get(1).toString)) - val first = sorted.head.get(0) + val counts = df.groupBy(col1, col2).agg(col(col1), col(col2), count("*")).take(1e8.toInt) // get the distinct values of column 2, so that we can make them the column names - val distinctCol2 = sorted.takeWhile(r => r.get(0) == first) - val columnSize = distinctCol2.length + val distinctCol2 = counts.map(_.get(1)).distinct.zipWithIndex.toMap + val columnSize = distinctCol2.size require(columnSize < 1e4, s"The number of distinct values for $col2, can't " + s"exceed 1e4. Currently $columnSize") - val table = sorted.grouped(columnSize).map { rows => + val table = counts.groupBy(_.get(0)).map { case (col1Items, rows) => + val countsRow = new GenericMutableRow(columnSize + 1) + rows.foreach { row => + countsRow.setLong(distinctCol2.get(row.get(1)).get + 1, row.getLong(2)) + } // the value of col1 is the first value, the rest are the counts - val rowValues = rows.head.get(0).toString +: rows.map(_.getLong(2)) - Row(rowValues: _*) + countsRow.setString(0, col1Items.toString) + countsRow }.toSeq - val headerNames = distinctCol2.map(r => StructField(r.get(1).toString, LongType)) + val headerNames = distinctCol2.map(r => StructField(r._1.toString, LongType)).toSeq val schema = StructType(StructField(tableName, StringType) +: headerNames) new DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index eb75461c6a8a2..58cc8e5be6075 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -34,6 +34,7 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; @@ -179,6 +180,14 @@ public void testCreateDataFrameFromJavaBeans() { } } + private static Comparator CrosstabRowComparator = new Comparator() { + public int compare(Row row1, Row row2) { + String item1 = row1.getString(0); + String item2 = row2.getString(0); + return item1.compareTo(item2); + } + }; + @Test public void testCrosstab() { DataFrame df = context.table("testData2"); @@ -188,6 +197,7 @@ public void testCrosstab() { Assert.assertEquals(columnNames[1], "1"); Assert.assertEquals(columnNames[2], "2"); Row[] rows = crosstab.collect(); + Arrays.sort(rows, CrosstabRowComparator); Integer count = 1; for (Row row : rows) { Assert.assertEquals(row.get(0).toString(), count.toString()); @@ -205,6 +215,13 @@ public void testFrequentItems() { Assert.assertTrue(results.collect()[0].getSeq(0).contains(1)); } + @Test + public void testCorrelation() { + DataFrame df = context.table("testData2"); + Double pearsonCorr = df.stat().corr("a", "b", "pearson"); + Assert.assertTrue(Math.abs(pearsonCorr) < 1e-6); + } + @Test public void testCovariance() { DataFrame df = context.table("testData2"); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 570127aec5111..46b1845a9180c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -27,31 +27,67 @@ class DataFrameStatSuite extends FunSuite { val sqlCtx = TestSQLContext def toLetter(i: Int): String = (i + 97).toChar.toString - + + test("pearson correlation") { + val df = Seq.tabulate(10)(i => (i, 2 * i, i * -1.0)).toDF("a", "b", "c") + val corr1 = df.stat.corr("a", "b", "pearson") + assert(math.abs(corr1 - 1.0) < 1e-12) + val corr2 = df.stat.corr("a", "c", "pearson") + assert(math.abs(corr2 + 1.0) < 1e-12) + // non-trivial example. To reproduce in python, use: + // >>> from scipy.stats import pearsonr + // >>> import numpy as np + // >>> a = np.array(range(20)) + // >>> b = np.array([x * x - 2 * x + 3.5 for x in range(20)]) + // >>> pearsonr(a, b) + // (0.95723391394758572, 3.8902121417802199e-11) + // In R, use: + // > a <- 0:19 + // > b <- mapply(function(x) x * x - 2 * x + 3.5, a) + // > cor(a, b) + // [1] 0.957233913947585835 + val df2 = Seq.tabulate(20)(x => (x, x * x - 2 * x + 3.5)).toDF("a", "b") + val corr3 = df2.stat.corr("a", "b", "pearson") + assert(math.abs(corr3 - 0.95723391394758572) < 1e-12) + } + + test("covariance") { + val df = Seq.tabulate(10)(i => (i, 2.0 * i, toLetter(i))).toDF("singles", "doubles", "letters") + + val results = df.stat.cov("singles", "doubles") + assert(math.abs(results - 55.0 / 3) < 1e-12) + intercept[IllegalArgumentException] { + df.stat.cov("singles", "letters") // doesn't accept non-numerical dataTypes + } + val decimalData = Seq.tabulate(6)(i => (BigDecimal(i % 3), BigDecimal(i % 2))).toDF("a", "b") + val decimalRes = decimalData.stat.cov("a", "b") + assert(math.abs(decimalRes) < 1e-12) + } + test("crosstab") { - val df = Seq.tabulate(8)(i => (i % 3, i % 2)).toDF("a", "b") + val df = Seq((0, 0), (2, 1), (1, 0), (2, 0), (0, 0), (2, 0)).toDF("a", "b") val crosstab = df.stat.crosstab("a", "b") val columnNames = crosstab.schema.fieldNames assert(columnNames(0) === "a_b") assert(columnNames(1) === "0") assert(columnNames(2) === "1") - val rows: Array[Row] = crosstab.collect() + val rows: Array[Row] = crosstab.collect().sortBy(_.getString(0)) assert(rows(0).get(0).toString === "0") assert(rows(0).getLong(1) === 2L) - assert(rows(0).getLong(2) === 1L) + assert(rows(0).get(2) === null) assert(rows(1).get(0).toString === "1") assert(rows(1).getLong(1) === 1L) - assert(rows(1).getLong(2) === 2L) + assert(rows(1).get(2) === null) assert(rows(2).get(0).toString === "2") - assert(rows(2).getLong(1) === 1L) + assert(rows(2).getLong(1) === 2L) assert(rows(2).getLong(2) === 1L) } test("Frequent Items") { - val rows = Array.tabulate(1000) { i => + val rows = Seq.tabulate(1000) { i => if (i % 3 == 0) (1, toLetter(1), -1.0) else (i, toLetter(i), i * -1.0) } - val df = sqlCtx.sparkContext.parallelize(rows).toDF("numbers", "letters", "negDoubles") + val df = rows.toDF("numbers", "letters", "negDoubles") val results = df.stat.freqItems(Array("numbers", "letters"), 0.1) val items = results.collect().head @@ -61,21 +97,5 @@ class DataFrameStatSuite extends FunSuite { val singleColResults = df.stat.freqItems(Array("negDoubles"), 0.1) val items2 = singleColResults.collect().head items2.getSeq[Double](0) should contain (-1.0) - - } - - test("covariance") { - val rows = Array.tabulate(10)(i => (i, 2.0 * i, toLetter(i))) - val df = sqlCtx.sparkContext.parallelize(rows).toDF("singles", "doubles", "letters") - - val results = df.stat.cov("singles", "doubles") - assert(math.abs(results - 55.0 / 3) < 1e-6) - intercept[IllegalArgumentException] { - df.stat.cov("singles", "letters") // doesn't accept non-numerical dataTypes - } - val decimalData = sqlCtx.sparkContext.parallelize( - (1 to 6).map(i => (BigDecimal(i % 3), BigDecimal(i % 2)))).toDF("a", "b") - val decimalRes = decimalData.stat.cov("a", "b") - assert(math.abs(decimalRes) < 1e-6) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala new file mode 100644 index 0000000000000..a863aa77cb7e0 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.client + +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} + +case class HiveDatabase( + name: String, + location: String) + +abstract class TableType { val name: String } +case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" } +case object IndexTable extends TableType { override val name = "INDEX_TABLE" } +case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" } +case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" } + +case class HiveStorageDescriptor( + location: String, + inputFormat: String, + outputFormat: String, + serde: String) + +case class HivePartition( + values: Seq[String], + storage: HiveStorageDescriptor) + +case class HiveColumn(name: String, hiveType: String, comment: String) +case class HiveTable( + specifiedDatabase: Option[String], + name: String, + schema: Seq[HiveColumn], + partitionColumns: Seq[HiveColumn], + properties: Map[String, String], + serdeProperties: Map[String, String], + tableType: TableType, + location: Option[String] = None, + inputFormat: Option[String] = None, + outputFormat: Option[String] = None, + serde: Option[String] = None) { + + @transient + private[client] var client: ClientInterface = _ + + private[client] def withClient(ci: ClientInterface): this.type = { + client = ci + this + } + + def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved")) + + def isPartitioned: Boolean = partitionColumns.nonEmpty + + def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this) + + // Hive does not support backticks when passing names to the client. + def qualifiedName: String = s"$database.$name" +} + +/** + * An externally visible interface to the Hive client. This interface is shared across both the + * internal and external classloaders for a given version of Hive and thus must expose only + * shared classes. + */ +trait ClientInterface { + /** + * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will + * result in one string. + */ + def runSqlHive(sql: String): Seq[String] + + /** Returns the names of all tables in the given database. */ + def listTables(dbName: String): Seq[String] + + /** Returns the name of the active database. */ + def currentDatabase: String + + /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ + def getDatabase(name: String): HiveDatabase = { + getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException) + } + + /** Returns the metadata for a given database, or None if it doesn't exist. */ + def getDatabaseOption(name: String): Option[HiveDatabase] + + /** Returns the specified table, or throws [[NoSuchTableException]]. */ + def getTable(dbName: String, tableName: String): HiveTable = { + getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException) + } + + /** Returns the metadata for the specified table or None if it doens't exist. */ + def getTableOption(dbName: String, tableName: String): Option[HiveTable] + + /** Creates a table with the given metadata. */ + def createTable(table: HiveTable): Unit + + /** Updates the given table with new metadata. */ + def alterTable(table: HiveTable): Unit + + /** Creates a new database with the given name. */ + def createDatabase(database: HiveDatabase): Unit + + /** Returns all partitions for the given table. */ + def getAllPartitions(hTable: HiveTable): Seq[HivePartition] + + /** Loads a static partition into an existing table. */ + def loadPartition( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering + replace: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit + + /** Loads data into an existing table. */ + def loadTable( + loadPath: String, // TODO URI + tableName: String, + replace: Boolean, + holdDDLTime: Boolean): Unit + + /** Loads new dynamic partitions into an existing table. */ + def loadDynamicPartitions( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit + + /** Used for testing only. Removes all metadata from this instance of Hive. */ + def reset(): Unit +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala new file mode 100644 index 0000000000000..ea52fea037f1f --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.client + +import java.io.{BufferedReader, InputStreamReader, File, PrintStream} +import java.net.URI +import java.util.{ArrayList => JArrayList} + +import scala.collection.JavaConversions._ +import scala.language.reflectiveCalls + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.metastore.api.Database +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.metastore.api +import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.ql.metadata +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.ql.processors._ +import org.apache.hadoop.hive.ql.Driver + +import org.apache.spark.Logging +import org.apache.spark.sql.execution.QueryExecutionException + + +/** + * A class that wraps the HiveClient and converts its responses to externally visible classes. + * Note that this class is typically loaded with an internal classloader for each instantiation, + * allowing it to interact directly with a specific isolated version of Hive. Loading this class + * with the isolated classloader however will result in it only being visible as a ClientInterface, + * not a ClientWrapper. + * + * This class needs to interact with multiple versions of Hive, but will always be compiled with + * the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility + * must use reflection after matching on `version`. + * + * @param version the version of hive used when pick function calls that are not compatible. + * @param config a collection of configuration options that will be added to the hive conf before + * opening the hive client. + */ +class ClientWrapper( + version: HiveVersion, + config: Map[String, String]) + extends ClientInterface + with Logging + with ReflectionMagic { + + private val conf = new HiveConf(classOf[SessionState]) + config.foreach { case (k, v) => + logDebug(s"Hive Config: $k=$v") + conf.set(k, v) + } + + // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. + private val outputBuffer = new java.io.OutputStream { + var pos: Int = 0 + var buffer = new Array[Int](10240) + def write(i: Int): Unit = { + buffer(pos) = i + pos = (pos + 1) % buffer.size + } + + override def toString: String = { + val (end, start) = buffer.splitAt(pos) + val input = new java.io.InputStream { + val iterator = (start ++ end).iterator + + def read(): Int = if (iterator.hasNext) iterator.next() else -1 + } + val reader = new BufferedReader(new InputStreamReader(input)) + val stringBuilder = new StringBuilder + var line = reader.readLine() + while(line != null) { + stringBuilder.append(line) + stringBuilder.append("\n") + line = reader.readLine() + } + stringBuilder.toString() + } + } + + val state = { + val original = Thread.currentThread().getContextClassLoader + Thread.currentThread().setContextClassLoader(getClass.getClassLoader) + val ret = try { + val newState = new SessionState(conf) + SessionState.start(newState) + newState.out = new PrintStream(outputBuffer, true, "UTF-8") + newState.err = new PrintStream(outputBuffer, true, "UTF-8") + newState + } finally { + Thread.currentThread().setContextClassLoader(original) + } + ret + } + + private val client = Hive.get(conf) + + /** + * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive. + */ + private def withHiveState[A](f: => A): A = synchronized { + val original = Thread.currentThread().getContextClassLoader + Thread.currentThread().setContextClassLoader(getClass.getClassLoader) + Hive.set(client) + version match { + case hive.v12 => + classOf[SessionState] + .callStatic[SessionState, SessionState]("start", state) + case hive.v13 => + classOf[SessionState] + .callStatic[SessionState, SessionState]("setCurrentSessionState", state) + } + val ret = try f finally { + Thread.currentThread().setContextClassLoader(original) + } + ret + } + + override def currentDatabase: String = withHiveState { + state.getCurrentDatabase + } + + override def createDatabase(database: HiveDatabase): Unit = withHiveState { + client.createDatabase( + new Database( + database.name, + "", + new File(database.location).toURI.toString, + new java.util.HashMap), + true) + } + + override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState { + Option(client.getDatabase(name)).map { d => + HiveDatabase( + name = d.getName, + location = d.getLocationUri) + } + } + + override def getTableOption( + dbName: String, + tableName: String): Option[HiveTable] = withHiveState { + + logDebug(s"Looking up $dbName.$tableName") + + val hiveTable = Option(client.getTable(dbName, tableName, false)) + val converted = hiveTable.map { h => + + HiveTable( + name = h.getTableName, + specifiedDatabase = Option(h.getDbName), + schema = h.getCols.map(f => HiveColumn(f.getName, f.getType, f.getComment)), + partitionColumns = h.getPartCols.map(f => HiveColumn(f.getName, f.getType, f.getComment)), + properties = h.getParameters.toMap, + serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.toMap, + tableType = ManagedTable, // TODO + location = version match { + case hive.v12 => Option(h.call[URI]("getDataLocation")).map(_.toString) + case hive.v13 => Option(h.call[Path]("getDataLocation")).map(_.toString) + }, + inputFormat = Option(h.getInputFormatClass).map(_.getName), + outputFormat = Option(h.getOutputFormatClass).map(_.getName), + serde = Option(h.getSerializationLib)).withClient(this) + } + converted + } + + private def toInputFormat(name: String) = + Class.forName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] + + private def toOutputFormat(name: String) = + Class.forName(name) + .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] + + private def toQlTable(table: HiveTable): metadata.Table = { + val qlTable = new metadata.Table(table.database, table.name) + + qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment))) + qlTable.setPartCols( + table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment))) + table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) } + table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) } + version match { + case hive.v12 => + table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u)) + case hive.v13 => + table.location + .map(new org.apache.hadoop.fs.Path(_)) + .foreach(qlTable.call[Path, Unit]("setDataLocation", _)) + } + table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass) + table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass) + table.serde.foreach(qlTable.setSerializationLib) + + qlTable + } + + override def createTable(table: HiveTable): Unit = withHiveState { + val qlTable = toQlTable(table) + client.createTable(qlTable) + } + + override def alterTable(table: HiveTable): Unit = withHiveState { + val qlTable = toQlTable(table) + client.alterTable(table.qualifiedName, qlTable) + } + + override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState { + val qlTable = toQlTable(hTable) + val qlPartitions = version match { + case hive.v12 => + client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsForPruner", qlTable) + case hive.v13 => + client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsOf", qlTable) + } + qlPartitions.map(_.getTPartition).map { p => + HivePartition( + values = Option(p.getValues).map(_.toSeq).getOrElse(Seq.empty), + storage = HiveStorageDescriptor( + location = p.getSd.getLocation, + inputFormat = p.getSd.getInputFormat, + outputFormat = p.getSd.getOutputFormat, + serde = p.getSd.getSerdeInfo.getSerializationLib)) + }.toSeq + } + + override def listTables(dbName: String): Seq[String] = withHiveState { + client.getAllTables + } + + /** + * Runs the specified SQL query using Hive. + */ + override def runSqlHive(sql: String): Seq[String] = { + val maxResults = 100000 + val results = runHive(sql, maxResults) + // It is very confusing when you only get back some of the results... + if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED") + results + } + + /** + * Execute the command using Hive and return the results as a sequence. Each element + * in the sequence is one row. + */ + protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState { + logDebug(s"Running hiveql '$cmd'") + if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") } + try { + val cmd_trimmed: String = cmd.trim() + val tokens: Array[String] = cmd_trimmed.split("\\s+") + val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() + val proc: CommandProcessor = version match { + case hive.v12 => + classOf[CommandProcessorFactory] + .callStatic[String, HiveConf, CommandProcessor]("get", cmd_1, conf) + case hive.v13 => + classOf[CommandProcessorFactory] + .callStatic[Array[String], HiveConf, CommandProcessor]("get", Array(tokens(0)), conf) + } + + proc match { + case driver: Driver => + val response: CommandProcessorResponse = driver.run(cmd) + // Throw an exception if there is an error in query processing. + if (response.getResponseCode != 0) { + driver.close() + throw new QueryExecutionException(response.getErrorMessage) + } + driver.setMaxRows(maxRows) + + val results = version match { + case hive.v12 => + val res = new JArrayList[String] + driver.call[JArrayList[String], Boolean]("getResults", res) + res.toSeq + case hive.v13 => + val res = new JArrayList[Object] + driver.call[JArrayList[Object], Boolean]("getResults", res) + res.map { r => + r match { + case s: String => s + case a: Array[Object] => a(0).asInstanceOf[String] + } + } + } + driver.close() + results + + case _ => + if (state.out != null) { + state.out.println(tokens(0) + " " + cmd_1) + } + Seq(proc.run(cmd_1).getResponseCode.toString) + } + } catch { + case e: Exception => + logError( + s""" + |====================== + |HIVE FAILURE OUTPUT + |====================== + |${outputBuffer.toString} + |====================== + |END HIVE FAILURE OUTPUT + |====================== + """.stripMargin) + throw e + } + } + + def loadPartition( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], + replace: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit = withHiveState { + + client.loadPartition( + new Path(loadPath), // TODO: Use URI + tableName, + partSpec, + replace, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir) + } + + def loadTable( + loadPath: String, // TODO URI + tableName: String, + replace: Boolean, + holdDDLTime: Boolean): Unit = withHiveState { + client.loadTable( + new Path(loadPath), + tableName, + replace, + holdDDLTime) + } + + def loadDynamicPartitions( + loadPath: String, + tableName: String, + partSpec: java.util.LinkedHashMap[String, String], + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean, + listBucketingEnabled: Boolean): Unit = withHiveState { + client.loadDynamicPartitions( + new Path(loadPath), + tableName, + partSpec, + replace, + numDP, + holdDDLTime, + listBucketingEnabled) + } + + def reset(): Unit = withHiveState { + client.getAllTables("default").foreach { t => + logDebug(s"Deleting table $t") + val table = client.getTable("default", t) + client.getIndexes("default", t, 255).foreach { index => + client.dropIndex("default", t, index.getIndexName, true) + } + if (!table.isIndexTable) { + client.dropTable("default", t) + } + } + client.getAllDatabases.filterNot(_ == "default").foreach { db => + logDebug(s"Dropping Database: $db") + client.dropDatabase(db, true, false, true) + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala new file mode 100644 index 0000000000000..710dbca6e3c66 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.client + +import java.io.File +import java.net.URLClassLoader +import java.util + +import scala.language.reflectiveCalls +import scala.util.Try + +import org.apache.commons.io.{FileUtils, IOUtils} + +import org.apache.spark.Logging +import org.apache.spark.deploy.SparkSubmitUtils + +import org.apache.spark.sql.catalyst.util.quietly + +/** Factory for `IsolatedClientLoader` with specific versions of hive. */ +object IsolatedClientLoader { + /** + * Creates isolated Hive client loaders by downloading the requested version from maven. + */ + def forVersion( + version: String, + config: Map[String, String] = Map.empty): IsolatedClientLoader = synchronized { + val resolvedVersion = hiveVersion(version) + val files = resolvedVersions.getOrElseUpdate(resolvedVersion, downloadVersion(resolvedVersion)) + new IsolatedClientLoader(hiveVersion(version), files, config) + } + + def hiveVersion(version: String): HiveVersion = version match { + case "12" | "0.12" | "0.12.0" => hive.v12 + case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13 + } + + private def downloadVersion(version: HiveVersion): Seq[File] = { + val hiveArtifacts = + (Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") ++ + (if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil)) + .map(a => s"org.apache.hive:$a:${version.fullVersion}") :+ + "com.google.guava:guava:14.0.1" :+ + "org.apache.hadoop:hadoop-client:2.4.0" :+ + "mysql:mysql-connector-java:5.1.12" + + val classpath = quietly { + SparkSubmitUtils.resolveMavenCoordinates( + hiveArtifacts.mkString(","), + Some("http://www.datanucleus.org/downloads/maven2"), + None) + } + val allFiles = classpath.split(",").map(new File(_)).toSet + + // TODO: Remove copy logic. + val tempDir = File.createTempFile("hive", "v" + version.toString) + tempDir.delete() + tempDir.mkdir() + + allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) + tempDir.listFiles() + } + + private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[File]] +} + +/** + * Creates a Hive `ClientInterface` using a classloader that works according to the following rules: + * - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader` + * allowing the results of calls to the `ClientInterface` to be visible externally. + * - Hive classes: new instances are loaded from `execJars`. These classes are not + * accessible externally due to their custom loading. + * - ClientWrapper: a new copy is created for each instance of `IsolatedClassLoader`. + * This new instance is able to see a specific version of hive without using reflection. Since + * this is a unique instance, it is not visible externally other than as a generic + * `ClientInterface`, unless `isolationOn` is set to `false`. + * + * @param version The version of hive on the classpath. used to pick specific function signatures + * that are not compatibile accross versions. + * @param execJars A collection of jar files that must include hive and hadoop. + * @param config A set of options that will be added to the HiveConf of the constructed client. + * @param isolationOn When true, custom versions of barrier classes will be constructed. Must be + * true unless loading the version of hive that is on Sparks classloader. + * @param rootClassLoader The system root classloader. Must not know about hive classes. + * @param baseClassLoader The spark classloader that is used to load shared classes. + * + */ +class IsolatedClientLoader( + val version: HiveVersion, + val execJars: Seq[File] = Seq.empty, + val config: Map[String, String] = Map.empty, + val isolationOn: Boolean = true, + val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent, + val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader) + extends Logging { + + // Check to make sure that the root classloader does not know about Hive. + assert(Try(baseClassLoader.loadClass("org.apache.hive.HiveConf")).isFailure) + + /** All jars used by the hive specific classloader. */ + protected def allJars = execJars.map(_.toURI.toURL).toArray + + protected def isSharedClass(name: String): Boolean = + name.contains("slf4j") || + name.contains("log4j") || + name.startsWith("org.apache.spark.") || + name.startsWith("scala.") || + name.startsWith("com.google") || + name.startsWith("java.lang.") || + name.startsWith("java.net") + + /** True if `name` refers to a spark class that must see specific version of Hive. */ + protected def isBarrierClass(name: String): Boolean = + name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") || + name.startsWith(classOf[ClientWrapper].getName) || + name.startsWith(classOf[ReflectionMagic].getName) + + protected def classToPath(name: String): String = + name.replaceAll("\\.", "/") + ".class" + + /** The classloader that is used to load an isolated version of Hive. */ + protected val classLoader: ClassLoader = new URLClassLoader(allJars, rootClassLoader) { + override def loadClass(name: String, resolve: Boolean): Class[_] = { + val loaded = findLoadedClass(name) + if (loaded == null) doLoadClass(name, resolve) else loaded + } + + def doLoadClass(name: String, resolve: Boolean): Class[_] = { + val classFileName = name.replaceAll("\\.", "/") + ".class" + if (isBarrierClass(name) && isolationOn) { + val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) + logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") + defineClass(name, bytes, 0, bytes.length) + } else if (!isSharedClass(name)) { + logDebug(s"hive class: $name - ${getResource(classToPath(name))}") + super.loadClass(name, resolve) + } else { + logDebug(s"shared class: $name") + baseClassLoader.loadClass(name) + } + } + } + + // Pre-reflective instantiation setup. + logDebug("Initializing the logger to avoid disaster...") + Thread.currentThread.setContextClassLoader(classLoader) + + /** The isolated client interface to Hive. */ + val client: ClientInterface = try { + classLoader + .loadClass(classOf[ClientWrapper].getName) + .getConstructors.head + .newInstance(version, config) + .asInstanceOf[ClientInterface] + } finally { + Thread.currentThread.setContextClassLoader(baseClassLoader) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala new file mode 100644 index 0000000000000..90d03049356b5 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.client + +import scala.reflect._ + +/** + * Provides implicit functions on any object for calling methods reflectively. + */ +protected trait ReflectionMagic { + /** code for InstanceMagic + println( + (1 to 22).map { n => + def repeat(str: String => String) = (1 to n).map(i => str(i.toString)).mkString(", ") + val types = repeat(n => s"A$n <: AnyRef : ClassTag") + val inArgs = repeat(n => s"a$n: A$n") + val erasure = repeat(n => s"classTag[A$n].erasure") + val outArgs = repeat(n => s"a$n") + s"""|def call[$types, R](name: String, $inArgs): R = { + | clazz.getMethod(name, $erasure).invoke(a, $outArgs).asInstanceOf[R] + |}""".stripMargin + }.mkString("\n") + ) + */ + + // scalastyle:off + protected implicit class InstanceMagic(a: Any) { + private val clazz = a.getClass + + def call[R](name: String): R = { + clazz.getMethod(name).invoke(a).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, R](name: String, a1: A1): R = { + clazz.getMethod(name, classTag[A1].erasure).invoke(a, a1).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure).invoke(a, a1, a2).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure).invoke(a, a1, a2, a3).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure).invoke(a, a1, a2, a3, a4).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure).invoke(a, a1, a2, a3, a4, a5).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure).invoke(a, a1, a2, a3, a4, a5, a6).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, A20 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19, a20: A20): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure, classTag[A20].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, A20 <: AnyRef : ClassTag, A21 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19, a20: A20, a21: A21): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure, classTag[A20].erasure, classTag[A21].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, a21).asInstanceOf[R] + } + def call[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, A20 <: AnyRef : ClassTag, A21 <: AnyRef : ClassTag, A22 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19, a20: A20, a21: A21, a22: A22): R = { + clazz.getMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure, classTag[A20].erasure, classTag[A21].erasure, classTag[A22].erasure).invoke(a, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, a21, a22).asInstanceOf[R] + } + } + + /** code for StaticMagic + println( + (1 to 22).map { n => + def repeat(str: String => String) = (1 to n).map(i => str(i.toString)).mkString(", ") + val types = repeat(n => s"A$n <: AnyRef : ClassTag") + val inArgs = repeat(n => s"a$n: A$n") + val erasure = repeat(n => s"classTag[A$n].erasure") + val outArgs = repeat(n => s"a$n") + s"""|def callStatic[$types, R](name: String, $inArgs): R = { + | c.getDeclaredMethod(name, $erasure).invoke(c, $outArgs).asInstanceOf[R] + |}""".stripMargin + }.mkString("\n") + ) + */ + + protected implicit class StaticMagic(c: Class[_]) { + def callStatic[A1 <: AnyRef : ClassTag, R](name: String, a1: A1): R = { + c.getDeclaredMethod(name, classTag[A1].erasure).invoke(c, a1).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure).invoke(c, a1, a2).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure).invoke(c, a1, a2, a3).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure).invoke(c, a1, a2, a3, a4).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure).invoke(c, a1, a2, a3, a4, a5).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure).invoke(c, a1, a2, a3, a4, a5, a6).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, A20 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19, a20: A20): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure, classTag[A20].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, A20 <: AnyRef : ClassTag, A21 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19, a20: A20, a21: A21): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure, classTag[A20].erasure, classTag[A21].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, a21).asInstanceOf[R] + } + def callStatic[A1 <: AnyRef : ClassTag, A2 <: AnyRef : ClassTag, A3 <: AnyRef : ClassTag, A4 <: AnyRef : ClassTag, A5 <: AnyRef : ClassTag, A6 <: AnyRef : ClassTag, A7 <: AnyRef : ClassTag, A8 <: AnyRef : ClassTag, A9 <: AnyRef : ClassTag, A10 <: AnyRef : ClassTag, A11 <: AnyRef : ClassTag, A12 <: AnyRef : ClassTag, A13 <: AnyRef : ClassTag, A14 <: AnyRef : ClassTag, A15 <: AnyRef : ClassTag, A16 <: AnyRef : ClassTag, A17 <: AnyRef : ClassTag, A18 <: AnyRef : ClassTag, A19 <: AnyRef : ClassTag, A20 <: AnyRef : ClassTag, A21 <: AnyRef : ClassTag, A22 <: AnyRef : ClassTag, R](name: String, a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6, a7: A7, a8: A8, a9: A9, a10: A10, a11: A11, a12: A12, a13: A13, a14: A14, a15: A15, a16: A16, a17: A17, a18: A18, a19: A19, a20: A20, a21: A21, a22: A22): R = { + c.getDeclaredMethod(name, classTag[A1].erasure, classTag[A2].erasure, classTag[A3].erasure, classTag[A4].erasure, classTag[A5].erasure, classTag[A6].erasure, classTag[A7].erasure, classTag[A8].erasure, classTag[A9].erasure, classTag[A10].erasure, classTag[A11].erasure, classTag[A12].erasure, classTag[A13].erasure, classTag[A14].erasure, classTag[A15].erasure, classTag[A16].erasure, classTag[A17].erasure, classTag[A18].erasure, classTag[A19].erasure, classTag[A20].erasure, classTag[A21].erasure, classTag[A22].erasure).invoke(c, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, a21, a22).asInstanceOf[R] + } + } + // scalastyle:on +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala new file mode 100644 index 0000000000000..7db9200d47440 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +/** Support for interacting with different versions of the HiveMetastoreClient */ +package object client { + private[client] abstract class HiveVersion(val fullVersion: String, val hasBuiltinsJar: Boolean) + + // scalastyle:off + private[client] object hive { + case object v10 extends HiveVersion("0.10.0", true) + case object v11 extends HiveVersion("0.11.0", false) + case object v12 extends HiveVersion("0.12.0", false) + case object v13 extends HiveVersion("0.13.1", false) + } + // scalastyle:on + +} \ No newline at end of file diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala new file mode 100644 index 0000000000000..81e77ba257bf1 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.client + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.util.Utils +import org.scalatest.FunSuite + +class VersionsSuite extends FunSuite with Logging { + val testType = "derby" + + private def buildConf() = { + lazy val warehousePath = Utils.createTempDir() + lazy val metastorePath = Utils.createTempDir() + metastorePath.delete() + Map( + "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true", + "hive.metastore.warehouse.dir" -> warehousePath.toString) + } + + test("success sanity check") { + val badClient = IsolatedClientLoader.forVersion("13", buildConf()).client + val db = new HiveDatabase("default", "") + badClient.createDatabase(db) + } + + private def getNestedMessages(e: Throwable): String = { + var causes = "" + var lastException = e + while (lastException != null) { + causes += lastException.toString + "\n" + lastException = lastException.getCause + } + causes + } + + // Its actually pretty easy to mess things up and have all of your tests "pass" by accidentally + // connecting to an auto-populated, in-process metastore. Let's make sure we are getting the + // versions right by forcing a known compatibility failure. + // TODO: currently only works on mysql where we manually create the schema... + ignore("failure sanity check") { + val e = intercept[Throwable] { + val badClient = quietly { IsolatedClientLoader.forVersion("13", buildConf()).client } + } + assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'") + } + + private val versions = Seq("12", "13") + + private var client: ClientInterface = null + + versions.foreach { version => + test(s"$version: listTables") { + client = null + client = IsolatedClientLoader.forVersion(version, buildConf()).client + client.listTables("default") + } + + test(s"$version: createDatabase") { + val db = HiveDatabase("default", "") + client.createDatabase(db) + } + + test(s"$version: createTable") { + val table = + HiveTable( + specifiedDatabase = Option("default"), + name = "src", + schema = Seq(HiveColumn("key", "int", "")), + partitionColumns = Seq.empty, + properties = Map.empty, + serdeProperties = Map.empty, + tableType = ManagedTable, + location = None, + inputFormat = + Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName), + outputFormat = + Some(classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName), + serde = + Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName())) + + client.createTable(table) + } + + test(s"$version: getTable") { + client.getTable("default", "src") + } + } +}