
SHS-NG M1: Add KVStore abstraction, LevelDB implementation. #3

Closed
wants to merge 30 commits into from
Changes from all commits
Commits
30 commits
bd57882
[SPARK-20603][SS][TEST] Set default number of topic partitions to 1 t…
zsxwing May 5, 2017
b31648c
[SPARK-20557][SQL] Support for db column type TIMESTAMP WITH TIME ZONE
JannikArndt May 5, 2017
5d75b14
[SPARK-20616] RuleExecutor logDebug of batch results should show diff…
juliuszsompolski May 5, 2017
b433aca
[SPARK-20614][PROJECT INFRA] Use the same log4j configuration with Je…
HyukjinKwon May 6, 2017
cafca54
[SPARK-20557][SQL] Support JDBC data type Time with Time Zone
gatorsmile May 7, 2017
63d90e7
[SPARK-18777][PYTHON][SQL] Return UDF from udf.register
zero323 May 7, 2017
37f963a
[SPARK-20518][CORE] Supplement the new blockidsuite unit tests
heary-cao May 7, 2017
88e6d75
[SPARK-20484][MLLIB] Add documentation to ALS code
danielyli May 7, 2017
2cf83c4
[SPARK-7481][BUILD] Add spark-hadoop-cloud module to pull in object s…
steveloughran May 7, 2017
7087e01
[SPARK-20543][SPARKR][FOLLOWUP] Don't skip tests on AppVeyor
felixcheung May 7, 2017
500436b
[MINOR][SQL][DOCS] Improve unix_timestamp's scaladoc (and typo hunting)
jaceklaskowski May 7, 2017
1f73d35
[SPARK-20550][SPARKR] R wrapper for Dataset.alias
zero323 May 7, 2017
f53a820
[SPARK-16931][PYTHON][SQL] Add Python wrapper for bucketBy
zero323 May 8, 2017
2269155
[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps
squito May 8, 2017
c24bdaa
[SPARK-20626][SPARKR] address date test warning with timezone on windows
felixcheung May 8, 2017
42cc6d1
[SPARK-20380][SQL] Unable to set/unset table comment property using A…
sujith71955 May 8, 2017
2fdaeb5
[SPARKR][DOC] fix typo in vignettes
May 8, 2017
0f820e2
[SPARK-20519][SQL][CORE] Modify to prevent some possible runtime exce…
10110346 May 8, 2017
1552665
[SPARK-19956][CORE] Optimize a location order of blocks with topology…
ConeyLiu May 8, 2017
58518d0
[SPARK-20596][ML][TEST] Consolidate and improve ALS recommendAll test…
May 8, 2017
f3b7e0b
SHS-NG M1: Add KVStore abstraction, LevelDB implementation.
Oct 3, 2016
52ed2b4
SHS-NG M1: Add support for arrays when indexing.
Nov 1, 2016
4112afe
SHS-NG M1: Fix counts in LevelDB when updating entries.
Nov 3, 2016
718cabd
SHS-NG M1: Try to prevent db use after close.
Mar 18, 2017
45a027f
SHS-NG M1: Use Java 8 lambdas.
Mar 24, 2017
e592bf6
SHS-NG M1: Compress values stored in LevelDB.
Mar 25, 2017
889963f
SHS-NG M1: Use type aliases as keys in Level DB.
Mar 25, 2017
84ab160
SHS-NG M1: Separate index introspection from storage.
Apr 3, 2017
7b87021
SHS-NG M1: Remove unused methods from KVStore.
Apr 26, 2017
5197c21
SHS-NG M1: Add "max" and "last" to kvstore iterators.
May 5, 2017
24 changes: 24 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -3745,3 +3745,27 @@ setMethod("hint",
jdf <- callJMethod(x@sdf, "hint", name, parameters)
dataFrame(jdf)
})

#' alias
#'
#' @aliases alias,SparkDataFrame-method
#' @family SparkDataFrame functions
#' @rdname alias
#' @name alias
#' @export
#' @examples
#' \dontrun{
#' df <- alias(createDataFrame(mtcars), "mtcars")
#' avg_mpg <- alias(agg(groupBy(df, df$cyl), avg(df$mpg)), "avg_mpg")
#'
#' head(select(df, column("mtcars.mpg")))
#' head(join(df, avg_mpg, column("mtcars.cyl") == column("avg_mpg.cyl")))
#' }
#' @note alias(SparkDataFrame) since 2.3.0
setMethod("alias",
signature(object = "SparkDataFrame"),
function(object, data) {
stopifnot(is.character(data))
sdf <- callJMethod(object@sdf, "alias", data)
dataFrame(sdf)
})
16 changes: 8 additions & 8 deletions R/pkg/R/column.R
@@ -130,19 +130,19 @@ createMethods <- function() {

createMethods()

#' alias
#'
#' Set a new name for a column
#'
#' @param object Column to rename
#' @param data new name to use
#'
#' @rdname alias
#' @name alias
#' @aliases alias,Column-method
#' @family colum_func
#' @export
#' @note alias since 1.4.0
#' @examples \dontrun{
#' df <- createDataFrame(iris)
#'
#' head(select(
#' df, alias(df$Sepal_Length, "slength"), alias(df$Petal_Length, "plength")
#' ))
#' }
#' @note alias(Column) since 1.4.0
setMethod("alias",
signature(object = "Column"),
function(object, data) {
11 changes: 11 additions & 0 deletions R/pkg/R/generics.R
@@ -387,6 +387,17 @@ setGeneric("value", function(bcast) { standardGeneric("value") })
#' @export
setGeneric("agg", function (x, ...) { standardGeneric("agg") })

#' alias
#'
#' Returns a new SparkDataFrame or a Column with an alias set. Equivalent to SQL "AS" keyword.
#'
#' @name alias
#' @rdname alias
#' @param object x a SparkDataFrame or a Column
#' @param data new name to use
#' @return a SparkDataFrame or a Column
NULL

#' @rdname arrange
#' @export
setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") })
16 changes: 15 additions & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -96,6 +96,10 @@ mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

if (.Platform$OS.type == "windows") {
Sys.setenv(TZ = "GMT")
}

test_that("calling sparkRSQL.init returns existing SQL context", {
skip_on_cran()

@@ -1223,6 +1227,16 @@ test_that("select with column", {
expect_equal(columns(df4), c("name", "age"))
expect_equal(count(df4), 3)

# Test select with alias
df5 <- alias(df, "table")

expect_equal(columns(select(df5, column("table.name"))), "name")
expect_equal(columns(select(df5, "table.name")), "name")

# Test that stats::alias is not masked
expect_is(alias(aov(yield ~ block + N * P * K, npk)), "listof")


expect_error(select(df, c("name", "age"), "name"),
"To select multiple columns, use a character vector or list for col")
})
@@ -3387,7 +3401,7 @@ compare_list <- function(list1, list2) {

# This should always be the **very last test** in this test file.
test_that("No extra files are created in SPARK_HOME by starting session and making calls", {
skip_on_cran()
skip_on_cran() # skip because when run from R CMD check SPARK_HOME is not the current directory

# Check that it is not creating any extra file.
# Does not check the tempdir which would be cleaned up after.
36 changes: 18 additions & 18 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -65,7 +65,7 @@ We can view the first few rows of the `SparkDataFrame` by `head` or `showDF` fun
head(carsDF)
```

Common data processing operations such as `filter`, `select` are supported on the `SparkDataFrame`.
Common data processing operations such as `filter` and `select` are supported on the `SparkDataFrame`.
```{r}
carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
@@ -379,7 +379,7 @@ out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
```

Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function should be a `data.frame`, but no schema is required in this case. Note that `dapplyCollect` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
Like `dapply`, `dapplyCollect` can apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of the function should be a `data.frame`, but no schema is required in this case. Note that `dapplyCollect` can fail if the output of the UDF on all partitions cannot be pulled into the driver's memory.

```{r}
out <- dapplyCollect(
@@ -405,7 +405,7 @@ result <- gapply(
head(arrange(result, "max_mpg", decreasing = TRUE))
```

Like gapply, `gapplyCollect` applies a function to each partition of a `SparkDataFrame` and collect the result back to R `data.frame`. The output of the function should be a `data.frame` but no schema is required in this case. Note that `gapplyCollect` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
Like `gapply`, `gapplyCollect` can apply a function to each partition of a `SparkDataFrame` and collect the result back to R `data.frame`. The output of the function should be a `data.frame` but no schema is required in this case. Note that `gapplyCollect` can fail if the output of the UDF on all partitions cannot be pulled into the driver's memory.

```{r}
result <- gapplyCollect(
@@ -458,20 +458,20 @@ options(ops)


### SQL Queries
A `SparkDataFrame` can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
A `SparkDataFrame` can also be registered as a temporary view in Spark SQL so that one can run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.

```{r}
people <- read.df(paste0(sparkR.conf("spark.home"),
"/examples/src/main/resources/people.json"), "json")
```

Register this SparkDataFrame as a temporary view.
Register this `SparkDataFrame` as a temporary view.

```{r}
createOrReplaceTempView(people, "people")
```

SQL statements can be run by using the sql method.
SQL statements can be run using the sql method.
```{r}
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
@@ -780,7 +780,7 @@ head(predict(isoregModel, newDF))
`spark.gbt` fits a [gradient-boosted tree](https://en.wikipedia.org/wiki/Gradient_boosting) classification or regression model on a `SparkDataFrame`.
Users can call `summary` to get a summary of the fitted model, `predict` to make predictions, and `write.ml`/`read.ml` to save/load fitted models.

Similar to the random forest example above, we use the `longley` dataset to train a gradient-boosted tree and make predictions:
We use the `longley` dataset to train a gradient-boosted tree and make predictions:

```{r, warning=FALSE}
df <- createDataFrame(longley)
@@ -820,7 +820,7 @@ head(select(fitted, "Class", "prediction"))

`spark.gaussianMixture` fits multivariate [Gaussian Mixture Model](https://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) (GMM) against a `SparkDataFrame`. [Expectation-Maximization](https://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) (EM) is used to approximate the maximum likelihood estimator (MLE) of the model.

We use a simulated example to demostrate the usage.
We use a simulated example to demonstrate the usage.
```{r}
X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
@@ -851,9 +851,9 @@ head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20

* Topics and documents both exist in a feature space, where feature vectors are vectors of word counts (bag of words).

* Rather than estimating a clustering using a traditional distance, LDA uses a function based on a statistical model of how text documents are generated.
* Rather than clustering using a traditional distance, LDA uses a function based on a statistical model of how text documents are generated.

To use LDA, we need to specify a `features` column in `data` where each entry represents a document. There are two type options for the column:
To use LDA, we need to specify a `features` column in `data` where each entry represents a document. There are two options for the column:

* character string: This can be a string of the whole document. It will be parsed automatically. Additional stop words can be added in `customizedStopWords`.

@@ -901,7 +901,7 @@ perplexity

`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](http://dl.acm.org/citation.cfm?id=1608614).

There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, `nonnegative`. For a complete list, refer to the help file.
There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.

```{r, eval=FALSE}
ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
@@ -981,7 +981,7 @@ testSummary


### Model Persistence
The following example shows how to save/load an ML model by SparkR.
The following example shows how to save/load an ML model in SparkR.
```{r}
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
@@ -1079,19 +1079,19 @@ There are three main object classes in SparkR you may be working with.
+ `sdf` stores a reference to the corresponding Spark Dataset in the Spark JVM backend.
+ `env` saves the meta-information of the object such as `isCached`.

It can be created by data import methods or by transforming an existing `SparkDataFrame`. We can manipulate `SparkDataFrame` by numerous data processing functions and feed that into machine learning algorithms.
It can be created by data import methods or by transforming an existing `SparkDataFrame`. We can manipulate `SparkDataFrame` by numerous data processing functions and feed that into machine learning algorithms.

* `Column`: an S4 class representing column of `SparkDataFrame`. The slot `jc` saves a reference to the corresponding Column object in the Spark JVM backend.
* `Column`: an S4 class representing a column of `SparkDataFrame`. The slot `jc` saves a reference to the corresponding `Column` object in the Spark JVM backend.

It can be obtained from a `SparkDataFrame` by `$` operator, `df$col`. More often, it is used together with other functions, for example, with `select` to select particular columns, with `filter` and constructed conditions to select rows, with aggregation functions to compute aggregate statistics for each group.
It can be obtained from a `SparkDataFrame` by `$` operator, e.g., `df$col`. More often, it is used together with other functions, for example, with `select` to select particular columns, with `filter` and constructed conditions to select rows, with aggregation functions to compute aggregate statistics for each group.

* `GroupedData`: an S4 class representing grouped data created by `groupBy` or by transforming other `GroupedData`. Its `sgd` slot saves a reference to a RelationalGroupedDataset object in the backend.
* `GroupedData`: an S4 class representing grouped data created by `groupBy` or by transforming other `GroupedData`. Its `sgd` slot saves a reference to a `RelationalGroupedDataset` object in the backend.

This is often an intermediate object with group information and followed up by aggregation operations.
This is often an intermediate object with group information and followed up by aggregation operations.

### Architecture

A complete description of architecture can be seen in reference, in particular the paper *SparkR: Scaling R Programs with Spark*.
A complete description of architecture can be seen in the references, in particular the paper *SparkR: Scaling R Programs with Spark*.

Under the hood of SparkR is Spark SQL engine. This avoids the overheads of running interpreted R code, and the optimized SQL execution engine in Spark uses structural information about data and computation flow to perform a bunch of optimizations to speed up the computation.

6 changes: 4 additions & 2 deletions appveyor.yml
@@ -48,12 +48,14 @@ install:
build_script:
- cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package

environment:
NOT_CRAN: true

test_script:
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R
- cmd: .\bin\spark-submit2.cmd --driver-java-options "-Dlog4j.configuration=file:///%CD:\=/%/R/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R

notifications:
- provider: Email
on_build_success: false
on_build_failure: false
on_build_status_changed: false

14 changes: 14 additions & 0 deletions assembly/pom.xml
@@ -226,5 +226,19 @@
<parquet.deps.scope>provided</parquet.deps.scope>
</properties>
</profile>

<!--
Pull in spark-hadoop-cloud and its associated JARs,
-->
<profile>
<id>hadoop-cloud</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hadoop-cloud_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
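(A note based on standard Maven profile activation rather than on anything in this diff: the new `hadoop-cloud` profile above would be enabled explicitly at build time, for example with `mvn -Phadoop-cloud -DskipTests package`.)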
90 changes: 90 additions & 0 deletions common/kvstore/pom.xml
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-kvstore_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Local DB</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>kvstore</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
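For orientation, the dependency list above already hints at how the implementation is likely wired together: leveldbjni supplies the native LevelDB bindings, and Jackson handles turning stored objects into bytes. The snippet below is only a sketch of how those two libraries combine; it is not the PR's LevelDB class, and the path and stored value are made up for illustration.

```java
// Illustrative sketch of how the declared dependencies fit together: leveldbjni
// for the native LevelDB bindings, Jackson for (de)serializing stored values.
// This is not the PR's LevelDB implementation; the path and data are made up.
import java.io.File;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import static org.fusesource.leveldbjni.JniDBFactory.factory;

public class LevelDbSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    Options options = new Options().createIfMissing(true);

    try (DB db = factory.open(new File("/tmp/kvstore-demo"), options)) {
      byte[] key = "app-1".getBytes(StandardCharsets.UTF_8);

      // Serialize an object to bytes with Jackson and store it under the key.
      db.put(key, mapper.writeValueAsBytes(new String[] {"hello", "kvstore"}));

      // Read the bytes back and deserialize them into the original type.
      String[] value = mapper.readValue(db.get(key), String[].class);
      System.out.println(value[0] + " " + value[1]);
    }
  }
}
```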