Resolve conflict
Lewuathe committed Jun 24, 2015
2 parents 9c329d8 + cc465fd commit d2aa2a0
Showing 42 changed files with 908 additions and 284 deletions.
16 changes: 15 additions & 1 deletion R/pkg/inst/profile/shell.R
@@ -27,7 +27,21 @@
sc <- SparkR::sparkR.init()
assign("sc", sc, envir=.GlobalEnv)
sqlContext <- SparkR::sparkRSQL.init(sc)
sparkVer <- SparkR:::callJMethod(sc, "version")
assign("sqlContext", sqlContext, envir=.GlobalEnv)
cat("\n Welcome to SparkR!")
cat("\n Welcome to")
cat("\n")
cat(" ____ __", "\n")
cat(" / __/__ ___ _____/ /__", "\n")
cat(" _\\ \\/ _ \\/ _ `/ __/ '_/", "\n")
cat(" /___/ .__/\\_,_/_/ /_/\\_\\")
if (nchar(sparkVer) == 0) {
cat("\n")
} else {
cat(" version ", sparkVer, "\n")
}
cat(" /_/", "\n")
cat("\n")

cat("\n Spark context is available as sc, SQL context is available as sqlContext\n")
}
@@ -139,6 +139,9 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
boolean success = false;
try {
while (records.hasNext()) {
@@ -147,8 +150,19 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
closeAndWriteOutput();
success = true;
} finally {
if (!success) {
sorter.cleanupAfterError();
if (sorter != null) {
try {
sorter.cleanupAfterError();
} catch (Exception e) {
// Only throw this error if we won't be masking another
// error.
if (success) {
throw e;
} else {
logger.error("In addition to a failure during writing, we failed during " +
"cleanup.", e);
}
}
}
}
}
@@ -253,6 +253,23 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
createWriter(false).stop(false);
}

class PandaException extends RuntimeException {
}

@Test(expected=PandaException.class)
public void writeFailurePropagates() throws Exception {
class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
@Override public boolean hasNext() {
throw new PandaException();
}
@Override public Product2<Object, Object> next() {
return null;
}
}
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
writer.write(new BadRecords());
}

@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
124 changes: 117 additions & 7 deletions docs/sql-programming-guide.md
@@ -22,7 +22,7 @@ The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.
All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`, `pyspark` shell, or `sparkR` shell.


## Starting Point: `SQLContext`
## Starting Point: SQLContext

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -1036,6 +1036,15 @@ for (teenName in collect(teenNames)) {

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# sqlContext is an existing HiveContext
sqlContext.sql("REFRESH TABLE my_table")
{% endhighlight %}

</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}
@@ -1054,7 +1063,7 @@ SELECT * FROM parquetTable

</div>

### Partition discovery
### Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
@@ -1108,7 +1117,7 @@ can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, w
`true`. When type inference is disabled, string type will be used for the partitioning columns.
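
For example, a minimal sketch (assuming an existing `sqlContext`) of disabling type inference so
that partitioning columns are always read as strings:

{% highlight scala %}
// Treat all partitioning columns as strings by turning type inference off.
sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
{% endhighlight %}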


### Schema merging
### Schema Merging

Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
@@ -1208,6 +1217,79 @@ printSchema(df3)

</div>

### Hive metastore Parquet table conversion

When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own
Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the
`spark.sql.hive.convertMetastoreParquet` configuration, and is turned on by default.
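
For example, a minimal sketch (assuming an existing `sqlContext` that is a `HiveContext`) of
falling back to the Hive SerDe instead of the built-in Parquet support:

{% highlight scala %}
// Disable the conversion and use the Hive SerDe for metastore Parquet tables.
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
{% endhighlight %}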

#### Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema
processing.

1. Hive is case insensitive, while Parquet is not
1. Hive considers all columns nullable, while nullability in Parquet is significant

For this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting a
Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

1. Fields that have the same name in both schemas must have the same data type regardless of
nullability. The reconciled field should have the data type of the Parquet side, so that
nullability is respected.

1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

- Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
- Any fields that only appear in the Hive metastore schema are added as nullable fields in the
reconciled schema.
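
As a hypothetical example, if the Parquet files contain a single non-nullable INT column `key`,
while the Hive metastore declares `key INT` and `value STRING`, the reconciled schema contains
exactly the metastore's two fields: `key` keeps the Parquet side's non-nullable INT type, and
`value` is added from the metastore as a nullable STRING field.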

#### Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, metadata of those converted tables is also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table");
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}
REFRESH TABLE my_table;
{% endhighlight %}

</div>

</div>

### Configuration

Configuration of Parquet can be done using the `setConf` method on `SQLContext` or by running
@@ -1266,6 +1348,34 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
support.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.output.committer.class</code></td>
<td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
<td>
<p>
The output committer class used by Parquet. The specified class needs to be a subclass of
<code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
</p>
<p>
<b>Note:</b>
<ul>
<li>
This option must be set via Hadoop <code>Configuration</code> rather than Spark
<code>SQLConf</code>.
</li>
<li>
This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
</li>
</ul>
</p>
<p>
Spark SQL comes with a built-in
<code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
efficient than the default Parquet output committer when writing data to S3 (a configuration
sketch follows this table).
</p>
</td>
</tr>
</table>
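
Because `spark.sql.parquet.output.committer.class` is read from the Hadoop `Configuration` rather
than `SQLConf`, it has to be set on the Hadoop side. As a rough sketch (assuming an existing
`SparkContext` named `sc`):

{% highlight scala %}
// Setting this key in SQLConf has no effect; it must go through the Hadoop Configuration.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
{% endhighlight %}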

## JSON Datasets
@@ -1445,8 +1555,8 @@ This command builds a new assembly jar that includes Hive. Note that this Hive a
on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries
(SerDes) in order to access data stored in Hive.

Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. Please note that when running
a query on a YARN cluster (`yarn-cluster` mode), the `datanucleus` jars under the `lib_managed/jars` directory
and `hive-site.xml` under the `conf/` directory need to be available on the driver and all executors launched by the
YARN cluster. A convenient way to do this is to add them through the `--jars` and `--files` options of the
`spark-submit` command.
@@ -1794,7 +1904,7 @@ that these options will be deprecated in future release as more optimizations ar
Configures the number of partitions to use when shuffling data for joins or aggregations.
</td>
</tr>
<tr>
<td><code>spark.sql.planner.externalSort</code></td>
<td>false</td>
<td>
@@ -1889,7 +1999,7 @@ options.
#### DataFrame data reader/writer interface

Based on user feedback, we created a new, more fluid API for reading data in (`SQLContext.read`)
and writing data out (`DataFrame.write`),
and deprecated the old APIs (e.g. `SQLContext.parquetFile`, `SQLContext.jsonFile`).

See the API docs for `SQLContext.read` (
@@ -26,7 +26,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.{HasElasticNetParam, HasMaxIter, HasRegParam, HasTol}
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS._
@@ -41,7 +41,8 @@ import org.apache.spark.util.StatCounter
* Params for linear regression.
*/
private[regression] trait LinearRegressionParams extends PredictorParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
with HasFitIntercept

/**
* :: Experimental ::
Expand Down Expand Up @@ -72,6 +73,14 @@ class LinearRegression(override val uid: String)
def setRegParam(value: Double): this.type = set(regParam, value)
setDefault(regParam -> 0.0)

/**
* Set whether to fit the intercept.
* Default is true.
* @group setParam
*/
def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
setDefault(fitIntercept -> true)
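// Hypothetical usage sketch (not part of this class): a caller could disable the intercept
// when the data is already centered, e.g.
//   val lr = new LinearRegression().setFitIntercept(false).setRegParam(0.1)
//   val model = lr.fit(training)  // assumes `training` is a DataFrame of labels and feature vectors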

/**
* Set the ElasticNet mixing parameter.
* For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
Expand Down Expand Up @@ -123,6 +132,7 @@ class LinearRegression(override val uid: String)
val numFeatures = summarizer.mean.size
val yMean = statCounter.mean
val yStd = math.sqrt(statCounter.variance)
// TODO: compare with glmnet's implementation (glmnet5.m, around L761) for reference.

// If the yStd is zero, then the intercept is yMean with zero weights;
// as a result, training is not needed.
Expand All @@ -142,7 +152,7 @@ class LinearRegression(override val uid: String)
val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam

val costFun = new LeastSquaresCostFun(instances, yStd, yMean,
val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
featuresStd, featuresMean, effectiveL2RegParam)

val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
@@ -180,7 +190,7 @@ class LinearRegression(override val uid: String)
// The intercept in R's GLMNET is computed using closed form after the coefficients are
// converged. See the following discussion for detail.
// http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
val intercept = yMean - dot(weights, Vectors.dense(featuresMean))
val intercept = if ($(fitIntercept)) yMean - dot(weights, Vectors.dense(featuresMean)) else 0.0
if (handlePersistence) instances.unpersist()

// TODO: Converts to sparse format based on the storage, but may base on the scoring speed.
@@ -234,13 +244,18 @@ class LinearRegressionModel private[ml] (
* See this discussion for detail.
* http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
*
* When training with intercept enabled,
* the objective function in the scaled space is given by
* {{{
* L = 1/2n ||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2,
* }}}
* where \bar{x_i} is the mean of x_i, \hat{x_i} is the standard deviation of x_i,
* \bar{y} is the mean of label, and \hat{y} is the standard deviation of label.
*
* If fitting the intercept is disabled (that is, the intercept is forced to 0.0),
* we can use the same equation, except that \bar{y} and \bar{x_i} are set to 0 instead
* of the respective means.
*
* This can be rewritten as
* {{{
* L = 1/2n ||\sum_i (w_i/\hat{x_i})x_i - \sum_i (w_i/\hat{x_i})\bar{x_i} - y / \hat{y}
@@ -255,6 +270,7 @@ class LinearRegressionModel private[ml] (
* \sum_i w_i^\prime x_i - y / \hat{y} + offset
* }}}
*
*
* Note that the effective weights and offset don't depend on training dataset,
* so they can be precomputed.
*
@@ -301,6 +317,7 @@ private class LeastSquaresAggregator(
weights: Vector,
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double]) extends Serializable {

@@ -321,7 +338,7 @@
}
i += 1
}
(weightsArray, -sum + labelMean / labelStd, weightsArray.length)
(weightsArray, if (fitIntercept) labelMean / labelStd - sum else 0.0, weightsArray.length)
}

private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)
@@ -404,6 +421,7 @@ private class LeastSquaresCostFun(
data: RDD[(Double, Vector)],
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double],
effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
@@ -412,7 +430,7 @@
val w = Vectors.fromBreeze(weights)

val leastSquaresAggregator = data.treeAggregate(new LeastSquaresAggregator(w, labelStd,
labelMean, featuresStd, featuresMean))(
labelMean, fitIntercept, featuresStd, featuresMean))(
seqOp = (c, v) => (c, v) match {
case (aggregator, (label, features)) => aggregator.add(label, features)
},
