diff --git a/.travis.yml b/.travis.yml index 8c50bb56b..4edb24862 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,21 +10,11 @@ env: addons: apt: packages: - - libonig-dev - libmysqlclient-dev matrix: fast_finish: true -sudo: required - -services: - - docker - -install: - - go get ./... - - make dependencies - before_script: - sudo service mysql stop @@ -45,8 +35,6 @@ jobs: python: '3.6' before_install: - eval "$(gimme 1.12.4)" - install: - - go get ./... script: - make TEST=python-pymysql integration @@ -54,8 +42,6 @@ jobs: php: '7.1' before_install: - eval "$(gimme 1.12.4)" - install: - - go get ./... script: - make TEST=php integration @@ -63,8 +49,6 @@ jobs: ruby: '2.3' before_install: - eval "$(gimme 1.12.4)" - install: - - go get ./... script: - make TEST=ruby integration @@ -72,8 +56,6 @@ jobs: jdk: openjdk8 before_install: - eval "$(gimme 1.12.4)" - install: - - go get ./... script: - make TEST=jdbc-mariadb integration @@ -81,8 +63,6 @@ jobs: node_js: '7' before_install: - eval "$(gimme 1.12.4)" - install: - - go get ./... script: - make TEST=javascript integration @@ -91,8 +71,6 @@ jobs: dotnet: '2.1' before_install: - eval "$(gimme 1.12.4)" - install: - - go get ./... script: - make TEST=dotnet integration @@ -100,7 +78,5 @@ jobs: compiler: clang before_install: - eval "$(gimme 1.12.4)" - install: - - go get ./... script: - make TEST=c integration diff --git a/README.md b/README.md index 2429f9fa7..19bc39e99 100644 --- a/README.md +++ b/README.md @@ -141,8 +141,7 @@ SET = |:-----|:-----|:------------| |`INMEMORY_JOINS`|environment|If set it will perform all joins in memory. Default is off.| |`inmemory_joins`|session|If set it will perform all joins in memory. Default is off. This has precedence over `INMEMORY_JOINS`.| -|`MAX_MEMORY_JOIN`|environment|The maximum number of memory, in megabytes, that can be consumed by go-mysql-server before switching to multipass mode in joins. Default is the 20% of all available physical memory.| -|`max_memory_joins`|session|The maximum number of memory, in megabytes, that can be consumed by go-mysql-server before switching to multipass mode in joins. Default is the 20% of all available physical memory. This has precedence over `MAX_MEMORY_JOIN`.| +|`MAX_MEMORY`|environment|The maximum number of memory, in megabytes, that can be consumed by go-mysql-server. Any in-memory caches or computations will no longer try to use memory when the limit is reached. Note that this may cause certain queries to fail if there is not enough memory available, such as queries using DISTINCT, ORDER BY or GROUP BY with groupings.| |`DEBUG_ANALYZER`|environment|If set, the analyzer will print debug messages. Default is off.| |`PILOSA_INDEX_THREADS`|environment|Number of threads used in index creation. Default is the number of cores available in the machine.| |`pilosa_index_threads`|environment|Number of threads used in index creation. Default is the number of cores available in the machine. This has precedence over `PILOSA_INDEX_THREADS`.| @@ -176,14 +175,14 @@ func main() { s.Start() } -func createTestDatabase() *mem.Database { +func createTestDatabase() *memory.Database { const ( dbName = "test" tableName = "mytable" ) - db := mem.NewDatabase(dbName) - table := mem.NewTable(tableName, sql.Schema{ + db := memory.NewDatabase(dbName) + table := memory.NewTable(tableName, sql.Schema{ {Name: "name", Type: sql.Text, Nullable: false, Source: tableName}, {Name: "email", Type: sql.Text, Nullable: false, Source: tableName}, {Name: "phone_numbers", Type: sql.JSON, Nullable: false, Source: tableName}, diff --git a/_example/main.go b/_example/main.go index bb52df03d..55a007247 100644 --- a/_example/main.go +++ b/_example/main.go @@ -5,7 +5,7 @@ import ( sqle "github.com/src-d/go-mysql-server" "github.com/src-d/go-mysql-server/auth" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/server" "github.com/src-d/go-mysql-server/sql" ) @@ -42,14 +42,14 @@ func main() { s.Start() } -func createTestDatabase() *mem.Database { +func createTestDatabase() *memory.Database { const ( dbName = "mydb" tableName = "mytable" ) - db := mem.NewDatabase(dbName) - table := mem.NewTable(tableName, sql.Schema{ + db := memory.NewDatabase(dbName) + table := memory.NewTable(tableName, sql.Schema{ {Name: "name", Type: sql.Text, Nullable: false, Source: tableName}, {Name: "email", Type: sql.Text, Nullable: false, Source: tableName}, {Name: "phone_numbers", Type: sql.JSON, Nullable: false, Source: tableName}, diff --git a/_integration/ruby/Makefile b/_integration/ruby/Makefile index cd1fbe1d2..5a52ad20b 100644 --- a/_integration/ruby/Makefile +++ b/_integration/ruby/Makefile @@ -1,4 +1,5 @@ vendor/bundle: + gem install bundler --version=1.16.5 bundler install --path vendor/bundle dependencies: vendor/bundle @@ -6,4 +7,4 @@ dependencies: vendor/bundle test: dependencies bundler exec ruby mysql_test.rb -.PHONY: test \ No newline at end of file +.PHONY: test diff --git a/auth/common_test.go b/auth/common_test.go index 7b68f15bf..cca7b04f9 100644 --- a/auth/common_test.go +++ b/auth/common_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" sqle "github.com/src-d/go-mysql-server" "github.com/src-d/go-mysql-server/auth" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/server" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/analyzer" @@ -21,13 +21,13 @@ import ( const port = 3336 func authEngine(au auth.Auth) (string, *sqle.Engine, error) { - db := mem.NewDatabase("test") + db := memory.NewDatabase("test") catalog := sql.NewCatalog() catalog.AddDatabase(db) tblName := "test" - table := mem.NewTable(tblName, sql.Schema{ + table := memory.NewTable(tblName, sql.Schema{ {Name: "id", Type: sql.Text, Nullable: false, Source: tblName}, {Name: "name", Type: sql.Text, Nullable: false, Source: tblName}, }) diff --git a/benchmark/tpc_h_test.go b/benchmark/tpc_h_test.go index 3afc44888..5524069fa 100644 --- a/benchmark/tpc_h_test.go +++ b/benchmark/tpc_h_test.go @@ -10,9 +10,9 @@ import ( "path/filepath" "testing" - "github.com/src-d/go-mysql-server" + sqle "github.com/src-d/go-mysql-server" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) @@ -83,11 +83,11 @@ func executeQueries(b *testing.B, e *sqle.Engine) error { } func genDB(b *testing.B) (sql.Database, error) { - db := mem.NewDatabase("tpch") + db := memory.NewDatabase("tpch") for _, m := range tpchTableMetadata { b.Log("generating table", m.name) - t := mem.NewTable(m.name, m.schema) + t := memory.NewTable(m.name, m.schema) if err := insertDataToTable(m.name, t, len(m.schema)); err != nil { return nil, err } @@ -98,7 +98,7 @@ func genDB(b *testing.B) (sql.Database, error) { return db, nil } -func insertDataToTable(name string, t *mem.Table, columnCount int) error { +func insertDataToTable(name string, t *memory.Table, columnCount int) error { f, err := os.Open(name + ".tbl") if err != nil { return err diff --git a/engine_test.go b/engine_test.go index b3381d03a..82e7fc65f 100644 --- a/engine_test.go +++ b/engine_test.go @@ -13,7 +13,7 @@ import ( sqle "github.com/src-d/go-mysql-server" "github.com/src-d/go-mysql-server/auth" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/analyzer" "github.com/src-d/go-mysql-server/sql/index/pilosa" @@ -1775,7 +1775,7 @@ const testNumPartitions = 5 func TestAmbiguousColumnResolution(t *testing.T) { require := require.New(t) - table := mem.NewPartitionedTable("foo", sql.Schema{ + table := memory.NewPartitionedTable("foo", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "foo"}, {Name: "b", Type: sql.Text, Source: "foo"}, }, testNumPartitions) @@ -1787,7 +1787,7 @@ func TestAmbiguousColumnResolution(t *testing.T) { sql.NewRow(int64(3), "baz"), ) - table2 := mem.NewPartitionedTable("bar", sql.Schema{ + table2 := memory.NewPartitionedTable("bar", sql.Schema{ {Name: "b", Type: sql.Text, Source: "bar"}, {Name: "c", Type: sql.Int64, Source: "bar"}, }, testNumPartitions) @@ -1798,7 +1798,7 @@ func TestAmbiguousColumnResolution(t *testing.T) { sql.NewRow("pux", int64(1)), ) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("foo", table) db.AddTable("bar", table2) @@ -1865,7 +1865,7 @@ func TestDDL(t *testing.T) { func TestNaturalJoin(t *testing.T) { require := require.New(t) - t1 := mem.NewPartitionedTable("t1", sql.Schema{ + t1 := memory.NewPartitionedTable("t1", sql.Schema{ {Name: "a", Type: sql.Text, Source: "t1"}, {Name: "b", Type: sql.Text, Source: "t1"}, {Name: "c", Type: sql.Text, Source: "t1"}, @@ -1878,7 +1878,7 @@ func TestNaturalJoin(t *testing.T) { sql.NewRow("a_3", "b_3", "c_3"), ) - t2 := mem.NewPartitionedTable("t2", sql.Schema{ + t2 := memory.NewPartitionedTable("t2", sql.Schema{ {Name: "a", Type: sql.Text, Source: "t2"}, {Name: "b", Type: sql.Text, Source: "t2"}, {Name: "d", Type: sql.Text, Source: "t2"}, @@ -1891,7 +1891,7 @@ func TestNaturalJoin(t *testing.T) { sql.NewRow("a_3", "b_3", "d_3"), ) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("t1", t1) db.AddTable("t2", t2) @@ -1917,7 +1917,7 @@ func TestNaturalJoin(t *testing.T) { func TestNaturalJoinEqual(t *testing.T) { require := require.New(t) - t1 := mem.NewPartitionedTable("t1", sql.Schema{ + t1 := memory.NewPartitionedTable("t1", sql.Schema{ {Name: "a", Type: sql.Text, Source: "t1"}, {Name: "b", Type: sql.Text, Source: "t1"}, {Name: "c", Type: sql.Text, Source: "t1"}, @@ -1930,7 +1930,7 @@ func TestNaturalJoinEqual(t *testing.T) { sql.NewRow("a_3", "b_3", "c_3"), ) - t2 := mem.NewPartitionedTable("t2", sql.Schema{ + t2 := memory.NewPartitionedTable("t2", sql.Schema{ {Name: "a", Type: sql.Text, Source: "t2"}, {Name: "b", Type: sql.Text, Source: "t2"}, {Name: "c", Type: sql.Text, Source: "t2"}, @@ -1943,7 +1943,7 @@ func TestNaturalJoinEqual(t *testing.T) { sql.NewRow("a_3", "b_3", "c_3"), ) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("t1", t1) db.AddTable("t2", t2) @@ -1969,7 +1969,7 @@ func TestNaturalJoinEqual(t *testing.T) { func TestNaturalJoinDisjoint(t *testing.T) { require := require.New(t) - t1 := mem.NewPartitionedTable("t1", sql.Schema{ + t1 := memory.NewPartitionedTable("t1", sql.Schema{ {Name: "a", Type: sql.Text, Source: "t1"}, }, testNumPartitions) @@ -1980,7 +1980,7 @@ func TestNaturalJoinDisjoint(t *testing.T) { sql.NewRow("a3"), ) - t2 := mem.NewPartitionedTable("t2", sql.Schema{ + t2 := memory.NewPartitionedTable("t2", sql.Schema{ {Name: "b", Type: sql.Text, Source: "t2"}, }, testNumPartitions) insertRows( @@ -1990,7 +1990,7 @@ func TestNaturalJoinDisjoint(t *testing.T) { sql.NewRow("b3"), ) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("t1", t1) db.AddTable("t2", t2) @@ -2022,7 +2022,7 @@ func TestNaturalJoinDisjoint(t *testing.T) { func TestInnerNestedInNaturalJoins(t *testing.T) { require := require.New(t) - table1 := mem.NewPartitionedTable("table1", sql.Schema{ + table1 := memory.NewPartitionedTable("table1", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "table1"}, {Name: "f", Type: sql.Float64, Source: "table1"}, {Name: "t", Type: sql.Text, Source: "table1"}, @@ -2035,7 +2035,7 @@ func TestInnerNestedInNaturalJoins(t *testing.T) { sql.NewRow(int32(10), float64(2.1), "table1"), ) - table2 := mem.NewPartitionedTable("table2", sql.Schema{ + table2 := memory.NewPartitionedTable("table2", sql.Schema{ {Name: "i2", Type: sql.Int32, Source: "table2"}, {Name: "f2", Type: sql.Float64, Source: "table2"}, {Name: "t2", Type: sql.Text, Source: "table2"}, @@ -2048,7 +2048,7 @@ func TestInnerNestedInNaturalJoins(t *testing.T) { sql.NewRow(int32(20), float64(2.2), "table2"), ) - table3 := mem.NewPartitionedTable("table3", sql.Schema{ + table3 := memory.NewPartitionedTable("table3", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "table3"}, {Name: "f2", Type: sql.Float64, Source: "table3"}, {Name: "t3", Type: sql.Text, Source: "table3"}, @@ -2061,7 +2061,7 @@ func TestInnerNestedInNaturalJoins(t *testing.T) { sql.NewRow(int32(30), float64(2.2), "table3"), ) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("table1", table1) db.AddTable("table2", table2) db.AddTable("table3", table3) @@ -2114,7 +2114,7 @@ func newEngine(t *testing.T) *sqle.Engine { } func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { - table := mem.NewPartitionedTable("mytable", sql.Schema{ + table := memory.NewPartitionedTable("mytable", sql.Schema{ {Name: "i", Type: sql.Int64, Source: "mytable"}, {Name: "s", Type: sql.Text, Source: "mytable"}, }, testNumPartitions) @@ -2126,7 +2126,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { sql.NewRow(int64(3), "third row"), ) - table2 := mem.NewPartitionedTable("othertable", sql.Schema{ + table2 := memory.NewPartitionedTable("othertable", sql.Schema{ {Name: "s2", Type: sql.Text, Source: "othertable"}, {Name: "i2", Type: sql.Int64, Source: "othertable"}, }, testNumPartitions) @@ -2138,7 +2138,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { sql.NewRow("third", int64(1)), ) - table3 := mem.NewPartitionedTable("tabletest", sql.Schema{ + table3 := memory.NewPartitionedTable("tabletest", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "tabletest"}, {Name: "s", Type: sql.Text, Source: "tabletest"}, }, testNumPartitions) @@ -2150,7 +2150,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { sql.NewRow(int64(3), "third row"), ) - table4 := mem.NewPartitionedTable("other_table", sql.Schema{ + table4 := memory.NewPartitionedTable("other_table", sql.Schema{ {Name: "text", Type: sql.Text, Source: "tabletest"}, {Name: "number", Type: sql.Int32, Source: "tabletest"}, }, testNumPartitions) @@ -2162,7 +2162,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { sql.NewRow("c", int32(0)), ) - bigtable := mem.NewPartitionedTable("bigtable", sql.Schema{ + bigtable := memory.NewPartitionedTable("bigtable", sql.Schema{ {Name: "t", Type: sql.Text, Source: "bigtable"}, {Name: "n", Type: sql.Int64, Source: "bigtable"}, }, testNumPartitions) @@ -2185,7 +2185,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { sql.NewRow("b", int64(9)), ) - floatTable := mem.NewPartitionedTable("floattable", sql.Schema{ + floatTable := memory.NewPartitionedTable("floattable", sql.Schema{ {Name: "i", Type: sql.Int64, Source: "floattable"}, {Name: "f32", Type: sql.Float32, Source: "floattable"}, {Name: "f64", Type: sql.Float64, Source: "floattable"}, @@ -2201,7 +2201,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { sql.NewRow(-2, float32(-1.5), float64(-1.5)), ) - nilTable := mem.NewPartitionedTable("niltable", sql.Schema{ + nilTable := memory.NewPartitionedTable("niltable", sql.Schema{ {Name: "i", Type: sql.Int64, Source: "niltable", Nullable: true}, {Name: "b", Type: sql.Boolean, Source: "niltable", Nullable: true}, {Name: "f", Type: sql.Float64, Source: "niltable", Nullable: true}, @@ -2216,7 +2216,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { sql.NewRow(nil, nil, nil), ) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("mytable", table) db.AddTable("othertable", table2) db.AddTable("tabletest", table3) @@ -2224,7 +2224,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine { db.AddTable("floattable", floatTable) db.AddTable("niltable", nilTable) - db2 := mem.NewDatabase("foo") + db2 := memory.NewDatabase("foo") db2.AddTable("other_table", table4) catalog := sql.NewCatalog() @@ -2488,7 +2488,7 @@ func TestCreateIndex(t *testing.T) { func TestOrderByGroupBy(t *testing.T) { require := require.New(t) - table := mem.NewPartitionedTable("members", sql.Schema{ + table := memory.NewPartitionedTable("members", sql.Schema{ {Name: "id", Type: sql.Int64, Source: "members"}, {Name: "team", Type: sql.Text, Source: "members"}, }, testNumPartitions) @@ -2503,7 +2503,7 @@ func TestOrderByGroupBy(t *testing.T) { sql.NewRow(int64(8), "purple"), ) - db := mem.NewDatabase("db") + db := memory.NewDatabase("db") db.AddTable("members", table) e := sqle.NewDefault() @@ -2583,12 +2583,12 @@ func TestTracing(t *testing.T) { func TestReadOnly(t *testing.T) { require := require.New(t) - table := mem.NewPartitionedTable("mytable", sql.Schema{ + table := memory.NewPartitionedTable("mytable", sql.Schema{ {Name: "i", Type: sql.Int64, Source: "mytable"}, {Name: "s", Type: sql.Text, Source: "mytable"}, }, testNumPartitions) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("mytable", table) catalog := sql.NewCatalog() @@ -2689,11 +2689,11 @@ func TestUse(t *testing.T) { func TestLocks(t *testing.T) { require := require.New(t) - t1 := newLockableTable(mem.NewTable("t1", nil)) - t2 := newLockableTable(mem.NewTable("t2", nil)) - t3 := mem.NewTable("t3", nil) + t1 := newLockableTable(memory.NewTable("t1", nil)) + t2 := newLockableTable(memory.NewTable("t2", nil)) + t3 := memory.NewTable("t3", nil) catalog := sql.NewCatalog() - db := mem.NewDatabase("db") + db := memory.NewDatabase("db") db.AddTable("t1", t1) db.AddTable("t2", t2) db.AddTable("t3", t3) @@ -2789,7 +2789,7 @@ var generatorQueries = []struct { } func TestGenerators(t *testing.T) { - table := mem.NewPartitionedTable("t", sql.Schema{ + table := memory.NewPartitionedTable("t", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t"}, {Name: "b", Type: sql.Array(sql.Text), Source: "t"}, {Name: "c", Type: sql.Text, Source: "t"}, @@ -2802,7 +2802,7 @@ func TestGenerators(t *testing.T) { sql.NewRow(int64(3), []interface{}{"e", "f"}, "third"), ) - db := mem.NewDatabase("db") + db := memory.NewDatabase("db") db.AddTable("t", table) catalog := sql.NewCatalog() diff --git a/example_test.go b/example_test.go index a5e877257..eddcae535 100644 --- a/example_test.go +++ b/example_test.go @@ -5,7 +5,7 @@ import ( "io" "github.com/src-d/go-mysql-server" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) @@ -45,8 +45,8 @@ func checkIfError(err error) { } func createTestDatabase() sql.Database { - db := mem.NewDatabase("test") - table := mem.NewTable("mytable", sql.Schema{ + db := memory.NewDatabase("test") + table := memory.NewTable("mytable", sql.Schema{ {Name: "name", Type: sql.Text, Source: "mytable"}, {Name: "email", Type: sql.Text, Source: "mytable"}, }) diff --git a/go.mod b/go.mod index d409bd6b2..0611e96d4 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.3.0 // indirect github.com/gorilla/handlers v1.4.0 // indirect + github.com/hashicorp/golang-lru v0.5.3 github.com/mitchellh/hashstructure v1.0.0 github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 github.com/opentracing/opentracing-go v1.0.2 diff --git a/go.sum b/go.sum index 5632ba0b5..baea2003e 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,6 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk= github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= @@ -67,6 +65,8 @@ github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCS github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= +github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG676r31M= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= diff --git a/mem/database.go b/memory/database.go similarity index 93% rename from mem/database.go rename to memory/database.go index e42e2fa28..b5b87a741 100644 --- a/mem/database.go +++ b/memory/database.go @@ -1,4 +1,4 @@ -package mem // import "github.com/src-d/go-mysql-server/mem" +package memory import ( "github.com/src-d/go-mysql-server/sql" diff --git a/mem/database_test.go b/memory/database_test.go similarity index 97% rename from mem/database_test.go rename to memory/database_test.go index 62669214e..ce7dca623 100644 --- a/mem/database_test.go +++ b/memory/database_test.go @@ -1,4 +1,4 @@ -package mem +package memory import ( "testing" diff --git a/mem/table.go b/memory/table.go similarity index 99% rename from mem/table.go rename to memory/table.go index d12a1e316..b6420af86 100644 --- a/mem/table.go +++ b/memory/table.go @@ -1,4 +1,4 @@ -package mem +package memory import ( "bytes" diff --git a/mem/table_test.go b/memory/table_test.go similarity index 99% rename from mem/table_test.go rename to memory/table_test.go index 825761502..be0bea3e6 100644 --- a/mem/table_test.go +++ b/memory/table_test.go @@ -1,4 +1,4 @@ -package mem +package memory import ( "fmt" diff --git a/server/context.go b/server/context.go index 974f32a85..69edd83e9 100644 --- a/server/context.go +++ b/server/context.go @@ -28,6 +28,7 @@ func DefaultSessionBuilder(c *mysql.Conn, addr string) sql.Session { type SessionManager struct { addr string tracer opentracing.Tracer + memory *sql.MemoryManager mu *sync.Mutex builder SessionBuilder sessions map[uint32]sql.Session @@ -38,11 +39,13 @@ type SessionManager struct { func NewSessionManager( builder SessionBuilder, tracer opentracing.Tracer, + memory *sql.MemoryManager, addr string, ) *SessionManager { return &SessionManager{ addr: addr, tracer: tracer, + memory: memory, mu: new(sync.Mutex), builder: builder, sessions: make(map[uint32]sql.Session), @@ -97,6 +100,7 @@ func (s *SessionManager) NewContextWithQuery( sql.WithTracer(s.tracer), sql.WithPid(s.nextPid()), sql.WithQuery(query), + sql.WithMemoryManager(s.memory), ) return context diff --git a/server/handler_test.go b/server/handler_test.go index 94cf47af3..b8ec17ff2 100644 --- a/server/handler_test.go +++ b/server/handler_test.go @@ -8,7 +8,7 @@ import ( "unsafe" sqle "github.com/src-d/go-mysql-server" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -20,10 +20,10 @@ import ( func setupMemDB(require *require.Assertions) *sqle.Engine { e := sqle.NewDefault() - db := mem.NewDatabase("test") + db := memory.NewDatabase("test") e.AddDatabase(db) - tableTest := mem.NewTable("test", sql.Schema{{Name: "c1", Type: sql.Int32, Source: "test"}}) + tableTest := memory.NewTable("test", sql.Schema{{Name: "c1", Type: sql.Int32, Source: "test"}}) for i := 0; i < 1010; i++ { require.NoError(tableTest.Insert( @@ -47,7 +47,15 @@ func TestHandlerOutput(t *testing.T) { e := setupMemDB(require.New(t)) dummyConn := &mysql.Conn{ConnectionID: 1} - handler := NewHandler(e, NewSessionManager(testSessionBuilder, opentracing.NoopTracer{}, "foo")) + handler := NewHandler( + e, + NewSessionManager( + testSessionBuilder, + opentracing.NoopTracer{}, + sql.NewMemoryManager(nil), + "foo", + ), + ) handler.NewConnection(dummyConn) type exptectedValues struct { @@ -172,6 +180,7 @@ func TestHandlerKill(t *testing.T) { return sql.NewSession(addr, "", "", conn.ConnectionID) }, opentracing.NoopTracer{}, + sql.NewMemoryManager(nil), "foo", ), ) diff --git a/server/server.go b/server/server.go index dcb6206c5..236a4c062 100644 --- a/server/server.go +++ b/server/server.go @@ -4,7 +4,7 @@ import ( "time" opentracing "github.com/opentracing/opentracing-go" - "github.com/src-d/go-mysql-server" + sqle "github.com/src-d/go-mysql-server" "github.com/src-d/go-mysql-server/auth" "vitess.io/vitess/go/mysql" @@ -54,7 +54,10 @@ func NewServer(cfg Config, e *sqle.Engine, sb SessionBuilder) (*Server, error) { cfg.ConnWriteTimeout = 0 } - handler := NewHandler(e, NewSessionManager(sb, tracer, cfg.Address)) + handler := NewHandler( + e, + NewSessionManager(sb, tracer, e.Catalog.MemoryManager, cfg.Address), + ) a := cfg.Auth.Mysql() l, err := mysql.NewListener(cfg.Protocol, cfg.Address, a, handler, cfg.ConnReadTimeout, cfg.ConnWriteTimeout) if err != nil { diff --git a/sql/analyzer/aggregations_test.go b/sql/analyzer/aggregations_test.go index 0b64ff1da..a85533ee3 100644 --- a/sql/analyzer/aggregations_test.go +++ b/sql/analyzer/aggregations_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/expression/function/aggregation" @@ -14,7 +14,7 @@ import ( func TestReorderAggregations(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "foo"}, {Name: "b", Type: sql.Int64, Source: "foo"}, {Name: "c", Type: sql.Int64, Source: "foo"}, @@ -67,7 +67,7 @@ func TestReorderAggregations(t *testing.T) { func TestReorderAggregationsMultiple(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "foo"}, {Name: "b", Type: sql.Int64, Source: "foo"}, {Name: "c", Type: sql.Int64, Source: "foo"}, diff --git a/sql/analyzer/analyzer_test.go b/sql/analyzer/analyzer_test.go index 355b211ce..0d0564b57 100644 --- a/sql/analyzer/analyzer_test.go +++ b/sql/analyzer/analyzer_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -14,16 +14,16 @@ import ( func TestAnalyzer_Analyze(t *testing.T) { require := require.New(t) - table := mem.NewTable("mytable", sql.Schema{ + table := memory.NewTable("mytable", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "mytable"}, {Name: "t", Type: sql.Text, Source: "mytable"}, }) - table2 := mem.NewTable("mytable2", sql.Schema{ + table2 := memory.NewTable("mytable2", sql.Schema{ {Name: "i2", Type: sql.Int32, Source: "mytable2"}, }) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("mytable", table) db.AddTable("mytable2", table2) @@ -143,7 +143,7 @@ func TestAnalyzer_Analyze(t *testing.T) { expression.NewGetFieldWithTable(0, sql.Int32, "mytable", "i", false), expression.NewLiteral(int32(1), sql.Int32), ), - }).(*mem.Table).WithProjection([]string{"i"}), + }).(*memory.Table).WithProjection([]string{"i"}), ) require.NoError(err) require.Equal(expected, analyzed) @@ -187,11 +187,11 @@ func TestAnalyzer_Analyze(t *testing.T) { func TestMaxIterations(t *testing.T) { require := require.New(t) tName := "my-table" - table := mem.NewTable(tName, sql.Schema{ + table := memory.NewTable(tName, sql.Schema{ {Name: "i", Type: sql.Int32, Source: tName}, {Name: "t", Type: sql.Text, Source: tName}, }) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable(tName, table) catalog := sql.NewCatalog() @@ -205,7 +205,7 @@ func TestMaxIterations(t *testing.T) { case *plan.ResolvedTable: count++ name := fmt.Sprintf("mytable-%v", count) - table := mem.NewTable(name, sql.Schema{ + table := memory.NewTable(name, sql.Schema{ {Name: "i", Type: sql.Int32, Source: name}, {Name: "t", Type: sql.Text, Source: name}, }) @@ -220,7 +220,7 @@ func TestMaxIterations(t *testing.T) { require.NoError(err) require.Equal( plan.NewResolvedTable( - mem.NewTable("mytable-1000", sql.Schema{ + memory.NewTable("mytable-1000", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "mytable-1000"}, {Name: "t", Type: sql.Text, Source: "mytable-1000"}, }), @@ -272,25 +272,25 @@ func countRules(batches []*Batch) int { func TestMixInnerAndNaturalJoins(t *testing.T) { var require = require.New(t) - table := mem.NewTable("mytable", sql.Schema{ + table := memory.NewTable("mytable", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "mytable"}, {Name: "f", Type: sql.Float64, Source: "mytable"}, {Name: "t", Type: sql.Text, Source: "mytable"}, }) - table2 := mem.NewTable("mytable2", sql.Schema{ + table2 := memory.NewTable("mytable2", sql.Schema{ {Name: "i2", Type: sql.Int32, Source: "mytable2"}, {Name: "f2", Type: sql.Float64, Source: "mytable2"}, {Name: "t2", Type: sql.Text, Source: "mytable2"}, }) - table3 := mem.NewTable("mytable3", sql.Schema{ + table3 := memory.NewTable("mytable3", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "mytable3"}, {Name: "f2", Type: sql.Float64, Source: "mytable3"}, {Name: "t3", Type: sql.Text, Source: "mytable3"}, }) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("mytable", table) db.AddTable("mytable2", table2) db.AddTable("mytable3", table3) @@ -400,25 +400,25 @@ func TestReorderProjectionUnresolvedChild(t *testing.T) { ), ) - commits := mem.NewTable("commits", sql.Schema{ + commits := memory.NewTable("commits", sql.Schema{ {Name: "repository_id", Source: "commits", Type: sql.Text}, {Name: "commit_hash", Source: "commits", Type: sql.Text}, {Name: "commit_author_when", Source: "commits", Type: sql.Text}, }) - refs := mem.NewTable("refs", sql.Schema{ + refs := memory.NewTable("refs", sql.Schema{ {Name: "repository_id", Source: "refs", Type: sql.Text}, {Name: "ref_name", Source: "refs", Type: sql.Text}, }) - refCommits := mem.NewTable("ref_commits", sql.Schema{ + refCommits := memory.NewTable("ref_commits", sql.Schema{ {Name: "repository_id", Source: "ref_commits", Type: sql.Text}, {Name: "ref_name", Source: "ref_commits", Type: sql.Text}, {Name: "commit_hash", Source: "ref_commits", Type: sql.Text}, {Name: "history_index", Source: "ref_commits", Type: sql.Int64}, }) - db := mem.NewDatabase("") + db := memory.NewDatabase("") db.AddTable("refs", refs) db.AddTable("ref_commits", refCommits) db.AddTable("commits", commits) diff --git a/sql/analyzer/assign_catalog_test.go b/sql/analyzer/assign_catalog_test.go index 45d2b1bbb..133299455 100644 --- a/sql/analyzer/assign_catalog_test.go +++ b/sql/analyzer/assign_catalog_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/plan" ) @@ -13,14 +13,14 @@ func TestAssignCatalog(t *testing.T) { require := require.New(t) f := getRule("assign_catalog") - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") c := sql.NewCatalog() c.AddDatabase(db) a := NewDefault(c) a.Catalog.IndexRegistry = sql.NewIndexRegistry() - tbl := mem.NewTable("foo", nil) + tbl := memory.NewTable("foo", nil) node, err := f.Apply(sql.NewEmptyContext(), a, plan.NewCreateIndex("", plan.NewResolvedTable(tbl), nil, "", make(map[string]string))) diff --git a/sql/analyzer/assign_indexes_test.go b/sql/analyzer/assign_indexes_test.go index a50bd1ca5..26793ba95 100644 --- a/sql/analyzer/assign_indexes_test.go +++ b/sql/analyzer/assign_indexes_test.go @@ -6,7 +6,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -29,7 +29,7 @@ func TestNegateIndex(t *testing.T) { a := NewDefault(catalog) - t1 := mem.NewTable("t1", sql.Schema{ + t1 := memory.NewTable("t1", sql.Schema{ {Name: "foo", Type: sql.Int64, Source: "t1"}, }) @@ -86,11 +86,11 @@ func TestAssignIndexes(t *testing.T) { a := NewDefault(catalog) - t1 := mem.NewTable("t1", sql.Schema{ + t1 := memory.NewTable("t1", sql.Schema{ {Name: "foo", Type: sql.Int64, Source: "t1"}, }) - t2 := mem.NewTable("t2", sql.Schema{ + t2 := memory.NewTable("t2", sql.Schema{ {Name: "bar", Type: sql.Int64, Source: "t2"}, {Name: "baz", Type: sql.Int64, Source: "t2"}, }) @@ -861,7 +861,7 @@ func TestContainsSources(t *testing.T) { func TestNodeSources(t *testing.T) { sources := nodeSources( plan.NewResolvedTable( - mem.NewTable("foo", sql.Schema{ + memory.NewTable("foo", sql.Schema{ {Source: "foo"}, {Source: "foo"}, {Source: "bar"}, diff --git a/sql/analyzer/convert_dates_test.go b/sql/analyzer/convert_dates_test.go index 934ec5db3..72ba53868 100644 --- a/sql/analyzer/convert_dates_test.go +++ b/sql/analyzer/convert_dates_test.go @@ -3,7 +3,7 @@ package analyzer import ( "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/expression/function" @@ -158,7 +158,7 @@ func TestConvertDates(t *testing.T) { }, } - table := plan.NewResolvedTable(mem.NewTable("t", nil)) + table := plan.NewResolvedTable(memory.NewTable("t", nil)) for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { @@ -172,7 +172,7 @@ func TestConvertDates(t *testing.T) { } func TestConvertDatesProject(t *testing.T) { - table := plan.NewResolvedTable(mem.NewTable("t", nil)) + table := plan.NewResolvedTable(memory.NewTable("t", nil)) input := plan.NewFilter( expression.NewEquals( expression.NewGetField(0, sql.Int64, "foo", false), @@ -204,7 +204,7 @@ func TestConvertDatesProject(t *testing.T) { } func TestConvertDatesGroupBy(t *testing.T) { - table := plan.NewResolvedTable(mem.NewTable("t", nil)) + table := plan.NewResolvedTable(memory.NewTable("t", nil)) input := plan.NewFilter( expression.NewEquals( expression.NewGetField(0, sql.Int64, "foo", false), @@ -250,7 +250,7 @@ func TestConvertDatesGroupBy(t *testing.T) { } func TestConvertDatesFieldReference(t *testing.T) { - table := plan.NewResolvedTable(mem.NewTable("t", nil)) + table := plan.NewResolvedTable(memory.NewTable("t", nil)) input := plan.NewFilter( expression.NewEquals( expression.NewGetField(0, sql.Int64, "DAYOFWEEK(foo)", false), diff --git a/sql/analyzer/optimization_rules_test.go b/sql/analyzer/optimization_rules_test.go index 2ac9efca9..d036a6c32 100644 --- a/sql/analyzer/optimization_rules_test.go +++ b/sql/analyzer/optimization_rules_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -13,7 +13,7 @@ import ( func TestReorderProjection(t *testing.T) { f := getRule("reorder_projection") - table := mem.NewTable("mytable", sql.Schema{{ + table := memory.NewTable("mytable", sql.Schema{{ Name: "i", Source: "mytable", Type: sql.Int64, }}) @@ -137,7 +137,7 @@ func TestEraseProjection(t *testing.T) { require := require.New(t) f := getRule("erase_projection") - table := mem.NewTable("mytable", sql.Schema{{ + table := memory.NewTable("mytable", sql.Schema{{ Name: "i", Source: "mytable", Type: sql.Int64, }}) @@ -188,8 +188,8 @@ func TestEraseProjection(t *testing.T) { func TestOptimizeDistinct(t *testing.T) { require := require.New(t) - t1 := mem.NewTable("foo", nil) - t2 := mem.NewTable("foo", nil) + t1 := memory.NewTable("foo", nil) + t2 := memory.NewTable("foo", nil) notSorted := plan.NewDistinct(plan.NewResolvedTable(t1)) sorted := plan.NewDistinct(plan.NewSort(nil, plan.NewResolvedTable(t2))) @@ -207,17 +207,17 @@ func TestOptimizeDistinct(t *testing.T) { } func TestMoveJoinConditionsToFilter(t *testing.T) { - t1 := mem.NewTable("t1", sql.Schema{ + t1 := memory.NewTable("t1", sql.Schema{ {Name: "a", Source: "t1", Type: sql.Int64}, {Name: "b", Source: "t1", Type: sql.Int64}, }) - t2 := mem.NewTable("t2", sql.Schema{ + t2 := memory.NewTable("t2", sql.Schema{ {Name: "c", Source: "t2", Type: sql.Int64}, {Name: "d", Source: "t2", Type: sql.Int64}, }) - t3 := mem.NewTable("t3", sql.Schema{ + t3 := memory.NewTable("t3", sql.Schema{ {Name: "e", Source: "t3", Type: sql.Int64}, {Name: "f", Source: "t3", Type: sql.Int64}, }) @@ -295,7 +295,7 @@ func TestMoveJoinConditionsToFilter(t *testing.T) { } func TestEvalFilter(t *testing.T) { - inner := mem.NewTable("foo", nil) + inner := memory.NewTable("foo", nil) rule := getRule("eval_filter") testCases := []struct { @@ -428,7 +428,7 @@ func TestRemoveUnnecessaryConverts(t *testing.T) { node := plan.NewProject([]sql.Expression{ expression.NewConvert(tt.childExpr, tt.castType), }, - plan.NewResolvedTable(mem.NewTable("foo", nil)), + plan.NewResolvedTable(memory.NewTable("foo", nil)), ) result, err := removeUnnecessaryConverts( diff --git a/sql/analyzer/parallelize_test.go b/sql/analyzer/parallelize_test.go index 41282c03e..5f554a442 100644 --- a/sql/analyzer/parallelize_test.go +++ b/sql/analyzer/parallelize_test.go @@ -3,7 +3,7 @@ package analyzer import ( "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -12,7 +12,7 @@ import ( func TestParallelize(t *testing.T) { require := require.New(t) - table := mem.NewTable("t", nil) + table := memory.NewTable("t", nil) rule := getRuleFrom(OnceAfterAll, "parallelize") node := plan.NewProject( nil, @@ -57,7 +57,7 @@ func TestParallelize(t *testing.T) { func TestParallelizeCreateIndex(t *testing.T) { require := require.New(t) - table := mem.NewTable("t", nil) + table := memory.NewTable("t", nil) rule := getRuleFrom(OnceAfterAll, "parallelize") node := plan.NewCreateIndex( "", @@ -73,7 +73,7 @@ func TestParallelizeCreateIndex(t *testing.T) { } func TestIsParallelizable(t *testing.T) { - table := mem.NewTable("t", nil) + table := memory.NewTable("t", nil) testCases := []struct { name string @@ -172,7 +172,7 @@ func TestIsParallelizable(t *testing.T) { func TestRemoveRedundantExchanges(t *testing.T) { require := require.New(t) - table := mem.NewTable("t", nil) + table := memory.NewTable("t", nil) node := plan.NewProject( nil, diff --git a/sql/analyzer/process_test.go b/sql/analyzer/process_test.go index 0d9b25bcc..6426e01ab 100644 --- a/sql/analyzer/process_test.go +++ b/sql/analyzer/process_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -19,8 +19,8 @@ func TestTrackProcess(t *testing.T) { a := NewDefault(catalog) node := plan.NewInnerJoin( - plan.NewResolvedTable(&table{mem.NewPartitionedTable("foo", nil, 2)}), - plan.NewResolvedTable(mem.NewPartitionedTable("bar", nil, 4)), + plan.NewResolvedTable(&table{memory.NewPartitionedTable("foo", nil, 2)}), + plan.NewResolvedTable(memory.NewPartitionedTable("bar", nil, 4)), expression.NewLiteral(int64(1), sql.Int64), ) @@ -80,7 +80,7 @@ func TestTrackProcessSubquery(t *testing.T) { nil, plan.NewSubqueryAlias("f", plan.NewQueryProcess( - plan.NewResolvedTable(mem.NewTable("foo", nil)), + plan.NewResolvedTable(memory.NewTable("foo", nil)), nil, ), ), @@ -92,7 +92,7 @@ func TestTrackProcessSubquery(t *testing.T) { expectedChild := plan.NewProject( nil, plan.NewSubqueryAlias("f", - plan.NewResolvedTable(mem.NewTable("foo", nil)), + plan.NewResolvedTable(memory.NewTable("foo", nil)), ), ) diff --git a/sql/analyzer/prune_columns_test.go b/sql/analyzer/prune_columns_test.go index 848a47681..15724577a 100644 --- a/sql/analyzer/prune_columns_test.go +++ b/sql/analyzer/prune_columns_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -14,13 +14,13 @@ func TestPruneColumns(t *testing.T) { rule := getRuleFrom(OnceAfterDefault, "prune_columns") a := NewDefault(nil) - t1 := plan.NewResolvedTable(mem.NewTable("t1", sql.Schema{ + t1 := plan.NewResolvedTable(memory.NewTable("t1", sql.Schema{ {Name: "foo", Type: sql.Int64, Source: "t1"}, {Name: "bar", Type: sql.Int64, Source: "t1"}, {Name: "bax", Type: sql.Int64, Source: "t1"}, })) - t2 := plan.NewResolvedTable(mem.NewTable("t2", sql.Schema{ + t2 := plan.NewResolvedTable(memory.NewTable("t2", sql.Schema{ {Name: "foo", Type: sql.Int64, Source: "t2"}, {Name: "baz", Type: sql.Int64, Source: "t2"}, {Name: "bux", Type: sql.Int64, Source: "t2"}, diff --git a/sql/analyzer/pushdown_test.go b/sql/analyzer/pushdown_test.go index d0cb737ad..818eb40a0 100644 --- a/sql/analyzer/pushdown_test.go +++ b/sql/analyzer/pushdown_test.go @@ -3,7 +3,7 @@ package analyzer import ( "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -14,19 +14,19 @@ func TestPushdownProjectionAndFilters(t *testing.T) { require := require.New(t) f := getRule("pushdown") - table := mem.NewTable("mytable", sql.Schema{ + table := memory.NewTable("mytable", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "mytable"}, {Name: "f", Type: sql.Float64, Source: "mytable"}, {Name: "t", Type: sql.Text, Source: "mytable"}, }) - table2 := mem.NewTable("mytable2", sql.Schema{ + table2 := memory.NewTable("mytable2", sql.Schema{ {Name: "i2", Type: sql.Int32, Source: "mytable2"}, {Name: "f2", Type: sql.Float64, Source: "mytable2"}, {Name: "t2", Type: sql.Text, Source: "mytable2"}, }) - db := mem.NewDatabase("mydb") + db := memory.NewDatabase("mydb") db.AddTable("mytable", table) db.AddTable("mytable2", table2) @@ -66,14 +66,14 @@ func TestPushdownProjectionAndFilters(t *testing.T) { expression.NewGetFieldWithTable(1, sql.Float64, "mytable", "f", false), expression.NewLiteral(3.14, sql.Float64), ), - }).(*mem.Table).WithProjection([]string{"i", "f"}), + }).(*memory.Table).WithProjection([]string{"i", "f"}), ), plan.NewResolvedTable( table2.WithFilters([]sql.Expression{ expression.NewIsNull( expression.NewGetFieldWithTable(0, sql.Int32, "mytable2", "i2", false), ), - }).(*mem.Table).WithProjection([]string{"i2"}), + }).(*memory.Table).WithProjection([]string{"i2"}), ), ), ) @@ -86,19 +86,19 @@ func TestPushdownProjectionAndFilters(t *testing.T) { func TestPushdownIndexable(t *testing.T) { require := require.New(t) - table := mem.NewTable("mytable", sql.Schema{ + table := memory.NewTable("mytable", sql.Schema{ {Name: "i", Type: sql.Int32, Source: "mytable"}, {Name: "f", Type: sql.Float64, Source: "mytable"}, {Name: "t", Type: sql.Text, Source: "mytable"}, }) - table2 := mem.NewTable("mytable2", sql.Schema{ + table2 := memory.NewTable("mytable2", sql.Schema{ {Name: "i2", Type: sql.Int32, Source: "mytable2"}, {Name: "f2", Type: sql.Float64, Source: "mytable2"}, {Name: "t2", Type: sql.Text, Source: "mytable2"}, }) - db := mem.NewDatabase("") + db := memory.NewDatabase("") db.AddTable("mytable", table) db.AddTable("mytable2", table2) @@ -187,8 +187,8 @@ func TestPushdownIndexable(t *testing.T) { expression.NewGetFieldWithTable(0, sql.Int32, "mytable", "i", false), expression.NewLiteral(1, sql.Int32), ), - }).(*mem.Table). - WithProjection([]string{"i", "f"}).(*mem.Table). + }).(*memory.Table). + WithProjection([]string{"i", "f"}).(*memory.Table). WithIndexLookup(&mergeableIndexLookup{id: "3.14"}), ), plan.NewResolvedTable( @@ -199,8 +199,8 @@ func TestPushdownIndexable(t *testing.T) { expression.NewLiteral(2, sql.Int32), ), ), - }).(*mem.Table). - WithProjection([]string{"i2"}).(*mem.Table). + }).(*memory.Table). + WithProjection([]string{"i2"}).(*memory.Table). WithIndexLookup(&negateIndexLookup{value: "2"}), ), ), diff --git a/sql/analyzer/resolve_columns_test.go b/sql/analyzer/resolve_columns_test.go index c114deded..7cf178f94 100644 --- a/sql/analyzer/resolve_columns_test.go +++ b/sql/analyzer/resolve_columns_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -14,7 +14,7 @@ import ( func TestQualifyColumnsProject(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Type: sql.Text, Source: "foo"}, {Name: "b", Type: sql.Text, Source: "foo"}, }) @@ -56,7 +56,7 @@ func TestMisusedAlias(t *testing.T) { require := require.New(t) f := getRule("check_aliases") - table := mem.NewTable("mytable", sql.Schema{ + table := memory.NewTable("mytable", sql.Schema{ {Name: "i", Type: sql.Int32}, }) @@ -79,10 +79,10 @@ func TestQualifyColumns(t *testing.T) { require := require.New(t) f := getRule("qualify_columns") - table := mem.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32, Source: "mytable"}}) - table2 := mem.NewTable("mytable2", sql.Schema{{Name: "i", Type: sql.Int32, Source: "mytable2"}}) - sessionTable := mem.NewTable("@@session", sql.Schema{{Name: "autocommit", Type: sql.Int64, Source: "@@session"}}) - globalTable := mem.NewTable("@@global", sql.Schema{{Name: "max_allowed_packet", Type: sql.Int64, Source: "@@global"}}) + table := memory.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32, Source: "mytable"}}) + table2 := memory.NewTable("mytable2", sql.Schema{{Name: "i", Type: sql.Int32, Source: "mytable2"}}) + sessionTable := memory.NewTable("@@session", sql.Schema{{Name: "autocommit", Type: sql.Int64, Source: "@@session"}}) + globalTable := memory.NewTable("@@global", sql.Schema{{Name: "max_allowed_packet", Type: sql.Int64, Source: "@@global"}}) node := plan.NewProject( []sql.Expression{ @@ -250,7 +250,7 @@ func TestQualifyColumnsQualifiedStar(t *testing.T) { require := require.New(t) f := getRule("qualify_columns") - table := mem.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32}}) + table := memory.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32}}) node := plan.NewProject( []sql.Expression{ @@ -334,7 +334,7 @@ func TestResolveGroupingColumns(t *testing.T) { expression.NewUnresolvedColumn("a"), expression.NewUnresolvedColumn("b"), }, - plan.NewResolvedTable(mem.NewTable("table", nil)), + plan.NewResolvedTable(memory.NewTable("table", nil)), ) expected := plan.NewGroupBy( @@ -367,7 +367,7 @@ func TestResolveGroupingColumns(t *testing.T) { ), expression.NewUnresolvedColumn("c"), }, - plan.NewResolvedTable(mem.NewTable("table", nil)), + plan.NewResolvedTable(memory.NewTable("table", nil)), ), ) diff --git a/sql/analyzer/resolve_having_test.go b/sql/analyzer/resolve_having_test.go index aaa62474a..cc3707671 100644 --- a/sql/analyzer/resolve_having_test.go +++ b/sql/analyzer/resolve_having_test.go @@ -3,7 +3,7 @@ package analyzer import ( "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/expression/function/aggregation" @@ -32,7 +32,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetField(0, sql.Int64, "foo", false), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), plan.NewHaving( @@ -46,7 +46,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetField(0, sql.Int64, "foo", false), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), nil, @@ -64,7 +64,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetFieldWithTable(0, sql.Int64, "t", "foo", false), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), plan.NewProject( @@ -84,7 +84,7 @@ func TestResolveHaving(t *testing.T) { aggregation.NewCount(expression.NewStar()), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), ), @@ -102,7 +102,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetFieldWithTable(1, sql.Int64, "t", "foo", false), }, []sql.Expression{expression.NewGetFieldWithTable(1, sql.Int64, "t", "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", sql.Schema{ + plan.NewResolvedTable(memory.NewTable("t", sql.Schema{ {Type: sql.Int64, Name: "i", Source: "t"}, {Type: sql.Int64, Name: "i", Source: "foo"}, })), @@ -123,7 +123,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetFieldWithTable(0, sql.Int64, "t", "i", false), }, []sql.Expression{expression.NewGetFieldWithTable(1, sql.Int64, "t", "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", sql.Schema{ + plan.NewResolvedTable(memory.NewTable("t", sql.Schema{ {Type: sql.Int64, Name: "i", Source: "t"}, {Type: sql.Int64, Name: "i", Source: "foo"}, })), @@ -148,7 +148,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetFieldWithTable(1, sql.Int64, "t", "foo", false), }, []sql.Expression{expression.NewGetFieldWithTable(1, sql.Int64, "t", "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", sql.Schema{ + plan.NewResolvedTable(memory.NewTable("t", sql.Schema{ {Type: sql.Int64, Name: "i", Source: "t"}, {Type: sql.Int64, Name: "i", Source: "foo"}, })), @@ -175,7 +175,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetFieldWithTable(0, sql.Int64, "t", "i", false), }, []sql.Expression{expression.NewGetFieldWithTable(1, sql.Int64, "t", "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", sql.Schema{ + plan.NewResolvedTable(memory.NewTable("t", sql.Schema{ {Type: sql.Int64, Name: "i", Source: "t"}, {Type: sql.Int64, Name: "i", Source: "foo"}, })), @@ -203,7 +203,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetFieldWithTable(0, sql.Int64, "t", "foo", false), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), ), @@ -230,7 +230,7 @@ func TestResolveHaving(t *testing.T) { aggregation.NewCount(expression.NewStar()), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), ), @@ -255,7 +255,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetField(0, sql.Int64, "foo", false), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), ), @@ -275,7 +275,7 @@ func TestResolveHaving(t *testing.T) { expression.NewGetField(0, sql.Int64, "foo", false), }, []sql.Expression{expression.NewGetField(0, sql.Int64, "foo", false)}, - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), ), ), @@ -288,7 +288,7 @@ func TestResolveHaving(t *testing.T) { aggregation.NewCount(expression.NewStar()), expression.NewLiteral(int64(5), sql.Int64), ), - plan.NewResolvedTable(mem.NewTable("t", nil)), + plan.NewResolvedTable(memory.NewTable("t", nil)), ), nil, errHavingNeedsGroupBy, diff --git a/sql/analyzer/resolve_natural_joins_test.go b/sql/analyzer/resolve_natural_joins_test.go index 1c1a41f5e..fa424440a 100644 --- a/sql/analyzer/resolve_natural_joins_test.go +++ b/sql/analyzer/resolve_natural_joins_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -13,13 +13,13 @@ import ( func TestResolveNaturalJoins(t *testing.T) { require := require.New(t) - left := mem.NewTable("t1", sql.Schema{ + left := memory.NewTable("t1", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t1"}, {Name: "b", Type: sql.Int64, Source: "t1"}, {Name: "c", Type: sql.Int64, Source: "t1"}, }) - right := mem.NewTable("t2", sql.Schema{ + right := memory.NewTable("t2", sql.Schema{ {Name: "d", Type: sql.Int64, Source: "t2"}, {Name: "c", Type: sql.Int64, Source: "t2"}, {Name: "b", Type: sql.Int64, Source: "t2"}, @@ -66,13 +66,13 @@ func TestResolveNaturalJoinsColumns(t *testing.T) { rule := getRule("resolve_natural_joins") require := require.New(t) - left := mem.NewTable("t1", sql.Schema{ + left := memory.NewTable("t1", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t1"}, {Name: "b", Type: sql.Int64, Source: "t1"}, {Name: "c", Type: sql.Int64, Source: "t1"}, }) - right := mem.NewTable("t2", sql.Schema{ + right := memory.NewTable("t2", sql.Schema{ {Name: "d", Type: sql.Int64, Source: "t2"}, {Name: "c", Type: sql.Int64, Source: "t2"}, {Name: "b", Type: sql.Int64, Source: "t2"}, @@ -128,13 +128,13 @@ func TestResolveNaturalJoinsTableAlias(t *testing.T) { rule := getRule("resolve_natural_joins") require := require.New(t) - left := mem.NewTable("t1", sql.Schema{ + left := memory.NewTable("t1", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t1"}, {Name: "b", Type: sql.Int64, Source: "t1"}, {Name: "c", Type: sql.Int64, Source: "t1"}, }) - right := mem.NewTable("t2", sql.Schema{ + right := memory.NewTable("t2", sql.Schema{ {Name: "d", Type: sql.Int64, Source: "t2"}, {Name: "c", Type: sql.Int64, Source: "t2"}, {Name: "b", Type: sql.Int64, Source: "t2"}, @@ -192,21 +192,21 @@ func TestResolveNaturalJoinsChained(t *testing.T) { rule := getRule("resolve_natural_joins") require := require.New(t) - left := mem.NewTable("t1", sql.Schema{ + left := memory.NewTable("t1", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t1"}, {Name: "b", Type: sql.Int64, Source: "t1"}, {Name: "c", Type: sql.Int64, Source: "t1"}, {Name: "f", Type: sql.Int64, Source: "t1"}, }) - right := mem.NewTable("t2", sql.Schema{ + right := memory.NewTable("t2", sql.Schema{ {Name: "d", Type: sql.Int64, Source: "t2"}, {Name: "c", Type: sql.Int64, Source: "t2"}, {Name: "b", Type: sql.Int64, Source: "t2"}, {Name: "e", Type: sql.Int64, Source: "t2"}, }) - upperRight := mem.NewTable("t3", sql.Schema{ + upperRight := memory.NewTable("t3", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t3"}, {Name: "b", Type: sql.Int64, Source: "t3"}, {Name: "f", Type: sql.Int64, Source: "t3"}, @@ -297,13 +297,13 @@ func TestResolveNaturalJoinsChained(t *testing.T) { func TestResolveNaturalJoinsEqual(t *testing.T) { require := require.New(t) - left := mem.NewTable("t1", sql.Schema{ + left := memory.NewTable("t1", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t1"}, {Name: "b", Type: sql.Int64, Source: "t1"}, {Name: "c", Type: sql.Int64, Source: "t1"}, }) - right := mem.NewTable("t2", sql.Schema{ + right := memory.NewTable("t2", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t2"}, {Name: "b", Type: sql.Int64, Source: "t2"}, {Name: "c", Type: sql.Int64, Source: "t2"}, @@ -350,13 +350,13 @@ func TestResolveNaturalJoinsEqual(t *testing.T) { func TestResolveNaturalJoinsDisjoint(t *testing.T) { require := require.New(t) - left := mem.NewTable("t1", sql.Schema{ + left := memory.NewTable("t1", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t1"}, {Name: "b", Type: sql.Int64, Source: "t1"}, {Name: "c", Type: sql.Int64, Source: "t1"}, }) - right := mem.NewTable("t2", sql.Schema{ + right := memory.NewTable("t2", sql.Schema{ {Name: "d", Type: sql.Int64, Source: "t2"}, {Name: "e", Type: sql.Int64, Source: "t2"}, }) diff --git a/sql/analyzer/resolve_orderby_test.go b/sql/analyzer/resolve_orderby_test.go index 680df953a..ecf84f857 100644 --- a/sql/analyzer/resolve_orderby_test.go +++ b/sql/analyzer/resolve_orderby_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -15,7 +15,7 @@ func TestResolveOrderBy(t *testing.T) { a := NewDefault(nil) ctx := sql.NewEmptyContext() - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "foo"}, {Name: "b", Type: sql.Int64, Source: "foo"}, }) @@ -242,7 +242,7 @@ func TestResolveOrderByLiterals(t *testing.T) { require := require.New(t) f := getRule("resolve_orderby_literals") - table := mem.NewTable("t", sql.Schema{ + table := memory.NewTable("t", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "t"}, {Name: "b", Type: sql.Int64, Source: "t"}, }) diff --git a/sql/analyzer/resolve_stars_test.go b/sql/analyzer/resolve_stars_test.go index 4816bf2f3..24fcf46bd 100644 --- a/sql/analyzer/resolve_stars_test.go +++ b/sql/analyzer/resolve_stars_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -13,12 +13,12 @@ import ( func TestResolveStar(t *testing.T) { f := getRule("resolve_star") - table := mem.NewTable("mytable", sql.Schema{ + table := memory.NewTable("mytable", sql.Schema{ {Name: "a", Type: sql.Int32, Source: "mytable"}, {Name: "b", Type: sql.Int32, Source: "mytable"}, }) - table2 := mem.NewTable("mytable2", sql.Schema{ + table2 := memory.NewTable("mytable2", sql.Schema{ {Name: "c", Type: sql.Int32, Source: "mytable2"}, {Name: "d", Type: sql.Int32, Source: "mytable2"}, }) diff --git a/sql/analyzer/resolve_subqueries_test.go b/sql/analyzer/resolve_subqueries_test.go index e418e939a..d03878bb1 100644 --- a/sql/analyzer/resolve_subqueries_test.go +++ b/sql/analyzer/resolve_subqueries_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -13,13 +13,13 @@ import ( func TestResolveSubqueries(t *testing.T) { require := require.New(t) - table1 := mem.NewTable("foo", sql.Schema{{Name: "a", Type: sql.Int64, Source: "foo"}}) - table2 := mem.NewTable("bar", sql.Schema{ + table1 := memory.NewTable("foo", sql.Schema{{Name: "a", Type: sql.Int64, Source: "foo"}}) + table2 := memory.NewTable("bar", sql.Schema{ {Name: "b", Type: sql.Int64, Source: "bar"}, {Name: "k", Type: sql.Int64, Source: "bar"}, }) - table3 := mem.NewTable("baz", sql.Schema{{Name: "c", Type: sql.Int64, Source: "baz"}}) - db := mem.NewDatabase("mydb") + table3 := memory.NewTable("baz", sql.Schema{{Name: "c", Type: sql.Int64, Source: "baz"}}) + db := memory.NewDatabase("mydb") db.AddTable("foo", table1) db.AddTable("bar", table2) db.AddTable("baz", table3) diff --git a/sql/analyzer/resolve_tables.go b/sql/analyzer/resolve_tables.go index 7632d1a48..ec495cd36 100644 --- a/sql/analyzer/resolve_tables.go +++ b/sql/analyzer/resolve_tables.go @@ -1,7 +1,7 @@ package analyzer import ( - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/plan" ) @@ -9,7 +9,7 @@ import ( const dualTableName = "dual" var dualTable = func() sql.Table { - t := mem.NewTable(dualTableName, sql.Schema{ + t := memory.NewTable(dualTableName, sql.Schema{ {Name: "dummy", Source: dualTableName, Type: sql.Text, Nullable: false}, }) _ = t.Insert(sql.NewEmptyContext(), sql.NewRow("x")) diff --git a/sql/analyzer/resolve_tables_test.go b/sql/analyzer/resolve_tables_test.go index cb984af5b..2debe3d0a 100644 --- a/sql/analyzer/resolve_tables_test.go +++ b/sql/analyzer/resolve_tables_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/plan" @@ -15,8 +15,8 @@ func TestResolveTables(t *testing.T) { f := getRule("resolve_tables") - table := mem.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32}}) - db := mem.NewDatabase("mydb") + table := memory.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32}}) + db := memory.NewDatabase("mydb") db.AddTable("mytable", table) catalog := sql.NewCatalog() @@ -54,14 +54,14 @@ func TestResolveTablesNested(t *testing.T) { f := getRule("resolve_tables") - table := mem.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32}}) - table2 := mem.NewTable("my_other_table", sql.Schema{{Name: "i", Type: sql.Int32}}) - db := mem.NewDatabase("mydb") + table := memory.NewTable("mytable", sql.Schema{{Name: "i", Type: sql.Int32}}) + table2 := memory.NewTable("my_other_table", sql.Schema{{Name: "i", Type: sql.Int32}}) + db := memory.NewDatabase("mydb") db.AddTable("mytable", table) catalog := sql.NewCatalog() catalog.AddDatabase(db) - db2 := mem.NewDatabase("my_other_db") + db2 := memory.NewDatabase("my_other_db") db2.AddTable("my_other_table", table2) catalog.AddDatabase(db2) diff --git a/sql/analyzer/validation_rules_test.go b/sql/analyzer/validation_rules_test.go index 2d87614b1..1af5ad53a 100644 --- a/sql/analyzer/validation_rules_test.go +++ b/sql/analyzer/validation_rules_test.go @@ -3,7 +3,7 @@ package analyzer import ( "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/expression/function" @@ -57,7 +57,7 @@ func TestValidateGroupBy(t *testing.T) { {Name: "col2", Type: sql.Int64}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("col1_1", int64(1111)), @@ -102,7 +102,7 @@ func TestValidateGroupByErr(t *testing.T) { {Name: "col2", Type: sql.Int64}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("col1_1", int64(1111)), @@ -145,7 +145,7 @@ func TestValidateSchemaSource(t *testing.T) { { "table with valid schema", plan.NewResolvedTable( - mem.NewTable( + memory.NewTable( "mytable", sql.Schema{ {Name: "foo", Source: "mytable"}, @@ -158,7 +158,7 @@ func TestValidateSchemaSource(t *testing.T) { { "table with invalid schema", plan.NewResolvedTable( - mem.NewTable( + memory.NewTable( "mytable", sql.Schema{ {Name: "foo", Source: ""}, @@ -171,7 +171,7 @@ func TestValidateSchemaSource(t *testing.T) { { "table alias with table", plan.NewTableAlias("foo", plan.NewResolvedTable( - mem.NewTable("mytable", sql.Schema{ + memory.NewTable("mytable", sql.Schema{ {Name: "foo", Source: "mytable"}, }), )), @@ -311,7 +311,7 @@ func TestValidateProjectTuples(t *testing.T) { } func TestValidateIndexCreation(t *testing.T) { - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, }) diff --git a/sql/cache.go b/sql/cache.go new file mode 100644 index 000000000..01e28b858 --- /dev/null +++ b/sql/cache.go @@ -0,0 +1,131 @@ +package sql + +import ( + "fmt" + "hash/crc64" + "runtime" + + lru "github.com/hashicorp/golang-lru" + errors "gopkg.in/src-d/go-errors.v1" +) + +var table = crc64.MakeTable(crc64.ISO) + +// CacheKey returns a hash of the given value to be used as key in +// a cache. +func CacheKey(v interface{}) uint64 { + return crc64.Checksum([]byte(fmt.Sprintf("%#v", v)), table) +} + +// ErrKeyNotFound is returned when the key could not be found in the cache. +var ErrKeyNotFound = errors.NewKind("memory: key %d not found in cache") + +type lruCache struct { + memory Freeable + reporter Reporter + size int + cache *lru.Cache +} + +func newLRUCache(memory Freeable, r Reporter, size uint) *lruCache { + lru, _ := lru.New(int(size)) + return &lruCache{memory, r, int(size), lru} +} + +func (l *lruCache) Put(k uint64, v interface{}) error { + if releaseMemoryIfNeeded(l.reporter, l.Free, l.memory.Free) { + l.cache.Add(k, v) + } + return nil +} + +func (l *lruCache) Get(k uint64) (interface{}, error) { + v, ok := l.cache.Get(k) + if !ok { + return nil, ErrKeyNotFound.New(k) + } + + return v, nil +} + +func (l *lruCache) Free() { + l.cache, _ = lru.New(l.size) +} + +func (l *lruCache) Dispose() { + l.memory = nil + l.cache = nil +} + +type rowsCache struct { + memory Freeable + reporter Reporter + rows []Row +} + +func newRowsCache(memory Freeable, r Reporter) *rowsCache { + return &rowsCache{memory, r, nil} +} + +func (c *rowsCache) Add(row Row) error { + if !releaseMemoryIfNeeded(c.reporter, c.memory.Free) { + return ErrNoMemoryAvailable.New() + } + + c.rows = append(c.rows, row) + return nil +} + +func (c *rowsCache) Get() []Row { return c.rows } + +func (c *rowsCache) Dispose() { + c.memory = nil + c.rows = nil +} + +type historyCache struct { + memory Freeable + reporter Reporter + cache map[uint64]interface{} +} + +func newHistoryCache(memory Freeable, r Reporter) *historyCache { + return &historyCache{memory, r, make(map[uint64]interface{})} +} + +func (h *historyCache) Put(k uint64, v interface{}) error { + if !releaseMemoryIfNeeded(h.reporter, h.memory.Free) { + return ErrNoMemoryAvailable.New() + } + h.cache[k] = v + return nil +} + +func (h *historyCache) Get(k uint64) (interface{}, error) { + v, ok := h.cache[k] + if !ok { + return nil, ErrKeyNotFound.New(k) + } + return v, nil +} + +func (h *historyCache) Dispose() { + h.memory = nil + h.cache = nil +} + +// releasesMemoryIfNeeded releases memory if needed using the following steps +// until there is available memory. It returns whether or not there was +// available memory after all the steps. +func releaseMemoryIfNeeded(r Reporter, steps ...func()) bool { + for _, s := range steps { + if HasAvailableMemory(r) { + return true + } + + s() + runtime.GC() + } + + return HasAvailableMemory(r) +} diff --git a/sql/cache_test.go b/sql/cache_test.go new file mode 100644 index 000000000..7984f7986 --- /dev/null +++ b/sql/cache_test.go @@ -0,0 +1,169 @@ +package sql + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCacheKey(t *testing.T) { + k := CacheKey(1) + require.Equal(t, uint64(0x4320000000000000), k) +} + +func TestLRUCache(t *testing.T) { + t.Run("basic methods", func(t *testing.T) { + require := require.New(t) + + cache := newLRUCache(mockMemory{}, fixedReporter(5, 50), 10) + + require.NoError(cache.Put(1, "foo")) + v, err := cache.Get(1) + require.NoError(err) + require.Equal("foo", v) + + _, err = cache.Get(2) + require.Error(err) + require.True(ErrKeyNotFound.Is(err)) + + // Free the cache and check previous entry disappeared. + cache.Free() + + _, err = cache.Get(1) + require.Error(err) + require.True(ErrKeyNotFound.Is(err)) + + cache.Dispose() + require.Panics(func() { + _, _ = cache.Get(1) + }) + }) + + t.Run("no memory available", func(t *testing.T) { + require := require.New(t) + cache := newLRUCache(mockMemory{}, fixedReporter(51, 50), 5) + + require.NoError(cache.Put(1, "foo")) + _, err := cache.Get(1) + require.Error(err) + require.True(ErrKeyNotFound.Is(err)) + }) + + t.Run("free required to add entry", func(t *testing.T) { + require := require.New(t) + var freed bool + cache := newLRUCache( + mockMemory{func() { + freed = true + }}, + mockReporter{func() uint64 { + if freed { + return 0 + } + return 51 + }, 50}, + 5, + ) + require.NoError(cache.Put(1, "foo")) + v, err := cache.Get(1) + require.NoError(err) + require.Equal("foo", v) + require.True(freed) + }) +} + +func TestHistoryCache(t *testing.T) { + t.Run("basic methods", func(t *testing.T) { + require := require.New(t) + + cache := newHistoryCache(mockMemory{}, fixedReporter(5, 50)) + + require.NoError(cache.Put(1, "foo")) + v, err := cache.Get(1) + require.NoError(err) + require.Equal("foo", v) + + _, err = cache.Get(2) + require.Error(err) + require.True(ErrKeyNotFound.Is(err)) + + cache.Dispose() + require.Panics(func() { + _ = cache.Put(2, "foo") + }) + }) + + t.Run("no memory available", func(t *testing.T) { + require := require.New(t) + cache := newHistoryCache(mockMemory{}, fixedReporter(51, 50)) + + err := cache.Put(1, "foo") + require.Error(err) + require.True(ErrNoMemoryAvailable.Is(err)) + }) + + t.Run("free required to add entry", func(t *testing.T) { + require := require.New(t) + var freed bool + cache := newHistoryCache( + mockMemory{func() { + freed = true + }}, + mockReporter{func() uint64 { + if freed { + return 0 + } + return 51 + }, 50}, + ) + require.NoError(cache.Put(1, "foo")) + v, err := cache.Get(1) + require.NoError(err) + require.Equal("foo", v) + require.True(freed) + }) +} + +func TestRowsCache(t *testing.T) { + t.Run("basic methods", func(t *testing.T) { + require := require.New(t) + + cache := newRowsCache(mockMemory{}, fixedReporter(5, 50)) + + require.NoError(cache.Add(Row{1})) + require.Len(cache.Get(), 1) + + cache.Dispose() + require.Panics(func() { + _ = cache.Add(Row{2}) + }) + }) + + t.Run("no memory available", func(t *testing.T) { + require := require.New(t) + cache := newRowsCache(mockMemory{}, fixedReporter(51, 50)) + + err := cache.Add(Row{1, "foo"}) + require.Error(err) + require.True(ErrNoMemoryAvailable.Is(err)) + }) + + t.Run("free required to add entry", func(t *testing.T) { + require := require.New(t) + var freed bool + cache := newRowsCache( + mockMemory{func() { + freed = true + }}, + mockReporter{func() uint64 { + if freed { + return 0 + } + return 51 + }, 50}, + ) + require.NoError(cache.Add(Row{1, "foo"})) + require.Len(cache.Get(), 1) + require.True(freed) + }) +} diff --git a/sql/catalog.go b/sql/catalog.go index 1281815d5..ee3a48ade 100644 --- a/sql/catalog.go +++ b/sql/catalog.go @@ -18,6 +18,7 @@ type Catalog struct { FunctionRegistry *IndexRegistry *ProcessList + *MemoryManager mu sync.RWMutex currentDatabase string @@ -36,6 +37,7 @@ func NewCatalog() *Catalog { return &Catalog{ FunctionRegistry: NewFunctionRegistry(), IndexRegistry: NewIndexRegistry(), + MemoryManager: NewMemoryManager(ProcessMemory), ProcessList: NewProcessList(), locks: make(sessionLocks), } diff --git a/sql/catalog_test.go b/sql/catalog_test.go index 87af26635..176011986 100644 --- a/sql/catalog_test.go +++ b/sql/catalog_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) @@ -14,7 +14,7 @@ func TestCatalogCurrentDatabase(t *testing.T) { c := sql.NewCatalog() require.Equal("", c.CurrentDatabase()) - c.AddDatabase(mem.NewDatabase("foo")) + c.AddDatabase(memory.NewDatabase("foo")) require.Equal("foo", c.CurrentDatabase()) c.SetCurrentDatabase("bar") @@ -25,9 +25,9 @@ func TestAllDatabases(t *testing.T) { require := require.New(t) var dbs = sql.Databases{ - mem.NewDatabase("a"), - mem.NewDatabase("b"), - mem.NewDatabase("c"), + memory.NewDatabase("a"), + memory.NewDatabase("b"), + memory.NewDatabase("c"), } c := sql.NewCatalog() @@ -46,7 +46,7 @@ func TestCatalogDatabase(t *testing.T) { require.EqualError(err, "database not found: foo") require.Nil(db) - mydb := mem.NewDatabase("foo") + mydb := memory.NewDatabase("foo") c.AddDatabase(mydb) db, err = c.Database("flo") @@ -67,14 +67,14 @@ func TestCatalogTable(t *testing.T) { require.EqualError(err, "database not found: foo") require.Nil(table) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") c.AddDatabase(db) table, err = c.Table("foo", "bar") require.EqualError(err, "table not found: bar") require.Nil(table) - mytable := mem.NewTable("bar", nil) + mytable := memory.NewTable("bar", nil) db.AddTable("bar", mytable) table, err = c.Table("foo", "baz") @@ -93,9 +93,9 @@ func TestCatalogTable(t *testing.T) { func TestCatalogUnlockTables(t *testing.T) { require := require.New(t) - db := mem.NewDatabase("db") - t1 := newLockableTable(mem.NewTable("t1", nil)) - t2 := newLockableTable(mem.NewTable("t2", nil)) + db := memory.NewDatabase("db") + t1 := newLockableTable(memory.NewTable("t1", nil)) + t2 := newLockableTable(memory.NewTable("t2", nil)) db.AddTable("t1", t1) db.AddTable("t2", t2) diff --git a/sql/memory.go b/sql/memory.go new file mode 100644 index 000000000..3b8eb7173 --- /dev/null +++ b/sql/memory.go @@ -0,0 +1,198 @@ +package sql + +import ( + "os" + "runtime" + "strconv" + "sync" + + errors "gopkg.in/src-d/go-errors.v1" +) + +// Disposable objects can erase all their content when they're no longer in use. +// They should not be used again after they've been disposed. +type Disposable interface { + // Dispose the contents. + Dispose() +} + +// Freeable objects can free their memory. +type Freeable interface { + // Free the memory. + Free() +} + +// KeyValueCache is a cache of key value pairs. +type KeyValueCache interface { + // Put a new value in the cache. If there is no more memory and the cache is + // not Freeable it will try to free some memory from other freeable caches. + // If there's still no more memory available, it will fail and erase all the + // contents of the cache. + // If it's Freeable, it will be freed, then the new value will be inserted. + Put(uint64, interface{}) error + // Get the value with the given key. + Get(uint64) (interface{}, error) +} + +// RowsCache is a cache of rows. +type RowsCache interface { + // Add a new row to the cache. If there is no memory available, it will try to + // free some memory. If after that there is still no memory available, it + // will return an error and erase all the content of the cache. + Add(Row) error + // Get all rows. + Get() []Row +} + +// ErrNoMemoryAvailable is returned when there is no more available memory. +var ErrNoMemoryAvailable = errors.NewKind("no memory available") + +const maxMemoryKey = "MAX_MEMORY" + +const ( + b = 1 + kib = 1024 * b + mib = 1024 * kib +) + +var maxMemory = func() uint64 { + val := os.Getenv(maxMemoryKey) + var v uint64 + if val != "" { + var err error + v, err = strconv.ParseUint(val, 10, 64) + if err != nil { + panic("MAX_MEMORY environment variable must be a number, but got: " + val) + } + } + + return v * uint64(mib) +}() + +// Reporter is a component that gives information about the memory usage. +type Reporter interface { + // MaxMemory returns the maximum number of memory allowed in bytes. + MaxMemory() uint64 + // UsedMemory returns the memory in use in bytes. + UsedMemory() uint64 +} + +// ProcessMemory is a reporter for the memory used by the process and the +// maximum amount of memory allowed controlled by the MAX_MEMORY environment +// variable. +var ProcessMemory Reporter = new(processReporter) + +type processReporter struct{} + +func (processReporter) UsedMemory() uint64 { + var s runtime.MemStats + runtime.ReadMemStats(&s) + return s.HeapInuse + s.StackInuse +} + +func (processReporter) MaxMemory() uint64 { return maxMemory } + +// HasAvailableMemory reports whether more memory is available to the program if +// it hasn't reached the max memory limit. +func HasAvailableMemory(r Reporter) bool { + maxMemory := r.MaxMemory() + if maxMemory == 0 { + return true + } + + return r.UsedMemory() < maxMemory +} + +// MemoryManager is in charge of keeping track and managing all the components that operate +// in memory. There should only be one instance of a memory manager running at the +// same time in each process. +type MemoryManager struct { + mu sync.RWMutex + reporter Reporter + caches map[uint64]Disposable + token uint64 +} + +// NewMemoryManager creates a new manager with the given memory reporter. If nil is given, +// then the Process reporter will be used by default. +func NewMemoryManager(r Reporter) *MemoryManager { + if r == nil { + r = ProcessMemory + } + + return &MemoryManager{ + reporter: r, + caches: make(map[uint64]Disposable), + } +} + +// HasAvailable reports whether the memory manager has any available memory. +func (m *MemoryManager) HasAvailable() bool { + return HasAvailableMemory(m.reporter) +} + +// DisposeFunc is a function to completely erase a cache and remove it from the manager. +type DisposeFunc func() + +// NewLRUCache returns an empty LRU cache and a function to dispose it when it's +// no longer needed. +func (m *MemoryManager) NewLRUCache(size uint) (KeyValueCache, DisposeFunc) { + c := newLRUCache(m, m.reporter, size) + pos := m.addCache(c) + return c, func() { + c.Dispose() + m.removeCache(pos) + } +} + +// NewHistoryCache returns an empty history cache and a function to dispose it when it's +// no longer needed. +func (m *MemoryManager) NewHistoryCache() (KeyValueCache, DisposeFunc) { + c := newHistoryCache(m, m.reporter) + pos := m.addCache(c) + return c, func() { + c.Dispose() + m.removeCache(pos) + } +} + +// NewRowsCache returns an empty rows cache and a function to dispose it when it's +// no longer needed. +func (m *MemoryManager) NewRowsCache() (RowsCache, DisposeFunc) { + c := newRowsCache(m, m.reporter) + pos := m.addCache(c) + return c, func() { + c.Dispose() + m.removeCache(pos) + } +} + +func (m *MemoryManager) addCache(c Disposable) (pos uint64) { + m.mu.Lock() + defer m.mu.Unlock() + m.token++ + m.caches[m.token] = c + return m.token +} + +func (m *MemoryManager) removeCache(pos uint64) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.caches, pos) + + if len(m.caches) == 0 { + m.token = 0 + } +} + +// Free the memory of all freeable caches. +func (m *MemoryManager) Free() { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, c := range m.caches { + if f, ok := c.(Freeable); ok { + f.Free() + } + } +} diff --git a/sql/memory_test.go b/sql/memory_test.go new file mode 100644 index 000000000..c4dcf7d57 --- /dev/null +++ b/sql/memory_test.go @@ -0,0 +1,79 @@ +package sql + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestManager(t *testing.T) { + require := require.New(t) + m := NewMemoryManager(nil) + + kv, dispose := m.NewLRUCache(5) + _, ok := kv.(*lruCache) + require.True(ok) + require.Len(m.caches, 1) + dispose() + require.Len(m.caches, 0) + + kv, dispose = m.NewHistoryCache() + _, ok = kv.(*historyCache) + require.True(ok) + require.Len(m.caches, 1) + dispose() + require.Len(m.caches, 0) + + rc, dispose := m.NewRowsCache() + _, ok = rc.(*rowsCache) + require.True(ok) + require.Len(m.caches, 1) + dispose() + require.Len(m.caches, 0) + + m.addCache(disposableCache{}) + f := new(freeableCache) + m.addCache(f) + m.Free() + require.True(f.freed) +} + +type disposableCache struct{} + +func (d disposableCache) Dispose() {} + +type freeableCache struct { + disposableCache + freed bool +} + +func (f *freeableCache) Free() { f.freed = true } + +func TestHasAvailable(t *testing.T) { + require.True(t, HasAvailableMemory(fixedReporter(2, 5))) + require.False(t, HasAvailableMemory(fixedReporter(6, 5))) +} + +type mockReporter struct { + f func() uint64 + max uint64 +} + +func (m mockReporter) UsedMemory() uint64 { return m.f() } +func (m mockReporter) MaxMemory() uint64 { return m.max } + +func fixedReporter(v, max uint64) mockReporter { + return mockReporter{func() uint64 { + return v + }, max} +} + +type mockMemory struct { + f func() +} + +func (m mockMemory) Free() { + if m.f != nil { + m.f() + } +} diff --git a/sql/plan/common_test.go b/sql/plan/common_test.go index 301ae0db1..35043f7fd 100644 --- a/sql/plan/common_test.go +++ b/sql/plan/common_test.go @@ -7,11 +7,11 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) -var benchtable = func() *mem.Table { +var benchtable = func() *memory.Table { schema := sql.Schema{ {Name: "strfield", Type: sql.Text, Nullable: true}, {Name: "floatfield", Type: sql.Float64, Nullable: true}, @@ -20,7 +20,7 @@ var benchtable = func() *mem.Table { {Name: "bigintfield", Type: sql.Int64, Nullable: false}, {Name: "blobfield", Type: sql.Blob, Nullable: false}, } - t := mem.NewTable("test", schema) + t := memory.NewTable("test", schema) for i := 0; i < 100; i++ { n := fmt.Sprint(i) @@ -109,7 +109,7 @@ func collectRows(t *testing.T, node sql.Node) []sql.Row { func TestIsUnary(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", nil) + table := memory.NewTable("foo", nil) require.True(IsUnary(NewFilter(nil, NewResolvedTable(table)))) require.False(IsUnary(NewCrossJoin( @@ -120,7 +120,7 @@ func TestIsUnary(t *testing.T) { func TestIsBinary(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", nil) + table := memory.NewTable("foo", nil) require.False(IsBinary(NewFilter(nil, NewResolvedTable(table)))) require.True(IsBinary(NewCrossJoin( diff --git a/sql/plan/create_index_test.go b/sql/plan/create_index_test.go index d176b0a79..0cb509583 100644 --- a/sql/plan/create_index_test.go +++ b/sql/plan/create_index_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/test" @@ -18,7 +18,7 @@ import ( func TestCreateIndexAsync(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, {Name: "c", Source: "foo"}, @@ -27,7 +27,7 @@ func TestCreateIndexAsync(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", table) catalog.AddDatabase(db) @@ -72,7 +72,7 @@ func TestCreateIndexAsync(t *testing.T) { func TestCreateIndexNotIndexableExprs(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo", Type: sql.Blob}, {Name: "b", Source: "foo", Type: sql.JSON}, {Name: "c", Source: "foo", Type: sql.Text}, @@ -81,7 +81,7 @@ func TestCreateIndexNotIndexableExprs(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", table) catalog.AddDatabase(db) @@ -121,7 +121,7 @@ func TestCreateIndexNotIndexableExprs(t *testing.T) { func TestCreateIndexSync(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, {Name: "c", Source: "foo"}, @@ -130,7 +130,7 @@ func TestCreateIndexSync(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", table) catalog.AddDatabase(db) @@ -175,7 +175,7 @@ func TestCreateIndexChecksum(t *testing.T) { require := require.New(t) table := &checksumTable{ - mem.NewTable("foo", sql.Schema{ + memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, {Name: "c", Source: "foo"}, @@ -186,7 +186,7 @@ func TestCreateIndexChecksum(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", table) catalog.AddDatabase(db) @@ -217,7 +217,7 @@ func TestCreateIndexChecksumWithUnderlying(t *testing.T) { &underlyingTable{ &underlyingTable{ &checksumTable{ - mem.NewTable("foo", sql.Schema{ + memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, {Name: "c", Source: "foo"}, @@ -253,7 +253,7 @@ func TestCreateIndexChecksumWithUnderlying(t *testing.T) { func TestCreateIndexWithIter(t *testing.T) { require := require.New(t) - foo := mem.NewPartitionedTable("foo", sql.Schema{ + foo := memory.NewPartitionedTable("foo", sql.Schema{ {Name: "one", Source: "foo", Type: sql.Int64}, {Name: "two", Source: "foo", Type: sql.Int64}, }, 2) @@ -277,7 +277,7 @@ func TestCreateIndexWithIter(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", foo) catalog.AddDatabase(db) diff --git a/sql/plan/cross_join_test.go b/sql/plan/cross_join_test.go index fbccc20c1..f45548e99 100644 --- a/sql/plan/cross_join_test.go +++ b/sql/plan/cross_join_test.go @@ -4,7 +4,7 @@ import ( "io" "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/stretchr/testify/require" ) @@ -38,8 +38,8 @@ func TestCrossJoin(t *testing.T) { {Name: "rcol4", Type: sql.Int64}, } - ltable := mem.NewTable("left", lSchema) - rtable := mem.NewTable("right", rSchema) + ltable := memory.NewTable("left", lSchema) + rtable := memory.NewTable("right", rSchema) insertData(t, ltable) insertData(t, rtable) @@ -99,8 +99,8 @@ func TestCrossJoin_Empty(t *testing.T) { require := require.New(t) ctx := sql.NewEmptyContext() - ltable := mem.NewTable("left", lSchema) - rtable := mem.NewTable("right", rSchema) + ltable := memory.NewTable("left", lSchema) + rtable := memory.NewTable("right", rSchema) insertData(t, ltable) j := NewCrossJoin( @@ -116,8 +116,8 @@ func TestCrossJoin_Empty(t *testing.T) { require.Equal(io.EOF, err) require.Nil(row) - ltable = mem.NewTable("left", lSchema) - rtable = mem.NewTable("right", rSchema) + ltable = memory.NewTable("left", lSchema) + rtable = memory.NewTable("right", rSchema) insertData(t, rtable) j = NewCrossJoin( @@ -134,7 +134,7 @@ func TestCrossJoin_Empty(t *testing.T) { require.Nil(row) } -func insertData(t *testing.T, table *mem.Table) { +func insertData(t *testing.T, table *memory.Table) { t.Helper() require := require.New(t) diff --git a/sql/plan/ddl_test.go b/sql/plan/ddl_test.go index f099e2922..8347cb925 100644 --- a/sql/plan/ddl_test.go +++ b/sql/plan/ddl_test.go @@ -5,14 +5,14 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) func TestCreateTable(t *testing.T) { require := require.New(t) - db := mem.NewDatabase("test") + db := memory.NewDatabase("test") tables := db.Tables() _, ok := tables["testTable"] require.False(ok) diff --git a/sql/plan/describe_test.go b/sql/plan/describe_test.go index 5b23e2e23..157bfff61 100644 --- a/sql/plan/describe_test.go +++ b/sql/plan/describe_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -14,7 +14,7 @@ func TestDescribe(t *testing.T) { require := require.New(t) ctx := sql.NewEmptyContext() - table := mem.NewTable("test", sql.Schema{ + table := memory.NewTable("test", sql.Schema{ {Name: "c1", Type: sql.Text}, {Name: "c2", Type: sql.Int32}, }) @@ -55,7 +55,7 @@ func TestDescribe_Empty(t *testing.T) { func TestDescribeQuery(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Source: "foo", Name: "a", Type: sql.Text}, {Source: "foo", Name: "b", Type: sql.Text}, }) diff --git a/sql/plan/distinct.go b/sql/plan/distinct.go index e14aee5fb..8a3da753a 100644 --- a/sql/plan/distinct.go +++ b/sql/plan/distinct.go @@ -1,9 +1,8 @@ package plan import ( - "fmt" + "io" - "github.com/mitchellh/hashstructure" "github.com/src-d/go-mysql-server/sql" ) @@ -34,7 +33,7 @@ func (d *Distinct) RowIter(ctx *sql.Context) (sql.RowIter, error) { return nil, err } - return sql.NewSpanIter(span, newDistinctIter(it)), nil + return sql.NewSpanIter(span, newDistinctIter(ctx, it)), nil } // WithChildren implements the Node interface. @@ -55,18 +54,21 @@ func (d Distinct) String() string { // distinctIter keeps track of the hashes of all rows that have been emitted. // It does not emit any rows whose hashes have been seen already. -// TODO: come up with a way to use less memory than keeping all hashes in mem. +// TODO: come up with a way to use less memory than keeping all hashes in memory. // Even though they are just 64-bit integers, this could be a problem in large // result sets. type distinctIter struct { childIter sql.RowIter - seen map[uint64]struct{} + seen sql.KeyValueCache + dispose sql.DisposeFunc } -func newDistinctIter(child sql.RowIter) *distinctIter { +func newDistinctIter(ctx *sql.Context, child sql.RowIter) *distinctIter { + cache, dispose := ctx.Memory.NewHistoryCache() return &distinctIter{ childIter: child, - seen: make(map[uint64]struct{}), + seen: cache, + dispose: dispose, } } @@ -74,29 +76,38 @@ func (di *distinctIter) Next() (sql.Row, error) { for { row, err := di.childIter.Next() if err != nil { + if err == io.EOF { + di.Dispose() + } return nil, err } - hash, err := hashstructure.Hash(row, nil) - if err != nil { - return nil, fmt.Errorf("unable to hash row: %s", err) + hash := sql.CacheKey(row) + if _, err := di.seen.Get(hash); err == nil { + continue } - if _, ok := di.seen[hash]; ok { - continue + if err := di.seen.Put(hash, struct{}{}); err != nil { + return nil, err } - di.seen[hash] = struct{}{} return row, nil } } func (di *distinctIter) Close() error { + di.Dispose() return di.childIter.Close() } +func (di *distinctIter) Dispose() { + if di.dispose != nil { + di.dispose() + } +} + // OrderedDistinct is a Distinct node optimized for sorted row sets. -// It's 2 orders of magnitude faster and uses 2 orders of magnitude less mem. +// It's 2 orders of magnitude faster and uses 2 orders of magnitude less memory. type OrderedDistinct struct { UnaryNode } diff --git a/sql/plan/distinct_test.go b/sql/plan/distinct_test.go index 0aa6461a9..098308d47 100644 --- a/sql/plan/distinct_test.go +++ b/sql/plan/distinct_test.go @@ -4,10 +4,10 @@ import ( "io" "testing" - "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" + "github.com/stretchr/testify/require" ) func TestDistinct(t *testing.T) { @@ -18,7 +18,7 @@ func TestDistinct(t *testing.T) { {Name: "name", Type: sql.Text, Nullable: true}, {Name: "email", Type: sql.Text, Nullable: true}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("john", "john@doe.com"), @@ -65,7 +65,7 @@ func TestOrderedDistinct(t *testing.T) { {Name: "name", Type: sql.Text, Nullable: true}, {Name: "email", Type: sql.Text, Nullable: true}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("jane", "jane@doe.com"), diff --git a/sql/plan/drop_index_test.go b/sql/plan/drop_index_test.go index 86ba9aed7..f90dc9568 100644 --- a/sql/plan/drop_index_test.go +++ b/sql/plan/drop_index_test.go @@ -5,7 +5,7 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -13,7 +13,7 @@ import ( func TestDeleteIndex(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, {Name: "c", Source: "foo"}, @@ -22,7 +22,7 @@ func TestDeleteIndex(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", table) catalog.AddDatabase(db) @@ -56,7 +56,7 @@ func TestDeleteIndex(t *testing.T) { func TestDeleteIndexNotReady(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, {Name: "c", Source: "foo"}, @@ -65,7 +65,7 @@ func TestDeleteIndexNotReady(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", table) catalog.AddDatabase(db) @@ -101,7 +101,7 @@ func TestDeleteIndexNotReady(t *testing.T) { func TestDeleteIndexOutdated(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo"}, {Name: "b", Source: "foo"}, {Name: "c", Source: "foo"}, @@ -110,7 +110,7 @@ func TestDeleteIndexOutdated(t *testing.T) { driver := new(mockDriver) catalog := sql.NewCatalog() catalog.RegisterIndexDriver(driver) - db := mem.NewDatabase("foo") + db := memory.NewDatabase("foo") db.AddTable("foo", table) catalog.AddDatabase(db) diff --git a/sql/plan/filter_test.go b/sql/plan/filter_test.go index 6c0f8f76b..ea031831b 100644 --- a/sql/plan/filter_test.go +++ b/sql/plan/filter_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -19,7 +19,7 @@ func TestFilter(t *testing.T) { {Name: "col3", Type: sql.Int32, Nullable: true}, {Name: "col4", Type: sql.Int64, Nullable: true}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("col1_1", "col2_1", int32(1111), int64(2222)), diff --git a/sql/plan/group_by.go b/sql/plan/group_by.go index 620167568..4a9fb76b0 100644 --- a/sql/plan/group_by.go +++ b/sql/plan/group_by.go @@ -28,7 +28,6 @@ func NewGroupBy( grouping []sql.Expression, child sql.Node, ) *GroupBy { - return &GroupBy{ UnaryNode: UnaryNode{Child: child}, Aggregate: aggregate, @@ -102,7 +101,7 @@ func (p *GroupBy) WithChildren(children ...sql.Node) (sql.Node, error) { return NewGroupBy(p.Aggregate, p.Grouping, children[0]), nil } -// WithChildren implements the Node interface. +// WithExpressions implements the Node interface. func (p *GroupBy) WithExpressions(exprs ...sql.Expression) (sql.Node, error) { expected := len(p.Aggregate) + len(p.Grouping) if len(exprs) != expected { @@ -206,11 +205,12 @@ func (i *groupByIter) Close() error { type groupByGroupingIter struct { aggregate []sql.Expression grouping []sql.Expression - aggregation map[uint64][]sql.Row + aggregation sql.KeyValueCache keys []uint64 pos int child sql.RowIter ctx *sql.Context + dispose sql.DisposeFunc } func newGroupByGroupingIter( @@ -228,7 +228,7 @@ func newGroupByGroupingIter( func (i *groupByGroupingIter) Next() (sql.Row, error) { if i.aggregation == nil { - i.aggregation = make(map[uint64][]sql.Row) + i.aggregation, i.dispose = i.ctx.Memory.NewHistoryCache() if err := i.compute(); err != nil { return nil, err } @@ -238,9 +238,12 @@ func (i *groupByGroupingIter) Next() (sql.Row, error) { return nil, io.EOF } - buffers := i.aggregation[i.keys[i.pos]] + buffers, err := i.aggregation.Get(i.keys[i.pos]) + if err != nil { + return nil, err + } i.pos++ - return evalBuffers(i.ctx, buffers, i.aggregate) + return evalBuffers(i.ctx, buffers.([]sql.Row), i.aggregate) } func (i *groupByGroupingIter) compute() error { @@ -258,16 +261,25 @@ func (i *groupByGroupingIter) compute() error { return err } - if _, ok := i.aggregation[key]; !ok { + if _, err := i.aggregation.Get(key); err != nil { var buf = make([]sql.Row, len(i.aggregate)) for j, a := range i.aggregate { buf[j] = fillBuffer(a) } - i.aggregation[key] = buf + + if err := i.aggregation.Put(key, buf); err != nil { + return err + } + i.keys = append(i.keys, key) } - err = updateBuffers(i.ctx, i.aggregation[key], i.aggregate, row) + b, err := i.aggregation.Get(key) + if err != nil { + return err + } + + err = updateBuffers(i.ctx, b.([]sql.Row), i.aggregate, row) if err != nil { return err } diff --git a/sql/plan/group_by_test.go b/sql/plan/group_by_test.go index 9e14a6f6d..34ce7c95f 100644 --- a/sql/plan/group_by_test.go +++ b/sql/plan/group_by_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/src-d/go-mysql-server/sql/expression/function/aggregation" @@ -13,7 +13,7 @@ import ( func TestGroupBySchema(t *testing.T) { require := require.New(t) - child := mem.NewTable("test", nil) + child := memory.NewTable("test", nil) agg := []sql.Expression{ expression.NewAlias(expression.NewLiteral("s", sql.Text), "c1"), expression.NewAlias(aggregation.NewCount(expression.NewStar()), "c2"), @@ -28,7 +28,7 @@ func TestGroupBySchema(t *testing.T) { func TestGroupByResolved(t *testing.T) { require := require.New(t) - child := mem.NewTable("test", nil) + child := memory.NewTable("test", nil) agg := []sql.Expression{ expression.NewAlias(aggregation.NewCount(expression.NewStar()), "c2"), } @@ -50,7 +50,7 @@ func TestGroupByRowIter(t *testing.T) { {Name: "col1", Type: sql.Text}, {Name: "col2", Type: sql.Int64}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("col1_1", int64(1111)), @@ -114,7 +114,7 @@ func TestGroupByAggregationGrouping(t *testing.T) { {Name: "col2", Type: sql.Int64}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("col1_1", int64(1111)), @@ -208,7 +208,7 @@ func benchmarkTable(t testing.TB) sql.Table { t.Helper() require := require.New(t) - table := mem.NewTable("test", sql.Schema{ + table := memory.NewTable("test", sql.Schema{ {Name: "a", Type: sql.Int64}, {Name: "b", Type: sql.Int64}, }) diff --git a/sql/plan/having_test.go b/sql/plan/having_test.go index 34ccc649d..e4b90ccc7 100644 --- a/sql/plan/having_test.go +++ b/sql/plan/having_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -19,7 +19,7 @@ func TestHaving(t *testing.T) { {Name: "col3", Type: sql.Int32, Nullable: true}, {Name: "col4", Type: sql.Int64, Nullable: true}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("col1_1", "col2_1", int32(1111), int64(2222)), diff --git a/sql/plan/join.go b/sql/plan/join.go index a287a424a..97f1abc13 100644 --- a/sql/plan/join.go +++ b/sql/plan/join.go @@ -4,53 +4,24 @@ import ( "io" "os" "reflect" - "runtime" - "strconv" "strings" opentracing "github.com/opentracing/opentracing-go" - "github.com/pbnjay/memory" - "github.com/sirupsen/logrus" "github.com/src-d/go-mysql-server/sql" ) const ( - inMemoryJoinKey = "INMEMORY_JOINS" - maxMemoryJoinKey = "MAX_MEMORY_JOIN" - inMemoryJoinSessionVar = "inmemory_joins" - memoryThresholdSessionVar = "max_memory_joins" + inMemoryJoinKey = "INMEMORY_JOINS" + inMemoryJoinSessionVar = "inmemory_joins" ) -var ( - useInMemoryJoins = shouldUseMemoryJoinsByEnv() - // One fifth of the total physical memory available on the OS (ignoring the - // memory used by other processes). - defaultMemoryThreshold = memory.TotalMemory() / 5 - // Maximum amount of memory the gitbase server can have in use before - // considering all joins should be done using multipass mode. - maxMemoryJoin = loadMemoryThreshold() -) +var useInMemoryJoins = shouldUseMemoryJoinsByEnv() func shouldUseMemoryJoinsByEnv() bool { v := strings.TrimSpace(strings.ToLower(os.Getenv(inMemoryJoinKey))) return v == "on" || v == "1" } -func loadMemoryThreshold() uint64 { - v, ok := os.LookupEnv(maxMemoryJoinKey) - if !ok { - return defaultMemoryThreshold - } - - n, err := strconv.ParseUint(v, 10, 64) - if err != nil { - logrus.Warnf("invalid value %q given to %s environment variable", v, maxMemoryJoinKey) - return defaultMemoryThreshold - } - - return n * 1024 // to bytes -} - // InnerJoin is an inner join between two tables. type InnerJoin struct { BinaryNode @@ -293,6 +264,7 @@ func joinRowIter( mode = memoryMode } + cache, dispose := ctx.Memory.NewRowsCache() if typ == rightJoin { r, err := right.RowIter(ctx) if err != nil { @@ -306,6 +278,8 @@ func joinRowIter( ctx: ctx, cond: cond, mode: mode, + secondaryRows: cache, + dispose: dispose, }), nil } @@ -314,6 +288,7 @@ func joinRowIter( span.Finish() return nil, err } + return sql.NewSpanIter(span, &joinIter{ typ: typ, primary: l, @@ -321,6 +296,8 @@ func joinRowIter( ctx: ctx, cond: cond, mode: mode, + secondaryRows: cache, + dispose: dispose, }), nil } @@ -358,14 +335,25 @@ type joinIter struct { // used to compute in-memory mode joinMode - secondaryRows []sql.Row + secondaryRows sql.RowsCache pos int + dispose sql.DisposeFunc +} + +func (i *joinIter) Dispose() { + if i.dispose != nil { + i.dispose() + i.dispose = nil + } } func (i *joinIter) loadPrimary() error { if i.primaryRow == nil { r, err := i.primary.Next() if err != nil { + if err == io.EOF { + i.Dispose() + } return err } @@ -382,52 +370,42 @@ func (i *joinIter) loadSecondaryInMemory() error { return err } - i.secondaryRows, err = sql.RowIterToRows(iter) - if err != nil { - return err + for { + row, err := iter.Next() + if err == io.EOF { + break + } + if err != nil { + return err + } + + if err := i.secondaryRows.Add(row); err != nil { + return err + } } - if len(i.secondaryRows) == 0 { + if len(i.secondaryRows.Get()) == 0 { return io.EOF } return nil } -func (i *joinIter) fitsInMemory() bool { - var maxMemory uint64 - _, v := i.ctx.Session.Get(memoryThresholdSessionVar) - if n, ok := v.(int64); ok { - maxMemory = uint64(n) * 1024 // to bytes - } else { - maxMemory = maxMemoryJoin - } - - if maxMemory <= 0 { - return true - } - - var ms runtime.MemStats - runtime.ReadMemStats(&ms) - - return (ms.HeapInuse + ms.StackInuse) < maxMemory -} - func (i *joinIter) loadSecondary() (row sql.Row, err error) { if i.mode == memoryMode { - if len(i.secondaryRows) == 0 { + if len(i.secondaryRows.Get()) == 0 { if err = i.loadSecondaryInMemory(); err != nil { return nil, err } } - if i.pos >= len(i.secondaryRows) { + if i.pos >= len(i.secondaryRows.Get()) { i.primaryRow = nil i.pos = 0 return nil, io.EOF } - row := i.secondaryRows[i.pos] + row := i.secondaryRows.Get()[i.pos] i.pos++ return row, nil } @@ -461,11 +439,20 @@ func (i *joinIter) loadSecondary() (row sql.Row, err error) { } if i.mode == unknownMode { - if !i.fitsInMemory() { + var switchToMultipass bool + if !i.ctx.Memory.HasAvailable() { + switchToMultipass = true + } else { + err := i.secondaryRows.Add(rightRow) + if err != nil && !sql.ErrNoMemoryAvailable.Is(err) { + return nil, err + } + } + + if switchToMultipass { + i.Dispose() i.secondaryRows = nil i.mode = multipassMode - } else { - i.secondaryRows = append(i.secondaryRows, rightRow) } } @@ -529,6 +516,7 @@ func (i *joinIter) buildRow(primary, secondary sql.Row) sql.Row { } func (i *joinIter) Close() (err error) { + i.Dispose() i.secondary = nil if i.primary != nil { diff --git a/sql/plan/join_test.go b/sql/plan/join_test.go index c79bc80d5..07797602b 100644 --- a/sql/plan/join_test.go +++ b/sql/plan/join_test.go @@ -1,21 +1,22 @@ package plan import ( + "context" "fmt" "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" "github.com/stretchr/testify/require" ) func TestJoinSchema(t *testing.T) { - t1 := NewResolvedTable(mem.NewTable("foo", sql.Schema{ + t1 := NewResolvedTable(memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo", Type: sql.Int64}, })) - t2 := NewResolvedTable(mem.NewTable("bar", sql.Schema{ + t2 := NewResolvedTable(memory.NewTable("bar", sql.Schema{ {Name: "b", Source: "bar", Type: sql.Int64}, })) @@ -61,8 +62,9 @@ func TestInMemoryInnerJoin(t *testing.T) { } func TestMultiPassInnerJoin(t *testing.T) { - ctx := sql.NewEmptyContext() - ctx.Set(memoryThresholdSessionVar, sql.Int64, int64(1)) + ctx := sql.NewContext(context.TODO(), sql.WithMemoryManager( + sql.NewMemoryManager(mockReporter{2, 1}), + )) testInnerJoin(t, ctx) } @@ -70,8 +72,8 @@ func testInnerJoin(t *testing.T, ctx *sql.Context) { t.Helper() require := require.New(t) - ltable := mem.NewTable("left", lSchema) - rtable := mem.NewTable("right", rSchema) + ltable := memory.NewTable("left", lSchema) + rtable := memory.NewTable("right", rSchema) insertData(t, ltable) insertData(t, rtable) @@ -95,8 +97,8 @@ func TestInnerJoinEmpty(t *testing.T) { require := require.New(t) ctx := sql.NewEmptyContext() - ltable := mem.NewTable("left", lSchema) - rtable := mem.NewTable("right", rSchema) + ltable := memory.NewTable("left", lSchema) + rtable := memory.NewTable("right", rSchema) j := NewInnerJoin( NewResolvedTable(ltable), @@ -113,12 +115,12 @@ func TestInnerJoinEmpty(t *testing.T) { } func BenchmarkInnerJoin(b *testing.B) { - t1 := mem.NewTable("foo", sql.Schema{ + t1 := memory.NewTable("foo", sql.Schema{ {Name: "a", Source: "foo", Type: sql.Int64}, {Name: "b", Source: "foo", Type: sql.Text}, }) - t2 := mem.NewTable("bar", sql.Schema{ + t2 := memory.NewTable("bar", sql.Schema{ {Name: "a", Source: "bar", Type: sql.Int64}, {Name: "b", Source: "bar", Type: sql.Text}, }) @@ -156,7 +158,9 @@ func BenchmarkInnerJoin(b *testing.B) { {int64(4), "t1_4", int64(4), "t2_4"}, } - ctx := sql.NewEmptyContext() + ctx := sql.NewContext(context.TODO(), sql.WithMemoryManager( + sql.NewMemoryManager(mockReporter{1, 5}), + )) b.Run("inner join", func(b *testing.B) { require := require.New(b) @@ -188,9 +192,7 @@ func BenchmarkInnerJoin(b *testing.B) { useInMemoryJoins = false }) - b.Run("withing memory threshold", func(b *testing.B) { - prev := maxMemoryJoin - maxMemoryJoin = 0 + b.Run("within memory threshold", func(b *testing.B) { require := require.New(b) for i := 0; i < b.N; i++ { @@ -202,8 +204,6 @@ func BenchmarkInnerJoin(b *testing.B) { require.Equal(expected, rows) } - - maxMemoryJoin = prev }) b.Run("cross join with filter", func(b *testing.B) { @@ -224,8 +224,8 @@ func BenchmarkInnerJoin(b *testing.B) { func TestLeftJoin(t *testing.T) { require := require.New(t) - ltable := mem.NewTable("left", lSchema) - rtable := mem.NewTable("right", rSchema) + ltable := memory.NewTable("left", lSchema) + rtable := memory.NewTable("right", rSchema) insertData(t, ltable) insertData(t, rtable) @@ -240,7 +240,10 @@ func TestLeftJoin(t *testing.T) { expression.NewGetField(6, sql.Text, "rcol3", false), )) - rows := collectRows(t, j) + iter, err := j.RowIter(sql.NewEmptyContext()) + require.NoError(err) + rows, err := sql.RowIterToRows(iter) + require.NoError(err) require.ElementsMatch([]sql.Row{ {"col1_1", "col2_1", int32(1), int64(2), "col1_2", "col2_2", int32(3), int64(4)}, {"col1_2", "col2_2", int32(3), int64(4), nil, nil, nil, nil}, @@ -250,8 +253,8 @@ func TestLeftJoin(t *testing.T) { func TestRightJoin(t *testing.T) { require := require.New(t) - ltable := mem.NewTable("left", lSchema) - rtable := mem.NewTable("right", rSchema) + ltable := memory.NewTable("left", lSchema) + rtable := memory.NewTable("right", rSchema) insertData(t, ltable) insertData(t, rtable) @@ -266,9 +269,20 @@ func TestRightJoin(t *testing.T) { expression.NewGetField(6, sql.Text, "rcol3", false), )) - rows := collectRows(t, j) + iter, err := j.RowIter(sql.NewEmptyContext()) + require.NoError(err) + rows, err := sql.RowIterToRows(iter) + require.NoError(err) require.ElementsMatch([]sql.Row{ {nil, nil, nil, nil, "col1_1", "col2_1", int32(1), int64(2)}, {"col1_1", "col2_1", int32(1), int64(2), "col1_2", "col2_2", int32(3), int64(4)}, }, rows) } + +type mockReporter struct { + val uint64 + max uint64 +} + +func (m mockReporter) UsedMemory() uint64 { return m.val } +func (m mockReporter) MaxMemory() uint64 { return m.max } diff --git a/sql/plan/limit_test.go b/sql/plan/limit_test.go index 3815e9c67..25fd94c21 100644 --- a/sql/plan/limit_test.go +++ b/sql/plan/limit_test.go @@ -8,11 +8,11 @@ import ( "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) -var testingTable *mem.Table +var testingTable *memory.Table var testingTableSize int func TestLimitPlan(t *testing.T) { @@ -80,7 +80,7 @@ func testLimitOverflow(t *testing.T, iter sql.RowIter, limit int, dataSize int) } } -func getTestingTable(t *testing.T) (*mem.Table, int) { +func getTestingTable(t *testing.T) (*memory.Table, int) { t.Helper() if &testingTable == nil { return testingTable, testingTableSize @@ -89,7 +89,7 @@ func getTestingTable(t *testing.T) (*mem.Table, int) { childSchema := sql.Schema{ {Name: "col1", Type: sql.Text}, } - testingTable = mem.NewTable("test", childSchema) + testingTable = memory.NewTable("test", childSchema) rows := []sql.Row{ sql.NewRow("11a"), diff --git a/sql/plan/lock_test.go b/sql/plan/lock_test.go index 6393740bd..3df237f87 100644 --- a/sql/plan/lock_test.go +++ b/sql/plan/lock_test.go @@ -4,15 +4,15 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) func TestLockTables(t *testing.T) { require := require.New(t) - t1 := newLockableTable(mem.NewTable("foo", nil)) - t2 := newLockableTable(mem.NewTable("bar", nil)) + t1 := newLockableTable(memory.NewTable("foo", nil)) + t2 := newLockableTable(memory.NewTable("bar", nil)) node := NewLockTables([]*TableLock{ {NewResolvedTable(t1), true}, {NewResolvedTable(t2), false}, @@ -31,10 +31,10 @@ func TestLockTables(t *testing.T) { func TestUnlockTables(t *testing.T) { require := require.New(t) - db := mem.NewDatabase("db") - t1 := newLockableTable(mem.NewTable("foo", nil)) - t2 := newLockableTable(mem.NewTable("bar", nil)) - t3 := newLockableTable(mem.NewTable("baz", nil)) + db := memory.NewDatabase("db") + t1 := newLockableTable(memory.NewTable("foo", nil)) + t2 := newLockableTable(memory.NewTable("bar", nil)) + t3 := newLockableTable(memory.NewTable("baz", nil)) db.AddTable("foo", t1) db.AddTable("bar", t2) db.AddTable("baz", t3) diff --git a/sql/plan/process_test.go b/sql/plan/process_test.go index 64ef90f7e..881b775a2 100644 --- a/sql/plan/process_test.go +++ b/sql/plan/process_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -13,7 +13,7 @@ import ( func TestQueryProcess(t *testing.T) { require := require.New(t) - table := mem.NewTable("foo", sql.Schema{ + table := memory.NewTable("foo", sql.Schema{ {Name: "a", Type: sql.Int64}, }) @@ -52,7 +52,7 @@ func TestQueryProcess(t *testing.T) { func TestProcessTable(t *testing.T) { require := require.New(t) - table := mem.NewPartitionedTable("foo", sql.Schema{ + table := memory.NewPartitionedTable("foo", sql.Schema{ {Name: "a", Type: sql.Int64}, }, 2) @@ -97,7 +97,7 @@ func TestProcessTable(t *testing.T) { func TestProcessIndexableTable(t *testing.T) { require := require.New(t) - table := mem.NewPartitionedTable("foo", sql.Schema{ + table := memory.NewPartitionedTable("foo", sql.Schema{ {Name: "a", Type: sql.Int64, Source: "foo"}, }, 2) diff --git a/sql/plan/project_test.go b/sql/plan/project_test.go index 2efe29fa7..84950b924 100644 --- a/sql/plan/project_test.go +++ b/sql/plan/project_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -18,7 +18,7 @@ func TestProject(t *testing.T) { {Name: "col1", Type: sql.Text, Nullable: true}, {Name: "col2", Type: sql.Text, Nullable: true}, } - child := mem.NewTable("test", childSchema) + child := memory.NewTable("test", childSchema) child.Insert(sql.NewEmptyContext(), sql.NewRow("col1_1", "col2_1")) child.Insert(sql.NewEmptyContext(), sql.NewRow("col1_2", "col2_2")) p := NewProject( diff --git a/sql/plan/show_create_table_test.go b/sql/plan/show_create_table_test.go index 3623f1395..39ac5ac31 100644 --- a/sql/plan/show_create_table_test.go +++ b/sql/plan/show_create_table_test.go @@ -4,16 +4,16 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) func TestShowCreateTable(t *testing.T) { var require = require.New(t) - db := mem.NewDatabase("testdb") + db := memory.NewDatabase("testdb") - table := mem.NewTable( + table := memory.NewTable( "test-table", sql.Schema{ &sql.Column{Name: "baz", Type: sql.Text, Default: "", Nullable: false}, diff --git a/sql/plan/show_indexes_test.go b/sql/plan/show_indexes_test.go index e8e1bb44b..fde696a08 100644 --- a/sql/plan/show_indexes_test.go +++ b/sql/plan/show_indexes_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -16,7 +16,7 @@ func TestShowIndexes(t *testing.T) { require.False(unresolved.Resolved()) require.Nil(unresolved.Children()) - db := mem.NewDatabase("test") + db := memory.NewDatabase("test") tests := []struct { name string @@ -25,7 +25,7 @@ func TestShowIndexes(t *testing.T) { }{ { name: "test1", - table: mem.NewTable( + table: memory.NewTable( "test1", sql.Schema{ &sql.Column{Name: "foo", Type: sql.Int32, Source: "test1", Default: int32(0), Nullable: false}, @@ -34,7 +34,7 @@ func TestShowIndexes(t *testing.T) { }, { name: "test2", - table: mem.NewTable( + table: memory.NewTable( "test2", sql.Schema{ &sql.Column{Name: "bar", Type: sql.Int64, Source: "test2", Default: int64(0), Nullable: true}, @@ -44,7 +44,7 @@ func TestShowIndexes(t *testing.T) { }, { name: "test3", - table: mem.NewTable( + table: memory.NewTable( "test3", sql.Schema{ &sql.Column{Name: "baz", Type: sql.Text, Source: "test3", Default: "", Nullable: false}, @@ -55,7 +55,7 @@ func TestShowIndexes(t *testing.T) { }, { name: "test4", - table: mem.NewTable( + table: memory.NewTable( "test4", sql.Schema{ &sql.Column{Name: "oof", Type: sql.Text, Source: "test4", Default: "", Nullable: false}, diff --git a/sql/plan/show_tables_test.go b/sql/plan/show_tables_test.go index 349cac646..b9a48f562 100644 --- a/sql/plan/show_tables_test.go +++ b/sql/plan/show_tables_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) @@ -18,10 +18,10 @@ func TestShowTables(t *testing.T) { require.False(unresolvedShowTables.Resolved()) require.Nil(unresolvedShowTables.Children()) - db := mem.NewDatabase("test") - db.AddTable("test1", mem.NewTable("test1", nil)) - db.AddTable("test2", mem.NewTable("test2", nil)) - db.AddTable("test3", mem.NewTable("test3", nil)) + db := memory.NewDatabase("test") + db.AddTable("test1", memory.NewTable("test1", nil)) + db.AddTable("test2", memory.NewTable("test2", nil)) + db.AddTable("test3", memory.NewTable("test3", nil)) resolvedShowTables := NewShowTables(db, false) require.True(resolvedShowTables.Resolved()) diff --git a/sql/plan/showcolumns_test.go b/sql/plan/showcolumns_test.go index f5d794fe7..acb245a37 100644 --- a/sql/plan/showcolumns_test.go +++ b/sql/plan/showcolumns_test.go @@ -4,14 +4,14 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) func TestShowColumns(t *testing.T) { require := require.New(t) - table := NewResolvedTable(mem.NewTable("foo", sql.Schema{ + table := NewResolvedTable(memory.NewTable("foo", sql.Schema{ {Name: "a", Type: sql.Text}, {Name: "b", Type: sql.Int64, Nullable: true}, {Name: "c", Type: sql.Int64, Default: int64(1)}, @@ -34,7 +34,7 @@ func TestShowColumns(t *testing.T) { func TestShowColumnsFull(t *testing.T) { require := require.New(t) - table := NewResolvedTable(mem.NewTable("foo", sql.Schema{ + table := NewResolvedTable(memory.NewTable("foo", sql.Schema{ {Name: "a", Type: sql.Text}, {Name: "b", Type: sql.Int64, Nullable: true}, {Name: "c", Type: sql.Int64, Default: int64(1)}, diff --git a/sql/plan/showtablestatus_test.go b/sql/plan/showtablestatus_test.go index 46fbe7c86..83bc4d863 100644 --- a/sql/plan/showtablestatus_test.go +++ b/sql/plan/showtablestatus_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) @@ -13,14 +13,14 @@ func TestShowTableStatus(t *testing.T) { catalog := sql.NewCatalog() - db1 := mem.NewDatabase("a") - db1.AddTable("t1", mem.NewTable("t1", nil)) - db1.AddTable("t2", mem.NewTable("t2", nil)) + db1 := memory.NewDatabase("a") + db1.AddTable("t1", memory.NewTable("t1", nil)) + db1.AddTable("t2", memory.NewTable("t2", nil)) catalog.AddDatabase(db1) - db2 := mem.NewDatabase("b") - db2.AddTable("t3", mem.NewTable("t3", nil)) - db2.AddTable("t4", mem.NewTable("t4", nil)) + db2 := memory.NewDatabase("b") + db2.AddTable("t3", memory.NewTable("t3", nil)) + db2.AddTable("t4", memory.NewTable("t4", nil)) catalog.AddDatabase(db2) node := NewShowTableStatus() diff --git a/sql/plan/sort.go b/sql/plan/sort.go index 8235b2386..fed1f7da3 100644 --- a/sql/plan/sort.go +++ b/sql/plan/sort.go @@ -88,7 +88,7 @@ func (s *Sort) RowIter(ctx *sql.Context) (sql.RowIter, error) { span.Finish() return nil, err } - return sql.NewSpanIter(span, newSortIter(s, i)), nil + return sql.NewSpanIter(span, newSortIter(ctx, s, i)), nil } func (s *Sort) String() string { @@ -139,18 +139,19 @@ func (s *Sort) WithExpressions(exprs ...sql.Expression) (sql.Node, error) { } type sortIter struct { + ctx *sql.Context s *Sort childIter sql.RowIter sortedRows []sql.Row idx int } -func newSortIter(s *Sort, child sql.RowIter) *sortIter { +func newSortIter(ctx *sql.Context, s *Sort, child sql.RowIter) *sortIter { return &sortIter{ - s: s, - childIter: child, - sortedRows: nil, - idx: -1, + ctx: ctx, + s: s, + childIter: child, + idx: -1, } } @@ -162,6 +163,7 @@ func (i *sortIter) Next() (sql.Row, error) { } i.idx = 0 } + if i.idx >= len(i.sortedRows) { return nil, io.EOF } @@ -176,9 +178,11 @@ func (i *sortIter) Close() error { } func (i *sortIter) computeSortedRows() error { - var rows []sql.Row + cache, dispose := i.ctx.Memory.NewRowsCache() + defer dispose() + for { - childRow, err := i.childIter.Next() + row, err := i.childIter.Next() if err == io.EOF { break } @@ -186,9 +190,12 @@ func (i *sortIter) computeSortedRows() error { return err } - rows = append(rows, childRow) + if err := cache.Add(row); err != nil { + return err + } } + rows := cache.Get() sorter := &sorter{ sortFields: i.s.SortFields, rows: rows, diff --git a/sql/plan/sort_test.go b/sql/plan/sort_test.go index 202668cca..d5cb51ad8 100644 --- a/sql/plan/sort_test.go +++ b/sql/plan/sort_test.go @@ -3,7 +3,7 @@ package plan import ( "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" @@ -27,7 +27,7 @@ func TestSort(t *testing.T) { {Name: "col2", Type: sql.Int32, Nullable: true}, } - child := mem.NewTable("test", schema) + child := memory.NewTable("test", schema) for _, row := range data { require.NoError(child.Insert(sql.NewEmptyContext(), row)) } @@ -68,7 +68,7 @@ func TestSortAscending(t *testing.T) { {Name: "col1", Type: sql.Text, Nullable: true}, } - child := mem.NewTable("test", schema) + child := memory.NewTable("test", schema) for _, row := range data { require.NoError(child.Insert(sql.NewEmptyContext(), row)) } @@ -108,7 +108,7 @@ func TestSortDescending(t *testing.T) { {Name: "col1", Type: sql.Text, Nullable: true}, } - child := mem.NewTable("test", schema) + child := memory.NewTable("test", schema) for _, row := range data { require.NoError(child.Insert(sql.NewEmptyContext(), row)) } diff --git a/sql/plan/subqueryalias_test.go b/sql/plan/subqueryalias_test.go index 94c045e70..ad5c23de6 100644 --- a/sql/plan/subqueryalias_test.go +++ b/sql/plan/subqueryalias_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" ) @@ -22,7 +22,7 @@ func TestSubqueryAliasSchema(t *testing.T) { {Name: "baz", Type: sql.Text, Nullable: false, Source: "alias"}, } - table := mem.NewTable("bar", tableSchema) + table := memory.NewTable("bar", tableSchema) subquery := NewProject( []sql.Expression{ diff --git a/sql/plan/tablealias_test.go b/sql/plan/tablealias_test.go index 9e5faf859..cd7253472 100644 --- a/sql/plan/tablealias_test.go +++ b/sql/plan/tablealias_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" ) @@ -13,7 +13,7 @@ func TestTableAlias(t *testing.T) { require := require.New(t) ctx := sql.NewEmptyContext() - table := mem.NewTable("bar", sql.Schema{ + table := memory.NewTable("bar", sql.Schema{ {Name: "a", Type: sql.Text, Nullable: true}, {Name: "b", Type: sql.Text, Nullable: true}, }) diff --git a/sql/plan/transform_test.go b/sql/plan/transform_test.go index 730ab128f..38d1bb0fb 100644 --- a/sql/plan/transform_test.go +++ b/sql/plan/transform_test.go @@ -3,7 +3,7 @@ package plan import ( "testing" - "github.com/src-d/go-mysql-server/mem" + "github.com/src-d/go-mysql-server/memory" "github.com/src-d/go-mysql-server/sql" "github.com/src-d/go-mysql-server/sql/expression" @@ -22,7 +22,7 @@ func TestTransformUp(t *testing.T) { {Name: "a", Type: sql.Text}, {Name: "b", Type: sql.Text}, } - table := mem.NewTable("resolved", schema) + table := memory.NewTable("resolved", schema) pt, err := TransformUp(p, func(n sql.Node) (sql.Node, error) { switch n.(type) { diff --git a/sql/session.go b/sql/session.go index 3c9aa2f6f..8123ed51e 100644 --- a/sql/session.go +++ b/sql/session.go @@ -211,6 +211,7 @@ func NewBaseSession() Session { type Context struct { context.Context Session + Memory *MemoryManager pid uint64 query string tracer opentracing.Tracer @@ -240,25 +241,37 @@ func WithPid(pid uint64) ContextOption { } } -// WithQuery add the given query to the context. +// WithQuery adds the given query to the context. func WithQuery(q string) ContextOption { return func(ctx *Context) { ctx.query = q } } +// WithMemoryManager adds the given memory manager to the context. +func WithMemoryManager(m *MemoryManager) ContextOption { + return func(ctx *Context) { + ctx.Memory = m + } +} + // NewContext creates a new query context. Options can be passed to configure // the context. If some aspect of the context is not configure, the default // value will be used. -// By default, the context will have an empty base session and a noop tracer. +// By default, the context will have an empty base session, a noop tracer and +// a memory manager using the process reporter. func NewContext( ctx context.Context, opts ...ContextOption, ) *Context { - c := &Context{ctx, NewBaseSession(), 0, "", opentracing.NoopTracer{}} + c := &Context{ctx, NewBaseSession(), nil, 0, "", opentracing.NoopTracer{}} for _, opt := range opts { opt(c) } + + if c.Memory == nil { + c.Memory = NewMemoryManager(ProcessMemory) + } return c } @@ -285,12 +298,12 @@ func (c *Context) Span( span := c.tracer.StartSpan(opName, opts...) ctx := opentracing.ContextWithSpan(c.Context, span) - return span, &Context{ctx, c.Session, c.Pid(), c.Query(), c.tracer} + return span, &Context{ctx, c.Session, c.Memory, c.Pid(), c.Query(), c.tracer} } // WithContext returns a new context with the given underlying context. func (c *Context) WithContext(ctx context.Context) *Context { - return &Context{ctx, c.Session, c.Pid(), c.Query(), c.tracer} + return &Context{ctx, c.Session, c.Memory, c.Pid(), c.Query(), c.tracer} } // Error adds an error as warning to the session.