Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6880] add test of correct behavior #1

Merged

Conversation

squito
Copy link

@squito squito commented Sep 3, 2015

I think this test case does the trick -- it fails in master, and passes after your changes. Note that I not only had to make one shared stage via a shuffle dependency -- I had to make two shared stages. I put in a comment about the behavior with even one shared stage, and how that might not be what we actually want, but I think we're probably stuck with it (I don't see an easy fix, anyway).

@squito squito changed the title add test of correct behavior [SPARK-6880] add test of correct behavior Sep 3, 2015
@squito
Copy link
Author

squito commented Sep 3, 2015

and btw, this test case also does trigger the original "java.util.NoSuchElementException: key not found: 0" if we remove the original fix as well

@markhamstra
Copy link
Owner

Thanks! I'll take a closer look at this later today.

@squito
Copy link
Author

squito commented Sep 3, 2015

thanks -- I just added another case involving our old friend, stage retries from fetch failure. (mostly copy pasted ... should probably refactor a little if you think its worth keeping it.)

// NB: this is next assert isn't necessarily the "desired" behavior, its more so to just document
// the current behavior. We've already submitted the task set for stage 0 based on job1 --
// even though we have cancelled that job, and now we're running it b/c of job2, we haven't
// updated its properties. It might be desirable to have this actually change to "job2"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I guess we should do a follow-up JIRA and PR to tie off this little loose end.

@markhamstra
Copy link
Owner

Looks good. Just a little clean up and refactoring then I'll merge, after which the outer PR will be ready to go.

@squito
Copy link
Author

squito commented Sep 4, 2015

thanks for looking, mark. I'm actually trying to wrap up a few things before I go on vacation, so might not get to this soon. any chance you can pick this up from here? Or else I can get back to it later, in about a week and a half. (turns out we don't have an urgent need for it from a customer, they were happy with just the previous fix.)

@markhamstra
Copy link
Owner

Yeah, I'll put the final touches on it over the weekend if you don't get to it before then.

@markhamstra markhamstra merged this pull request into markhamstra:SPARK-6880 Sep 8, 2015
markhamstra pushed a commit that referenced this pull request Sep 8, 2015
This PR is based on apache#4229, thanks prabeesh.

Closes apache#4229

Author: Prabeesh K <prabsmails@gmail.com>
Author: zsxwing <zsxwing@gmail.com>
Author: prabs <prabsmails@gmail.com>
Author: Prabeesh K <prabeesh.k@namshi.com>

Closes apache#7833 from zsxwing/pr4229 and squashes the following commits:

9570bec [zsxwing] Fix the variable name and check null in finally
4a9c79e [zsxwing] Fix pom.xml indentation
abf5f18 [zsxwing] Merge branch 'master' into pr4229
935615c [zsxwing] Fix the flaky MQTT tests
47278c5 [zsxwing] Include the project class files
478f844 [zsxwing] Add unpack
5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT tests
734db99 [zsxwing] Merge branch 'master' into pr4229
126608a [Prabeesh K] address the comments
b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229
d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; Revert unncessary changes; fix the python unit test
a6747cb [Prabeesh K] wait for starting the receiver before publishing data
87fc677 [Prabeesh K] address the comments:
97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt
80474d1 [Prabeesh K] fix
1f0cfe9 [Prabeesh K] python style fix
e1ee016 [Prabeesh K] scala style fix
a5a8f9f [Prabeesh K] added Python test
9767d82 [Prabeesh K] implemented Python-friendly class
a11968b [Prabeesh K] fixed python style
795ec27 [Prabeesh K] address comments
ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly
3f4df12 [Prabeesh K] updated version
b34c3c1 [prabs] adress comments
3aa7fff [prabs] Added Python streaming mqtt word count example
b7d42ff [prabs] Mqtt streaming support in Python
markhamstra pushed a commit that referenced this pull request Aug 10, 2016
## What changes were proposed in this pull request?
This patch introduces SQLQueryTestSuite, a basic framework for end-to-end SQL test cases defined in spark/sql/core/src/test/resources/sql-tests. This is a more standard way to test SQL queries end-to-end in different open source database systems, because it is more manageable to work with files.

This is inspired by HiveCompatibilitySuite, but simplified for general Spark SQL tests. Once this is merged, I can work towards porting SQLQuerySuite over, and eventually also move the existing HiveCompatibilitySuite to use this framework.

Unlike HiveCompatibilitySuite, SQLQueryTestSuite compares both the output schema and the output data (in string form).

When there is a mismatch, the error message looks like the following:

```
[info] - blacklist.sql !!! IGNORED !!!
[info] - number-format.sql *** FAILED *** (2 seconds, 405 milliseconds)
[info]   Expected "...147483648	-214748364[8]", but got "...147483648	-214748364[9]" Result should match for query #1 (SQLQueryTestSuite.scala:171)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
[info]   at org.scalatest.Assertions$class.assertResult(Assertions.scala:1171)
```

## How was this patch tested?
This is a test infrastructure change.

Author: petermaxlee <petermaxlee@gmail.com>

Closes apache#14472 from petermaxlee/SPARK-16866.
markhamstra pushed a commit that referenced this pull request Mar 30, 2018
## What changes were proposed in this pull request?

There were two related fixes regarding `from_json`, `get_json_object` and `json_tuple` ([Fix #1](apache@c8803c0),
 [Fix #2](apache@86174ea)), but they weren't comprehensive it seems. I wanted to extend those fixes to all the parsers, and add tests for each case.

## How was this patch tested?

Regression tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#20302 from brkyvz/json-invfix.

(cherry picked from commit e01919e)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
markhamstra pushed a commit that referenced this pull request Mar 30, 2018
This is a backport of apache#20598.

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionPlan), we were rewriting the attribute reference in the streaming plan with the new attribute references from the batch plan. This was incorrectly handling the scenario when multiple StreamingExecutionRelation point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query, and its corresponding plan transformations.
```
val df = input.toDF
val join =
      df.select('value % 5 as "key", 'value).join(
        df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for spliced, the new output attributes (value#66) replace the earlier output attributes (value#12, and value#1, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]       // both value#1 and value#12 replaces by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new reference generated by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]    // solution: project with aliases
         +- LocalRelation [value#66]
```

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20765 from tdas/SPARK-23406-2.3.
markhamstra pushed a commit that referenced this pull request Apr 13, 2018
markhamstra pushed a commit that referenced this pull request May 14, 2018
## What changes were proposed in this pull request?

There were two related fixes regarding `from_json`, `get_json_object` and `json_tuple` ([Fix #1](apache@c8803c0),
 [Fix #2](apache@86174ea)), but they weren't comprehensive it seems. I wanted to extend those fixes to all the parsers, and add tests for each case.

## How was this patch tested?

Regression tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#20302 from brkyvz/json-invfix.
markhamstra pushed a commit that referenced this pull request May 14, 2018
## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionPlan), we were rewriting the attribute reference in the streaming plan with the new attribute references from the batch plan. This was incorrectly handling the scenario when multiple StreamingExecutionRelation point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query, and its corresponding plan transformations.
```
val df = input.toDF
val join =
      df.select('value % 5 as "key", 'value).join(
        df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for spliced, the new output attributes (value#66) replace the earlier output attributes (value#12, and value#1, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]       // both value#1 and value#12 replaces by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new reference generated by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]    // solution: project with aliases
         +- LocalRelation [value#66]
```

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20598 from tdas/SPARK-23406.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants