
Commit

addressed comments v4.0
brkyvz committed May 4, 2015
commit 9106585 (2 parents: bced829 + 9646018)
Showing 21 changed files with 1,359 additions and 94 deletions.
@@ -701,7 +701,7 @@ object SparkSubmit {
}

/** Provides utility functions to be used inside SparkSubmit. */
private[deploy] object SparkSubmitUtils {
private[spark] object SparkSubmitUtils {

  // Exposed for testing
  var printStream = SparkSubmit.printStream
4 changes: 0 additions & 4 deletions docs/building-spark.md
@@ -66,7 +66,6 @@ Because HDFS is not protocol-compatible across versions, if you want to read fro
<tr><th>Hadoop version</th><th>Profile required</th></tr>
</thead>
<tbody>
<tr><td>0.23.x</td><td>hadoop-0.23</td></tr>
<tr><td>1.x to 2.1.x</td><td>(none)</td></tr>
<tr><td>2.2.x</td><td>hadoop-2.2</td></tr>
<tr><td>2.3.x</td><td>hadoop-2.3</td></tr>
@@ -82,9 +81,6 @@ mvn -Dhadoop.version=1.2.1 -DskipTests clean package

# Cloudera CDH 4.2.0 with MapReduce v1
mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package

# Apache Hadoop 0.23.x
mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package
{% endhighlight %}

You can enable the "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". Spark only supports YARN versions 2.2.0 and later.
3 changes: 0 additions & 3 deletions docs/hadoop-third-party-distributions.md
@@ -29,9 +29,6 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
<tr><th>Release</th><th>Version code</th></tr>
<tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
<tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
<tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
<tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
<tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
</table>
</td>
<td>
2 changes: 1 addition & 1 deletion make-distribution.sh
@@ -58,7 +58,7 @@ while (( "$#" )); do
    --hadoop)
      echo "Error: '--hadoop' is no longer supported:"
      echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead."
      echo "Error: Related profiles include hadoop-0.23, hdaoop-2.2, hadoop-2.3 and hadoop-2.4."
      echo "Error: Related profiles include hadoop-2.2, hadoop-2.3 and hadoop-2.4."
      exit_with_usage
      ;;
    --with-yarn)
14 changes: 0 additions & 14 deletions pom.xml
@@ -1614,20 +1614,6 @@
http://hadoop.apache.org/docs/ra.b.c/hadoop-project-dist/hadoop-common/dependency-analysis.html
-->

    <profile>
      <id>hadoop-0.23</id>
      <!-- SPARK-1121: Adds an explicit dependency on Avro to work around a Hadoop 0.23.X issue -->
      <dependencies>
        <dependency>
          <groupId>org.apache.avro</groupId>
          <artifactId>avro</artifactId>
        </dependency>
      </dependencies>
      <properties>
        <hadoop.version>0.23.10</hadoop.version>
      </properties>
    </profile>

    <profile>
      <id>hadoop-2.2</id>
      <properties>
84 changes: 84 additions & 0 deletions python/pyspark/ml/tuning.py
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import itertools

__all__ = ['ParamGridBuilder']


class ParamGridBuilder(object):
    """
    Builder for a param grid used in grid search-based model selection.

    >>> from pyspark.ml.classification import LogisticRegression
    >>> lr = LogisticRegression()
    >>> output = ParamGridBuilder().baseOn({lr.labelCol: 'l'}) \
            .baseOn([lr.predictionCol, 'p']) \
            .addGrid(lr.regParam, [1.0, 2.0, 3.0]) \
            .addGrid(lr.maxIter, [1, 5]) \
            .addGrid(lr.featuresCol, ['f']) \
            .build()
    >>> expected = [ \
        {lr.regParam: 1.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
        {lr.regParam: 2.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
        {lr.regParam: 3.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
        {lr.regParam: 1.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
        {lr.regParam: 2.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
        {lr.regParam: 3.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}]
    >>> len(output) == len(expected)
    True
    >>> all([m in expected for m in output])
    True
    """

    def __init__(self):
        self._param_grid = {}

    def addGrid(self, param, values):
        """
        Adds a param with multiple values to explore (overwrites any
        previous values set for the same param).
        """
        self._param_grid[param] = values

        return self

    def baseOn(self, *args):
        """
        Sets the given parameters in this grid to fixed values.
        Accepts either a parameter dictionary or a list of (parameter, value) pairs.
        """
        if isinstance(args[0], dict):
            self.baseOn(*args[0].items())
        else:
            for (param, value) in args:
                self.addGrid(param, [value])

        return self

    def build(self):
        """
        Builds and returns all combinations of parameters specified
        by the param grid.
        """
        keys = self._param_grid.keys()
        grid_values = self._param_grid.values()
        return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)]


if __name__ == "__main__":
    import doctest
    doctest.testmod()
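
The expansion performed by `build()` is a plain Cartesian product over the value lists. A minimal standalone sketch of the same logic (not part of the patch; plain strings stand in for the `Param` objects the real builder uses):

    import itertools

    # Stand-in grid; the real builder keys this dict by pyspark.ml Param objects.
    param_grid = {"regParam": [1.0, 2.0, 3.0], "maxIter": [1, 5]}

    # Same zip-over-product trick used in build() above.
    combos = [dict(zip(param_grid.keys(), prod))
              for prod in itertools.product(*param_grid.values())]

    assert len(combos) == 6  # 3 regParam values x 2 maxIter values
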
36 changes: 32 additions & 4 deletions python/pyspark/sql/dataframe.py
@@ -875,6 +875,27 @@ def fillna(self, value, subset=None):

        return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx)

    def corr(self, col1, col2, method=None):
        """
        Calculates the correlation of two columns of a DataFrame as a double value. Currently only
        supports the Pearson Correlation Coefficient.
        :func:`DataFrame.corr` and :func:`DataFrameStatFunctions.corr` are aliases.

        :param col1: The name of the first column
        :param col2: The name of the second column
        :param method: The correlation method. Currently only supports "pearson"
        """
        if not isinstance(col1, str):
            raise ValueError("col1 should be a string.")
        if not isinstance(col2, str):
            raise ValueError("col2 should be a string.")
        if not method:
            method = "pearson"
        if not method == "pearson":
            raise ValueError("Currently only the calculation of the Pearson Correlation " +
                             "coefficient is supported.")
        return self._jdf.stat().corr(col1, col2, method)

    def cov(self, col1, col2):
        """
        Calculate the sample covariance for the given columns, specified by their names, as a
@@ -892,13 +913,15 @@ def cov(self, col1, col2):
    def crosstab(self, col1, col2):
        """
        Computes a pair-wise frequency table of the given columns. Also known as a contingency
        table. The number of distinct values for each column should be less than 1e5. The first
        table. The number of distinct values for each column should be less than 1e4. The first
        column of each row will be the distinct values of `col1` and the column names will be the
        distinct values of `col2` sorted in lexicographical order.
        distinct values of `col2`. Pairs that have no occurrences will have `null` as their values.
        :func:`DataFrame.crosstab` and :func:`DataFrameStatFunctions.crosstab` are aliases.

        :param col1: The name of the first column
        :param col2: The name of the second column
        :param col1: The name of the first column. Distinct items will make the first item of
                     each row.
        :param col2: The name of the second column. Distinct items will make the column names
                     of the DataFrame.
        """
        if not isinstance(col1, str):
            raise ValueError("col1 should be a string.")
@@ -1376,6 +1399,11 @@ class DataFrameStatFunctions(object):
    def __init__(self, df):
        self.df = df

    def corr(self, col1, col2, method=None):
        return self.df.corr(col1, col2, method)

    corr.__doc__ = DataFrame.corr.__doc__

    def cov(self, col1, col2):
        return self.df.cov(col1, col2)

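Taken together, the new stat API would be exercised roughly like this in a pyspark shell (a sketch against this patch, assuming the usual `sc` SparkContext is in scope):

    from pyspark.sql import Row

    df = sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 7)]).toDF()

    df.stat.corr("a", "b")      # Pearson coefficient; df.corr("a", "b") is an alias
    df.stat.cov("a", "b")       # sample covariance of the two columns
    df.stat.crosstab("a", "b")  # contingency table keyed by the values of "a"
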
11 changes: 9 additions & 2 deletions python/pyspark/sql/tests.py
@@ -394,15 +394,22 @@ def test_aggregator(self):
        self.assertTrue(95 < g.agg(functions.approxCountDistinct(df.key)).first()[0])
        self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0])

    def test_corr(self):
        import math
        df = self.sc.parallelize([Row(a=i, b=math.sqrt(i)) for i in range(10)]).toDF()
        corr = df.stat.corr("a", "b")
        self.assertTrue(abs(corr - 0.95734012) < 1e-6)

    def test_cov(self):
        df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF()
        cov = df.stat.cov("a", "b")
        self.assertTrue(abs(cov - 55.0 / 3) < 1e-6)

    def test_crosstab(self):
        df = self.sc.parallelize([Row(a=i % 3, b=i % 2) for i in range(1, 7)]).toDF()
        ct = df.stat.crosstab("a", "b")
        for i, row in enumerate(ct.collect()):
        ct = df.stat.crosstab("a", "b").collect()
        ct = sorted(ct, key=lambda r: r[0])
        for i, row in enumerate(ct):
            self.assertEqual(row[0], str(i))
            self.assertEqual(row[1], 1)
            self.assertEqual(row[2], 1)
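
The constant expected by `test_corr` can be cross-checked without Spark; a quick sketch of the Pearson computation for `a=i, b=sqrt(i)`:

    import math

    xs = list(range(10))
    ys = [math.sqrt(x) for x in xs]
    n = float(len(xs))
    mx, my = sum(xs) / n, sum(ys) / n

    # Pearson coefficient: covariance over the product of standard deviations
    # (the 1/(n-1) factors cancel, so they are omitted).
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    print(cov / (sx * sy))  # ~0.9573401, matching the assertion above
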
1 change: 1 addition & 0 deletions python/run-tests
@@ -98,6 +98,7 @@ function run_ml_tests() {
echo "Run ml tests ..."
run_test "pyspark/ml/feature.py"
run_test "pyspark/ml/classification.py"
run_test "pyspark/ml/tuning.py"
run_test "pyspark/ml/tests.py"
}

@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
 */
class NoSuchTableException extends Exception

class NoSuchDatabaseException extends Exception

/**
 * An interface for looking up relations by name. Used by an [[Analyzer]].
 */
@@ -17,12 +17,31 @@

package org.apache.spark.sql.catalyst

import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
import java.io._

import org.apache.spark.util.Utils

package object util {

  /** Silences output to stderr or stdout for the duration of f */
  def quietly[A](f: => A): A = {
    val origErr = System.err
    val origOut = System.out
    try {
      System.setErr(new PrintStream(new OutputStream {
        def write(b: Int) = {}
      }))
      System.setOut(new PrintStream(new OutputStream {
        def write(b: Int) = {}
      }))

      f
    } finally {
      System.setErr(origErr)
      System.setOut(origOut)
    }
  }

  def fileToString(file: File, encoding: String = "UTF-8"): String = {
    val inStream = new FileInputStream(file)
    val outStream = new ByteArrayOutputStream
@@ -42,10 +61,9 @@ package object util {
    new String(outStream.toByteArray, encoding)
  }

  def resourceToString(
      resource:String,
      encoding: String = "UTF-8",
      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
  def resourceToBytes(
      resource: String,
      classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
    val inStream = classLoader.getResourceAsStream(resource)
    val outStream = new ByteArrayOutputStream
    try {
@@ -61,7 +79,14 @@
    finally {
      inStream.close()
    }
    new String(outStream.toByteArray, encoding)
    outStream.toByteArray
  }

  def resourceToString(
      resource: String,
      encoding: String = "UTF-8",
      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
    new String(resourceToBytes(resource, classLoader), encoding)
  }

  def stringToFile(file: File, str: String): File = {
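
The `quietly` helper above has a natural Python analogue, shown here only for illustration (a sketch, not part of the patch):

    import os
    import sys
    from contextlib import contextmanager

    @contextmanager
    def quietly():
        """Silence stdout and stderr for the duration of the block."""
        orig_out, orig_err = sys.stdout, sys.stderr
        devnull = open(os.devnull, "w")
        try:
            sys.stdout = sys.stderr = devnull
            yield
        finally:
            sys.stdout, sys.stderr = orig_out, orig_err
            devnull.close()

    # Usage: with quietly(): some_noisy_call()
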
@@ -37,14 +37,43 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
    StatFunctions.calculateCov(df, Seq(col1, col2))
  }

  /**
   * Calculates the correlation of two columns of a DataFrame. Currently only supports the Pearson
   * Correlation Coefficient. For Spearman Correlation, consider using RDD methods found in
   * MLlib's Statistics.
   *
   * @param col1 the name of the column
   * @param col2 the name of the column to calculate the correlation against
   * @return The Pearson Correlation Coefficient as a Double.
   */
  def corr(col1: String, col2: String, method: String): Double = {
    require(method == "pearson", "Currently only the calculation of the Pearson Correlation " +
      "coefficient is supported.")
    StatFunctions.pearsonCorrelation(df, Seq(col1, col2))
  }

  /**
   * Calculates the Pearson Correlation Coefficient of two columns of a DataFrame.
   *
   * @param col1 the name of the column
   * @param col2 the name of the column to calculate the correlation against
   * @return The Pearson Correlation Coefficient as a Double.
   */
  def corr(col1: String, col2: String): Double = {
    corr(col1, col2, "pearson")
  }

  /**
   * Computes a pair-wise frequency table of the given columns. Also known as a contingency table.
   * The number of distinct values for each column should be less than 1e5. The first
   * The number of distinct values for each column should be less than 1e4. The first
   * column of each row will be the distinct values of `col1` and the column names will be the
   * distinct values of `col2` sorted in lexicographical order. Counts will be returned as `Long`s.
   * distinct values of `col2`. Counts will be returned as `Long`s. Pairs that have no occurrences
   * will have `null` as their values.
   *
   * @param col1 The name of the first column.
   * @param col2 The name of the second column.
   * @param col1 The name of the first column. Distinct items will make the first item of
   *             each row.
   * @param col2 The name of the second column. Distinct items will make the column names
   *             of the DataFrame.
   * @return A Local DataFrame containing the table
   */
  def crosstab(col1: String, col2: String): DataFrame = {
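
For the data used in `test_crosstab` above (`a = i % 3`, `b = i % 2` for `i` in 1..6), the layout described here works out to one row per distinct value of `a` and one count column per distinct value of `b`, with every count equal to 1. A quick pure-Python check of that claim (illustrative only):

    from collections import Counter

    pairs = [(i % 3, i % 2) for i in range(1, 7)]
    counts = Counter(pairs)

    # All 3 x 2 combinations occur, each exactly once, so every crosstab cell is 1.
    assert len(counts) == 6 and all(c == 1 for c in counts.values())
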
