
New records not added in Simple polling mode #92

Closed
troyschneringer opened this issue May 30, 2013 · 19 comments

@troyschneringer

I am using the JDBC River with MSSQL.

I am regularly seeing new records in my source DB that are not added to the index. I have restarted ES and it still does not seem to find the newly added records.

Here's my configuration:

{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "url" : "jdbc:sqlserver://[server]:1433;databaseName=[database]",
        "user" : "[user]",
        "password" : "[password]",
        "sql" : "[sql]",
        "strategy": "simple",
        "poll": "5m",
        "autocommit": true
    },
    "index" : {
        "index" : "[index]",
        "type" : "sqlRecord",
        "versioning": true
    }
}

I'd be happy to help debug this if you can point me in the right direction.

@troyschneringer
Author

Any word on this?

@jprante
Owner

jprante commented Jun 25, 2013

Sorry, but I would like to know what the SQL statement is. In general, it is not possible with the "simple" strategy to update only a part of the records returned by the SQL statement; all records have to be moved to the ES index. I will work on a better strategy to accomplish partial updates.

@osbornm

osbornm commented Jul 8, 2013

@jprante The problem is not that we want only part of the records to be added. We are okay with all the records being removed and added back. The problem we have is that a new record shows up in the SQL results but is not added to ES. We aren't able to find a particular type of record that triggers this; sometimes a record just doesn't show up.

@chrisribe

It would be nice to have a strategy like simple, but where every subsequent poll cycle queries MySQL only for records whose id is greater than the last inserted record's "_id" (default 0).

"jdbc" : {
    "driver" : "com.mysql.jdbc.Driver",
    "url" : "jdbc:mysql://10.49.33.168:3306/db",
    "user" : "<user>",
    "password" : "<pwd>",
    "strategy" : "simple",
    "poll" : "15s",
    "fetchsize" : 1000,
    "sql" : "select id as _id, * from users where id > ?",
    "sqlparams": ["$last_id"]
}

This way I think we would have a river that could handle tables with millions of rows when new data is inserted.
I can't see myself using this in production if I need to rescan ALL rows every 5 minutes...

Updates and deletes are another issue, but they could probably be handled by the table method (e.g. one journal table for updates and one for deletes), along the lines of the sketch below.
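
For illustration only (hypothetical table and trigger names; nothing the river provides out of the box), such journal tables could be fed by triggers on the source table and polled separately:

CREATE TABLE users_deleted (
    id         INT NOT NULL PRIMARY KEY,                     -- id of the deleted users row
    deleted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP  -- when the delete happened
);

CREATE TRIGGER users_after_delete AFTER DELETE ON users
FOR EACH ROW INSERT INTO users_deleted (id) VALUES (OLD.id);

A delete-handling river would then poll users_deleted instead of users and remove the matching documents; an updates table could work the same way with an updated_at column.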

Chris

@randywallace

I agree with @chrisribe; I have a table that receives more than 400,000 inserts a day, but never an update or delete. As such, with over 1B rows at this point, a plain select * from table is simply not feasible.

Since most people will sensibly assign the _id to whatever the PRIMARY KEY is on their schema, selecting rows using the aforementioned method should be quite performant to boot.

@jprante
Owner

jprante commented Aug 2, 2013

At the moment, I have implemented two special binding parameters, $job and $now. $job is replaced by the version used for ES (which is difficult to know from outside), and $now is the current time.

For example, in MySQL

"sql" : "select orders.created from orders where TIMESTAMPDIFF(DAY, ?, orders.created) > 14",
"sqlparams" : [ "$now" ]

@chrisribe

Thanks for the new options, but are they in reply to me and @randywallace?

Using a TIMESTAMPDIFF of the current day plus or minus 14 days will work, but it is not the optimal solution. Using the last id ensures you only get the new data. The TIMESTAMPDIFF method will always re-fetch the last N records (which could be thousands every N minutes) until the date-diff range changes. A simple last _id would be best and should not be hard to figure out on your side, right?

Thanks
Chris

@jprante
Owner

jprante commented Aug 5, 2013

The JDBC river does not currently track any row IDs or column values related to SQL result rows. Patches are welcome!

@chrisribe

OK, I will look at the code to see where this could be done. But I was not thinking of tracking the SQL values/rows, just of getting the last inserted _id in the target Elasticsearch index via something like $last_id. The river would assume an integer value, not a GUID.
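
For illustration, looking that value up by hand might look something like this (just a sketch; it assumes the numeric key is also stored as a regular field named "id" in each document, since _id itself cannot be sorted on numerically):

curl -XGET 'localhost:9200/[index]/sqlRecord/_search' -d '{
    "size" : 1,
    "sort" : [ { "id" : { "order" : "desc" } } ],
    "fields" : [ "id" ]
}'

A $last_id parameter could then bind that value into the where id > ? clause on the next poll.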

Thanks for the help.
Chris


@jprante
Owner

jprante commented Aug 6, 2013

I understand, but there is no such thing as a last inserted ID in the target ES index. Docs are not necessarily created in order, which would presume every select has an ORDER BY clause, and you can also insert arbitrary doc IDs into ES if you want to. Because of this, the JDBC river currently remembers only the timestamp of the last cycle.

@gregsheu

I use $now as a sqlparam. The river updates the index whenever new data is inserted into my MySQL tables, but versioning is always true in this case. I can't set versioning to false, even when I set versioning:false inside the index clause. Also, does housekeeping delete documents by default? It keeps deleting my documents after merging some of them, and this is not what I want. I can keep polling and fetching new data into the ES index, but I don't want housekeeping to delete my documents. Please advise.
Thanks
curl -XPUT 'localhost:9200/_river/mysql_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.mysql.jdbc.Driver",
        "url" : "jdbc:mysql://mysqlserver:3306/database",
        "user" : "user",
        "password" : "password",
        "sql" : "select ID as _id from sometable where sometime (?) <= 100",
        "sqlparams" : [ "$now" ],
        "strategy" : "simple",
        "poll" : "5s"
    },
    "index" : {
        "index" : "jdbc",
        "type" : "jdbc",
        "versioning" : false,
        "acknowledge" : false
    }
}'

(The sql above is not the working statement, just an example.)
housekeeping for version 2
housekeeping done, 239 documents deleted, took 30 ms

@jprante
Owner

jprante commented Oct 15, 2013

Yes, the housekeeping is getting in the way. I will have to find improvements.

@jprante
Owner

jprante commented May 19, 2014

Housekeeping is no longer in the JDBC river, so this issue should no longer occur.

@jprante jprante closed this as completed May 19, 2014
@phani546

Hi jprante,
Can you explain whether this feature is available with Elasticsearch version 1.1.1? I am facing the same scenario: how do I add new records to an existing ES index by pulling only the newly inserted records from a database table? I tried the scheduler, but it pulls all the data from scratch again. Please suggest a procedure to achieve this.

@kingster

Hi phani546,

Refer to the example in #154.

Using this method, we pull only incremental data.

ES version 1.1 definition:

curl -XPUT 'localhost:9200/_river/myRiver/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://......",
        "user" : "root",
        "password" : "...",
        "max_retries" : 3,
        "autocommit" : true,
        "sql" : [
                  {
                      "statement" : "INSERT INTO es.ES_TRACK (name, checkpoint , version  ) SELECT  ? ,  max(`lastUpdatedTS`), ?  from `members`",
                      "parameter" : [ "myRiver", "$job" ],
                      "callable" : false
                  },
                  {
                      "statement" : "SELECT members.id as \"_id\", members.*  from members where  members.lastUpdatedTS  < ( SELECT `checkpoint` from es.ES_TRACK WHERE `name`=  ? AND ack = 0 ORDER by id DESC LIMIT 1 )  AND members.lastUpdatedTS >= ( SELECT `checkpoint` FROM es.ES_TRACK WHERE `name`=  ? AND ack = 1 ORDER by id DESC LIMIT 1 ) ; "  ,
                      "parameter" : [  "myRiver"  ,  "myRiver" ],
                      "callable" : false
                  },{
                      "statement" : "UPDATE  es.ES_TRACK set ack = 1  where name = ? ORDER by id DESC LIMIT 1 ",
                        "parameter" : [ "myRiver"  ],
                        "callable" : false
                  }
        ],

        "versioning" : false,
        "schedule" : "0 */5 0-23 ? * *",
        "strategy" : "simple" ,

        "index" : "index",
        "type"  : "people"
    }
}'
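
For anyone reproducing this: the checkpoint table itself is not shown above. A plausible MySQL definition, inferred from the three statements (the column names come from the statements, the types are guesses), might be:

CREATE TABLE es.ES_TRACK (
    id         INT AUTO_INCREMENT PRIMARY KEY,  -- newest row wins via ORDER BY id DESC
    name       VARCHAR(64) NOT NULL,            -- river name, e.g. 'myRiver'
    checkpoint DATETIME NOT NULL,                -- max(lastUpdatedTS) captured at the start of a run
    version    BIGINT NOT NULL,                  -- filled from the $job parameter
    ack        TINYINT(1) NOT NULL DEFAULT 0     -- set to 1 once the run has been indexed
);

Each cycle then indexes only rows whose lastUpdatedTS falls between the last acknowledged checkpoint and the newly inserted one.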

@phani546

Hi kingster,

Thank you for the quick reply. I understood the above script. Can you explain the second query (members)? Is it a user-defined table, or do we need to create that members table?

Here you mentioned the schedule keyword to update the index at a specified time, but without a schedule, how can we update data in the ES index manually (without the scheduler) whenever we want?

Thanks
phani

@kingster

Hi

members is my data table, which I import into ES.

Change

        "strategy" : "simple" ,

to

        "schedule" : "0 */1 0-23 ? * *",
        "strategy" : "oneshot" ,

to run it manually once. It will run once, at the start of the next minute.

@netspruce

Hello kingster,

I tested this on MySQL and it worked.
In PostgreSQL, I'm getting the following error.
Any pointers?

psql (9.1.13) default_transaction_read_only=off
[2014-07-24 16:28:00,029][ERROR][Feeder ] error while getting next input: org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
java.io.IOException: org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.fetch(SimpleRiverSource.java:248)
    at org.xbib.elasticsearch.plugin.feeder.jdbc.JDBCFeeder.executeTask(JDBCFeeder.java:159)
    at org.xbib.elasticsearch.plugin.feeder.AbstractFeeder.newRequest(AbstractFeeder.java:361)
    at org.xbib.elasticsearch.plugin.feeder.AbstractFeeder.newRequest(AbstractFeeder.java:52)
    at org.xbib.pipeline.AbstractPipeline.call(AbstractPipeline.java:86)
    at org.xbib.pipeline.AbstractPipeline.call(AbstractPipeline.java:17)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2198)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1927)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:561)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:419)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:304)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.executeQuery(SimpleRiverSource.java:515)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.executeWithParameter(SimpleRiverSource.java:311)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.fetch(SimpleRiverSource.java:238)

@kingster

It looks like your SQL statement is not being treated as an update/insert statement.

Try debugging it against https://github.com/jprante/elasticsearch-river-jdbc/blob/master/src/main/java/org/xbib/elasticsearch/plugin/jdbc/SQLCommand.java
