Skip to content

Commit

Permalink
added zero date check and date parser (#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
spid37 authored and siddontang committed Sep 8, 2018
1 parent d9d0404 commit d238419
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
43 changes: 30 additions & 13 deletions river/river_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

var myAddr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr")
var esAddr = flag.String("es_addr", "127.0.0.1:9200", "Elasticsearch addr")
var dateTimeStr = time.Now().Format(mysql.TimeFormat)
var dateStr = time.Now().Format(mysqlDateFormat)

func Test(t *testing.T) {
TestingT(t)
Expand All @@ -36,17 +38,18 @@ func (s *riverTestSuite) SetUpSuite(c *C) {

schema := `
CREATE TABLE IF NOT EXISTS %s (
id INT,
title VARCHAR(256),
content VARCHAR(256),
mylist VARCHAR(256),
mydate INT(10),
tenum ENUM("e1", "e2", "e3"),
tset SET("a", "b", "c"),
tbit BIT(1) default 1,
tdatetime DATETIME DEFAULT NULL,
ip INT UNSIGNED DEFAULT 0,
PRIMARY KEY(id)) ENGINE=INNODB;
id INT,
title VARCHAR(256),
content VARCHAR(256),
mylist VARCHAR(256),
mydate INT(10),
tenum ENUM("e1", "e2", "e3"),
tset SET("a", "b", "c"),
tbit BIT(1) default 1,
tdatetime DATETIME DEFAULT NULL,
tdate DATE DEFAULT NULL,
ip INT UNSIGNED DEFAULT 0,
PRIMARY KEY(id)) ENGINE=INNODB;
`

schemaJSON := `
Expand Down Expand Up @@ -223,8 +226,10 @@ func (s *riverTestSuite) testPrepareData(c *C) {
s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c")
}

datetime := time.Now().Format(mysql.TimeFormat)
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime, mydate) VALUES (?, ?, ?, ?, ?, ?, ?)", 16, "test datetime", "hello go 16", "e1", "a,b", datetime, 1458131094)
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime, mydate, tdate) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", 16, "test datetime", "hello go 16", "e1", "a,b", dateTimeStr, 1458131094, dateStr)

s.testExecute(c, "SET sql_mode = '';") // clear sql_mode to allow empty dates
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime, mydate, tdate) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", 20, "test empty datetime", "date test 20", "e1", "a,b", "0000-00-00 00:00:00", 0, "0000-00-00")

// test ip
s.testExecute(c, "INSERT test_river (id, ip) VALUES (?, ?)", 17, 0)
Expand All @@ -248,6 +253,7 @@ func (s *riverTestSuite) testElasticMapping(c *C) *elastic.MappingResponse {
c.Assert(err, IsNil)

c.Assert(r.Mapping[index].Mappings[docType].Properties["tdatetime"].Type, Equals, "date")
c.Assert(r.Mapping[index].Mappings[docType].Properties["tdate"].Type, Equals, "date")
c.Assert(r.Mapping[index].Mappings[docType].Properties["mydate"].Type, Equals, "date")
return r
}
Expand Down Expand Up @@ -359,6 +365,17 @@ func (s *riverTestSuite) TestRiver(c *C) {
c.Assert(r.Source["es_title"], Equals, "hello")
}

r = s.testElasticGet(c, "16")
c.Assert(r.Found, IsTrue)
tdt, _ := time.Parse(time.RFC3339, r.Source["tdatetime"].(string))
c.Assert(tdt.Format(mysql.TimeFormat), Equals, dateTimeStr)
c.Assert(r.Source["tdate"], Equals, dateStr)

r = s.testElasticGet(c, "20")
c.Assert(r.Found, IsTrue)
c.Assert(r.Source["tdate"], Equals, nil)
c.Assert(r.Source["tdatetime"], Equals, nil)

// test ip
r = s.testElasticGet(c, "17")
c.Assert(r.Found, IsTrue)
Expand Down
16 changes: 15 additions & 1 deletion river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
fieldTypeDate = "date"
)

const mysqlDateFormat = "2006-01-02"

type posSaver struct {
pos mysql.Position
force bool
Expand Down Expand Up @@ -333,9 +335,21 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in
case schema.TYPE_DATETIME, schema.TYPE_TIMESTAMP:
switch v := value.(type) {
case string:
vt, _ := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local)
vt, err := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local)
if err != nil || vt.IsZero() { // failed to parse date or zero date
return nil
}
return vt.Format(time.RFC3339)
}
case schema.TYPE_DATE:
switch v := value.(type) {
case string:
vt, err := time.Parse(mysqlDateFormat, string(v))
if err != nil || vt.IsZero() { // failed to parse date or zero date
return nil
}
return vt.Format(mysqlDateFormat)
}
}

return value
Expand Down

0 comments on commit d238419

Please sign in to comment.