
[BUG] Cannot invoke "java.util.Collection.size()" because "this.exprValues" is null #2436

Closed
penghuo opened this issue Nov 21, 2023 · 7 comments
Labels: bug (Something isn't working), Flint, v2.12.0 (Issues targeting release v2.12.0)

penghuo commented Nov 21, 2023

Issue

  • Currently, the REPL query handler:
    • first searches query_execution_result by queryId; if a result is found, it returns it
    • if no result is found, it reads the statement state from query_execution_request by queryId
  • It is possible for the search on query_execution_result to return empty while the subsequent read of query_execution_request returns state=success. For example:
    • T1: plugin searches query_execution_result, which returns empty
    • T2: EMR-S bulk-writes the result doc to query_execution_result with refresh=wait_for
    • T3: EMR-S updates query_execution_request with state=success
    • T4: plugin reads query_execution_request, which returns state=success
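The T1 through T4 interleaving above can be sketched with simple in-memory stand-ins for the two indices (an illustration only; the dicts and the "q1" queryId are hypothetical, not the actual storage):

```python
# In-memory stand-ins for the two indices (hypothetical illustration).
result_index = {}
request_index = {"q1": {"state": "running"}}

observations = []

# T1: plugin searches query_execution_result -> empty
observations.append(("result", result_index.get("q1")))
# T2: EMR-S bulk-writes the result doc (refresh=wait_for)
result_index["q1"] = {"data": [1, 2, 3]}
# T3: EMR-S updates query_execution_request with state=success
request_index["q1"]["state"] = "success"
# T4: plugin reads query_execution_request -> success, yet it saw no result at T1
observations.append(("state", request_index["q1"]["state"]))

assert observations == [("result", None), ("state", "success")]
```

The plugin ends up holding the contradictory pair (no result, state=success), which is exactly the window described in this issue.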

Solution

When the plugin reads the result index, it should read the statement state first. If the state is success, it should read from the request_index with _refresh=force, or read the result doc directly by docId.

import requests
from requests.auth import HTTPBasicAuth
import json
import time
import threading
from datetime import datetime

# OpenSearch connection settings (credentials intentionally left blank)
url = "https://xxx.aos.us-west-2.on.aws"

es_user = ''
es_password = ''
es_index = 'query_execution_result_mys3'

def get_statement_state(docId):
    # print(f"[{docId}] read")
    es_url = f"{url}/.query_execution_request_mys3/_doc/{docId}"

    print(f'[{docId}] [{datetime.now()}] GET')
    response = requests.get(
        es_url,
        auth=HTTPBasicAuth(es_user, es_password),
        headers={"Content-Type": "application/json"},
    )
    if response.status_code == 200:
        result = response.json()
        print(f'[{docId}] [{datetime.now()}] current state: {result["_source"]["state"]}')
        return result["_source"]["state"]
    elif response.status_code == 404:
        # print(f"[{docId}] No statement doc found.")
        return "not_found"
    else:
        print(f"Failed to read documents. Status code: {response.status_code}")
        print(f"[{docId}] {response.text}")
        return "failed"


def search_result(docId):
    # print(f"[{docId}] search")
    es_url = f"{url}/query_execution_result_mys3/_search"
    query = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"queryId": f"{docId}"}}
                ]
            }
        }
    }
    print(f'[{docId}] [{datetime.now()}] SEARCH')
    response = requests.get(
        es_url,
        auth=HTTPBasicAuth(es_user, es_password),
        headers={"Content-Type": "application/json"},
        json=query
    )
    if response.status_code == 200:
        result = response.json()
        hits_total_value = result["hits"]["total"]["value"]
        if hits_total_value == 0:
            print(f"[{docId}] [{datetime.now()}] No result doc found.")
            return "not_found"
        elif hits_total_value == 1:
            print(f'[{docId}] [{datetime.now()}] Result doc. {result["hits"]["total"]}')
            return "success"
        else:
            print(f'[{docId}] Unexpected {result["hits"]}')
            return "failed"
    else:
        print(f"Failed to read documents. Status code: {response.status_code}")
        print(f"[{docId}] {response.text}")
        return "failed"

def repl(queryId):
    while True:
        st_state = get_statement_state(queryId)
        result_state = 'not_found'
        if st_state == 'success':
            result_state = search_result(queryId)

        if result_state == 'not_found' and st_state == 'success':
            print(f"[{queryId}] >>>>>>>>>> BUG FOUND >>>>>>>>>>")
        elif result_state == 'success' and st_state == 'success':
            print(f"[{queryId}] SUCCESS")
            break
        time.sleep(1)

if __name__ == "__main__":
    N = 50
    for i in range(1, 1 + N):
        repl(str(i))

Migration Plan

Related to opensearch-project/opensearch-spark#171.

@penghuo penghuo added enhancement New feature or request untriaged v2.12.0 Issues targeting release v2.12.0 labels Nov 21, 2023
@penghuo penghuo changed the title [FEATURE] Cannot invoke "java.util.Collection.size()" because "this.exprValues" is null [BUG] Cannot invoke "java.util.Collection.size()" because "this.exprValues" is null Nov 22, 2023
@penghuo penghuo added the bug Something isn't working label Nov 22, 2023
@penghuo penghuo added Flint and removed enhancement New feature or request labels Nov 29, 2023
@dai-chen dai-chen self-assigned this Nov 29, 2023

dai-chen commented Nov 30, 2023

Current Code

Pseudocode below; see the actual code in https://github.com/opensearch-project/sql/blob/main/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java

def getQueryResponse():
  val result = getResponseFromResultIndex()
  if (result.has(DATA_FIELD)):
    return result
  else
    return getResponseFromRequestIndex() // statement state may already be SUCCESS here, which is the root cause

Quick Fix and Its Problem

To fix the issue above, we can simply check the statement state first and then query the result doc only if the statement is successful.

def getQueryResponse():
  val stmt = getResponseFromRequestIndex() // read statement state first
  if (stmt.state == SUCCESS)
    return getResponseFromResultIndex() // query result doc now only if statement state is success
  else
    return stmt

However, due to the WAIT_UNTIL semantics change in #1801, there is no read-after-write (RAW) consistency guarantee even if the query result doc is indexed with WAIT_UNTIL.

Related: #2439

Other Options

Summary of the preferred option and the cons of the others:

  • Option 1: keep waiting until the result doc is available, but the result doc may be unavailable indefinitely (due to unbounded refresh delay, or auto refresh being disabled by the user)
  • Option 2 (preferred): slightly refine the meaning of a successful statement. This is the minimal change.
  • Option 3: make the result index a hidden index. This requires many changes across multiple components, from UI to plugin to the Spark side.
  • Option 4: use the query ID as the doc ID to switch from DSL search to the doc get API. This needs some work, still has a problem if the query result is split across multiple docs, and becomes throw-away work if the query result is moved to other storage.

Option 1: Check statement state first and wait for the result on behalf of the caller

def getQueryResponse():
  val stmt = getResponseFromRequestIndex()
  if (stmt.state == SUCCESS)
    val result = null
    for (i in 1..maxRetries): // retry a few times instead of looping forever
      result = getResponseFromResultIndex()
      if (result != null) break
      sleep(10s)
    return (result != null) ? result : (failed or running)
  else
    return stmt

Option 2: Consider statement still running if statement state is success but no result

In this way, we define a query statement as complete if it is:

  • a) SUCCESS and its query result is available (searchable) to the PPL plugin; or
  • b) in another non-SUCCESS final state, such as FAILED

def getQueryResponse():
  val result = getResponseFromResultIndex()
  if (result.has(DATA_FIELD)):
    return result
  else
    val stmt = getResponseFromRequestIndex()
    if (stmt.state == SUCCESS)
      stmt.state = RUNNING // make it running because result is not available yet
    return stmt

Option 3: Change result index to hidden index

There will be no issue if the result index is a hidden index (starting with a dot, similar to the query execution request index).

Option 4: Change search to get on result index

Currently there is a 1-to-1 mapping from query statement to query result doc. If we use queryId as the result doc ID, we can do a document get instead of a search request. (We still need to double-check whether a doc get is affected by the problem mentioned above.)

def getQueryResponse():
  val stmt = getResponseFromRequestIndex()
  if (stmt.state == SUCCESS)
    return getResponseFromResultIndex(docId) // get result doc directly 
  else
    return stmt

@kaituo

kaituo commented Nov 30, 2023

In Option 2, does "stmt.state = RUNNING" change the stored state in index? Or is it just sql plugin's change in memory?

@dai-chen

In Option 2, does "stmt.state = RUNNING" change the stored state in index? Or is it just sql plugin's change in memory?

Sorry, the pseudocode is a little confusing. It is actually meant to express that we only change the state field in the statement JSON and return it to the caller.

def getQueryResponse():
  val result = getResponseFromResultIndex()
  if (result.has(DATA_FIELD)):
    return result
  else
    val stmtJson = getResponseFromRequestIndex()
    if (stmtJson.state == SUCCESS)
      stmtJson.state = RUNNING // only change the JSON field value
    return stmtJson

@kaituo

kaituo commented Nov 30, 2023


So the change won't impact the stored state in the index?

@dai-chen


So the change won't impact the stored state in index?

Yes, I'm thinking we just return RUNNING in this case but won't touch the stored statement in the index. Basically, we redefine a complete/successful query statement as statement state=SUCCESS plus result doc available (searchable).
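The redefinition above can be written as a small pure function (a sketch; the function name and string states are illustrative, not the plugin's actual code):

```python
def effective_state(statement_state: str, result_available: bool) -> str:
    """Map the stored statement state to the state reported to the caller.

    A SUCCESS statement whose result doc is not yet searchable is reported
    as RUNNING, so the caller keeps polling instead of hitting the bug.
    """
    if statement_state == "SUCCESS" and not result_available:
        return "RUNNING"
    return statement_state

# The stored state in the index is never modified; only the response changes.
assert effective_state("SUCCESS", False) == "RUNNING"
assert effective_state("SUCCESS", True) == "SUCCESS"
assert effective_state("FAILED", False) == "FAILED"
```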

@kaituo

kaituo commented Nov 30, 2023


Cool, option 2 sounds good to me.

@dai-chen

dai-chen commented Dec 1, 2023

Closing. Created issue for later Spark changes: opensearch-project/opensearch-spark#180

@dai-chen dai-chen closed this as completed Dec 1, 2023
kaituo added a commit to kaituo/opensearch-spark that referenced this issue Feb 6, 2024
This PR introduces a bug fix and enhancements to FlintREPL's session management and optimizes query execution methods. It addresses a specific issue where marking a session as 'dead' inadvertently triggered the creation of a new, unnecessary session. This behavior resulted in the new session entering a spin-wait state, leading to duplicate jobs.

The improvements include:
- **Introduction of `earlyExitFlag`**: A new flag, `earlyExitFlag`, has been introduced and is set to `true` under two conditions: when a job is excluded or when it is not associated with the current session's job run ID. This flag is evaluated in the shutdown hook to determine whether the session state should be marked as 'dead'. This change effectively prevents the unintended creation of duplicate sessions by the SQL plugin, ensuring resources are utilized more efficiently.
- **Query Method Optimization**: The method for executing queries has been shifted from scrolling to search, eliminating the need for creating unnecessary scroll contexts. This adjustment enhances the performance and efficiency of query operations.
- **Reversion of Previous Commit**: The PR reverts a previous change (opensearch-project@be82024) following the resolution of the related issue in the SQL plugin (opensearch-project/sql#2436), further streamlining the operation and maintenance of the system.

**Testing**:
1. Integration tests were added to cover both REPL and streaming job functionalities, ensuring the robustness of the fixes.
2. Manual testing was conducted to validate the bug fix.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
kaituo added a commit to kaituo/opensearch-spark that referenced this issue Feb 6, 2024
kaituo added a commit to kaituo/opensearch-spark that referenced this issue Feb 6, 2024
kaituo added a commit to kaituo/opensearch-spark that referenced this issue Feb 8, 2024
kaituo added a commit to opensearch-project/opensearch-spark that referenced this issue Feb 9, 2024
* Fix Session state bug and improve Query Efficiency in REPL


* added Java doc

Signed-off-by: Kaituo Li <kaituo@amazon.com>

* fix IT by restore env variable change

Signed-off-by: Kaituo Li <kaituo@amazon.com>

---------

Signed-off-by: Kaituo Li <kaituo@amazon.com>
kaituo added a commit to kaituo/opensearch-spark that referenced this issue Feb 9, 2024
…h-project#245)

kaituo added a commit to opensearch-project/opensearch-spark that referenced this issue Feb 12, 2024