Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
sql: implement memory management system for caches
Browse files Browse the repository at this point in the history
This PR implements a memory management system that keeps track of the
memory allocated for caches, so that they can be freed at any moment
and out-of-memory errors can be avoided.

The main changes are the following:

- MemoryManager in the sql package, which is just the component that
  tracks all caches. Memory of all freeable caches can be freed
  using the Free method of this component. The only way to instantiate
  new caches is using the NewXXXCache methods.
- Rows, history and LRU cache implementations, accessed using the
  NewXXXCache methods of MemoryManager.
- Reporters, which are components that report the maximum amount
  of memory the program is allowed to use and the currently used memory.
  This interface is meant for making testing easier. There is a default
  ProcessMemory reporter that returns the memory used by the process and
  the maximum memory defined in the `MAX_MEMORY` environment variable.
- MemoryManager is passed down to every component through *sql.Context,
  which meant a little more boilerplate on the server SessionBuilder.
- GroupBy, Sort, Distinct and Join now use the provided APIs of memory
  and cache management for their in-memory computations.

Caveats:
- We need to think of a good default so that memory usage won't grow
  forever and eventually crash the process, which is the behaviour when MAX_MEMORY is 0.

Signed-off-by: Miguel Molina <miguel@erizocosmi.co>
  • Loading branch information
erizocosmico committed Aug 12, 2019
1 parent 875590d commit 95d9aae
Show file tree
Hide file tree
Showing 69 changed files with 1,061 additions and 434 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ SET <variable name> = <value>
|:-----|:-----|:------------|
|`INMEMORY_JOINS`|environment|If set it will perform all joins in memory. Default is off.|
|`inmemory_joins`|session|If set it will perform all joins in memory. Default is off. This has precedence over `INMEMORY_JOINS`.|
|`MAX_MEMORY_JOIN`|environment|The maximum number of memory, in megabytes, that can be consumed by go-mysql-server before switching to multipass mode in joins. Default is the 20% of all available physical memory.|
|`max_memory_joins`|session|The maximum number of memory, in megabytes, that can be consumed by go-mysql-server before switching to multipass mode in joins. Default is the 20% of all available physical memory. This has precedence over `MAX_MEMORY_JOIN`.|
|`MAX_MEMORY`|environment|The maximum amount of memory, in megabytes, that can be consumed by go-mysql-server. Any in-memory caches or computations will no longer try to use memory when the limit is reached. Note that this may cause certain queries to fail if there is not enough memory available, such as queries using DISTINCT, ORDER BY or GROUP BY with groupings.|
|`DEBUG_ANALYZER`|environment|If set, the analyzer will print debug messages. Default is off.|
|`PILOSA_INDEX_THREADS`|environment|Number of threads used in index creation. Default is the number of cores available in the machine.|
|`pilosa_index_threads`|session|Number of threads used in index creation. Default is the number of cores available in the machine. This has precedence over `PILOSA_INDEX_THREADS`.|
Expand Down Expand Up @@ -176,14 +175,14 @@ func main() {
s.Start()
}

func createTestDatabase() *mem.Database {
func createTestDatabase() *memory.Database {
const (
dbName = "test"
tableName = "mytable"
)

db := mem.NewDatabase(dbName)
table := mem.NewTable(tableName, sql.Schema{
db := memory.NewDatabase(dbName)
table := memory.NewTable(tableName, sql.Schema{
{Name: "name", Type: sql.Text, Nullable: false, Source: tableName},
{Name: "email", Type: sql.Text, Nullable: false, Source: tableName},
{Name: "phone_numbers", Type: sql.JSON, Nullable: false, Source: tableName},
Expand Down
1 change: 0 additions & 1 deletion _example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

sqle "github.com/src-d/go-mysql-server"
"github.com/src-d/go-mysql-server/auth"
"github.com/src-d/go-mysql-server/mem"
"github.com/src-d/go-mysql-server/server"
"github.com/src-d/go-mysql-server/sql"
)
Expand Down
6 changes: 3 additions & 3 deletions auth/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require"
sqle "github.com/src-d/go-mysql-server"
"github.com/src-d/go-mysql-server/auth"
"github.com/src-d/go-mysql-server/mem"
"github.com/src-d/go-mysql-server/memory"
"github.com/src-d/go-mysql-server/server"
"github.com/src-d/go-mysql-server/sql"
"github.com/src-d/go-mysql-server/sql/analyzer"
Expand All @@ -21,13 +21,13 @@ import (
const port = 3336

func authEngine(au auth.Auth) (string, *sqle.Engine, error) {
db := mem.NewDatabase("test")
db := memory.NewDatabase("test")
catalog := sql.NewCatalog()
catalog.AddDatabase(db)

tblName := "test"

table := mem.NewTable(tblName, sql.Schema{
table := memory.NewTable(tblName, sql.Schema{
{Name: "id", Type: sql.Text, Nullable: false, Source: tblName},
{Name: "name", Type: sql.Text, Nullable: false, Source: tblName},
})
Expand Down
10 changes: 5 additions & 5 deletions benchmark/tpc_h_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"path/filepath"
"testing"

"github.com/src-d/go-mysql-server"
sqle "github.com/src-d/go-mysql-server"

"github.com/src-d/go-mysql-server/mem"
"github.com/src-d/go-mysql-server/memory"
"github.com/src-d/go-mysql-server/sql"
)

Expand Down Expand Up @@ -83,11 +83,11 @@ func executeQueries(b *testing.B, e *sqle.Engine) error {
}

func genDB(b *testing.B) (sql.Database, error) {
db := mem.NewDatabase("tpch")
db := memory.NewDatabase("tpch")

for _, m := range tpchTableMetadata {
b.Log("generating table", m.name)
t := mem.NewTable(m.name, m.schema)
t := memory.NewTable(m.name, m.schema)
if err := insertDataToTable(m.name, t, len(m.schema)); err != nil {
return nil, err
}
Expand All @@ -98,7 +98,7 @@ func genDB(b *testing.B) (sql.Database, error) {
return db, nil
}

func insertDataToTable(name string, t *mem.Table, columnCount int) error {
func insertDataToTable(name string, t *memory.Table, columnCount int) error {
f, err := os.Open(name + ".tbl")
if err != nil {
return err
Expand Down
72 changes: 36 additions & 36 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

sqle "github.com/src-d/go-mysql-server"
"github.com/src-d/go-mysql-server/auth"
"github.com/src-d/go-mysql-server/mem"
"github.com/src-d/go-mysql-server/memory"
"github.com/src-d/go-mysql-server/sql"
"github.com/src-d/go-mysql-server/sql/analyzer"
"github.com/src-d/go-mysql-server/sql/index/pilosa"
Expand Down Expand Up @@ -1775,7 +1775,7 @@ const testNumPartitions = 5
func TestAmbiguousColumnResolution(t *testing.T) {
require := require.New(t)

table := mem.NewPartitionedTable("foo", sql.Schema{
table := memory.NewPartitionedTable("foo", sql.Schema{
{Name: "a", Type: sql.Int64, Source: "foo"},
{Name: "b", Type: sql.Text, Source: "foo"},
}, testNumPartitions)
Expand All @@ -1787,7 +1787,7 @@ func TestAmbiguousColumnResolution(t *testing.T) {
sql.NewRow(int64(3), "baz"),
)

table2 := mem.NewPartitionedTable("bar", sql.Schema{
table2 := memory.NewPartitionedTable("bar", sql.Schema{
{Name: "b", Type: sql.Text, Source: "bar"},
{Name: "c", Type: sql.Int64, Source: "bar"},
}, testNumPartitions)
Expand All @@ -1798,7 +1798,7 @@ func TestAmbiguousColumnResolution(t *testing.T) {
sql.NewRow("pux", int64(1)),
)

db := mem.NewDatabase("mydb")
db := memory.NewDatabase("mydb")
db.AddTable("foo", table)
db.AddTable("bar", table2)

Expand Down Expand Up @@ -1865,7 +1865,7 @@ func TestDDL(t *testing.T) {
func TestNaturalJoin(t *testing.T) {
require := require.New(t)

t1 := mem.NewPartitionedTable("t1", sql.Schema{
t1 := memory.NewPartitionedTable("t1", sql.Schema{
{Name: "a", Type: sql.Text, Source: "t1"},
{Name: "b", Type: sql.Text, Source: "t1"},
{Name: "c", Type: sql.Text, Source: "t1"},
Expand All @@ -1878,7 +1878,7 @@ func TestNaturalJoin(t *testing.T) {
sql.NewRow("a_3", "b_3", "c_3"),
)

t2 := mem.NewPartitionedTable("t2", sql.Schema{
t2 := memory.NewPartitionedTable("t2", sql.Schema{
{Name: "a", Type: sql.Text, Source: "t2"},
{Name: "b", Type: sql.Text, Source: "t2"},
{Name: "d", Type: sql.Text, Source: "t2"},
Expand All @@ -1891,7 +1891,7 @@ func TestNaturalJoin(t *testing.T) {
sql.NewRow("a_3", "b_3", "d_3"),
)

db := mem.NewDatabase("mydb")
db := memory.NewDatabase("mydb")
db.AddTable("t1", t1)
db.AddTable("t2", t2)

Expand All @@ -1917,7 +1917,7 @@ func TestNaturalJoin(t *testing.T) {
func TestNaturalJoinEqual(t *testing.T) {
require := require.New(t)

t1 := mem.NewPartitionedTable("t1", sql.Schema{
t1 := memory.NewPartitionedTable("t1", sql.Schema{
{Name: "a", Type: sql.Text, Source: "t1"},
{Name: "b", Type: sql.Text, Source: "t1"},
{Name: "c", Type: sql.Text, Source: "t1"},
Expand All @@ -1930,7 +1930,7 @@ func TestNaturalJoinEqual(t *testing.T) {
sql.NewRow("a_3", "b_3", "c_3"),
)

t2 := mem.NewPartitionedTable("t2", sql.Schema{
t2 := memory.NewPartitionedTable("t2", sql.Schema{
{Name: "a", Type: sql.Text, Source: "t2"},
{Name: "b", Type: sql.Text, Source: "t2"},
{Name: "c", Type: sql.Text, Source: "t2"},
Expand All @@ -1943,7 +1943,7 @@ func TestNaturalJoinEqual(t *testing.T) {
sql.NewRow("a_3", "b_3", "c_3"),
)

db := mem.NewDatabase("mydb")
db := memory.NewDatabase("mydb")
db.AddTable("t1", t1)
db.AddTable("t2", t2)

Expand All @@ -1969,7 +1969,7 @@ func TestNaturalJoinEqual(t *testing.T) {
func TestNaturalJoinDisjoint(t *testing.T) {
require := require.New(t)

t1 := mem.NewPartitionedTable("t1", sql.Schema{
t1 := memory.NewPartitionedTable("t1", sql.Schema{
{Name: "a", Type: sql.Text, Source: "t1"},
}, testNumPartitions)

Expand All @@ -1980,7 +1980,7 @@ func TestNaturalJoinDisjoint(t *testing.T) {
sql.NewRow("a3"),
)

t2 := mem.NewPartitionedTable("t2", sql.Schema{
t2 := memory.NewPartitionedTable("t2", sql.Schema{
{Name: "b", Type: sql.Text, Source: "t2"},
}, testNumPartitions)
insertRows(
Expand All @@ -1990,7 +1990,7 @@ func TestNaturalJoinDisjoint(t *testing.T) {
sql.NewRow("b3"),
)

db := mem.NewDatabase("mydb")
db := memory.NewDatabase("mydb")
db.AddTable("t1", t1)
db.AddTable("t2", t2)

Expand Down Expand Up @@ -2022,7 +2022,7 @@ func TestNaturalJoinDisjoint(t *testing.T) {
func TestInnerNestedInNaturalJoins(t *testing.T) {
require := require.New(t)

table1 := mem.NewPartitionedTable("table1", sql.Schema{
table1 := memory.NewPartitionedTable("table1", sql.Schema{
{Name: "i", Type: sql.Int32, Source: "table1"},
{Name: "f", Type: sql.Float64, Source: "table1"},
{Name: "t", Type: sql.Text, Source: "table1"},
Expand All @@ -2035,7 +2035,7 @@ func TestInnerNestedInNaturalJoins(t *testing.T) {
sql.NewRow(int32(10), float64(2.1), "table1"),
)

table2 := mem.NewPartitionedTable("table2", sql.Schema{
table2 := memory.NewPartitionedTable("table2", sql.Schema{
{Name: "i2", Type: sql.Int32, Source: "table2"},
{Name: "f2", Type: sql.Float64, Source: "table2"},
{Name: "t2", Type: sql.Text, Source: "table2"},
Expand All @@ -2048,7 +2048,7 @@ func TestInnerNestedInNaturalJoins(t *testing.T) {
sql.NewRow(int32(20), float64(2.2), "table2"),
)

table3 := mem.NewPartitionedTable("table3", sql.Schema{
table3 := memory.NewPartitionedTable("table3", sql.Schema{
{Name: "i", Type: sql.Int32, Source: "table3"},
{Name: "f2", Type: sql.Float64, Source: "table3"},
{Name: "t3", Type: sql.Text, Source: "table3"},
Expand All @@ -2061,7 +2061,7 @@ func TestInnerNestedInNaturalJoins(t *testing.T) {
sql.NewRow(int32(30), float64(2.2), "table3"),
)

db := mem.NewDatabase("mydb")
db := memory.NewDatabase("mydb")
db.AddTable("table1", table1)
db.AddTable("table2", table2)
db.AddTable("table3", table3)
Expand Down Expand Up @@ -2114,7 +2114,7 @@ func newEngine(t *testing.T) *sqle.Engine {
}

func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
table := mem.NewPartitionedTable("mytable", sql.Schema{
table := memory.NewPartitionedTable("mytable", sql.Schema{
{Name: "i", Type: sql.Int64, Source: "mytable"},
{Name: "s", Type: sql.Text, Source: "mytable"},
}, testNumPartitions)
Expand All @@ -2126,7 +2126,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
sql.NewRow(int64(3), "third row"),
)

table2 := mem.NewPartitionedTable("othertable", sql.Schema{
table2 := memory.NewPartitionedTable("othertable", sql.Schema{
{Name: "s2", Type: sql.Text, Source: "othertable"},
{Name: "i2", Type: sql.Int64, Source: "othertable"},
}, testNumPartitions)
Expand All @@ -2138,7 +2138,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
sql.NewRow("third", int64(1)),
)

table3 := mem.NewPartitionedTable("tabletest", sql.Schema{
table3 := memory.NewPartitionedTable("tabletest", sql.Schema{
{Name: "i", Type: sql.Int32, Source: "tabletest"},
{Name: "s", Type: sql.Text, Source: "tabletest"},
}, testNumPartitions)
Expand All @@ -2150,7 +2150,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
sql.NewRow(int64(3), "third row"),
)

table4 := mem.NewPartitionedTable("other_table", sql.Schema{
table4 := memory.NewPartitionedTable("other_table", sql.Schema{
{Name: "text", Type: sql.Text, Source: "tabletest"},
{Name: "number", Type: sql.Int32, Source: "tabletest"},
}, testNumPartitions)
Expand All @@ -2162,7 +2162,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
sql.NewRow("c", int32(0)),
)

bigtable := mem.NewPartitionedTable("bigtable", sql.Schema{
bigtable := memory.NewPartitionedTable("bigtable", sql.Schema{
{Name: "t", Type: sql.Text, Source: "bigtable"},
{Name: "n", Type: sql.Int64, Source: "bigtable"},
}, testNumPartitions)
Expand All @@ -2185,7 +2185,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
sql.NewRow("b", int64(9)),
)

floatTable := mem.NewPartitionedTable("floattable", sql.Schema{
floatTable := memory.NewPartitionedTable("floattable", sql.Schema{
{Name: "i", Type: sql.Int64, Source: "floattable"},
{Name: "f32", Type: sql.Float32, Source: "floattable"},
{Name: "f64", Type: sql.Float64, Source: "floattable"},
Expand All @@ -2201,7 +2201,7 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
sql.NewRow(-2, float32(-1.5), float64(-1.5)),
)

nilTable := mem.NewPartitionedTable("niltable", sql.Schema{
nilTable := memory.NewPartitionedTable("niltable", sql.Schema{
{Name: "i", Type: sql.Int64, Source: "niltable", Nullable: true},
{Name: "b", Type: sql.Boolean, Source: "niltable", Nullable: true},
{Name: "f", Type: sql.Float64, Source: "niltable", Nullable: true},
Expand All @@ -2216,15 +2216,15 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
sql.NewRow(nil, nil, nil),
)

db := mem.NewDatabase("mydb")
db := memory.NewDatabase("mydb")
db.AddTable("mytable", table)
db.AddTable("othertable", table2)
db.AddTable("tabletest", table3)
db.AddTable("bigtable", bigtable)
db.AddTable("floattable", floatTable)
db.AddTable("niltable", nilTable)

db2 := mem.NewDatabase("foo")
db2 := memory.NewDatabase("foo")
db2.AddTable("other_table", table4)

catalog := sql.NewCatalog()
Expand Down Expand Up @@ -2488,7 +2488,7 @@ func TestCreateIndex(t *testing.T) {
func TestOrderByGroupBy(t *testing.T) {
require := require.New(t)

table := mem.NewPartitionedTable("members", sql.Schema{
table := memory.NewPartitionedTable("members", sql.Schema{
{Name: "id", Type: sql.Int64, Source: "members"},
{Name: "team", Type: sql.Text, Source: "members"},
}, testNumPartitions)
Expand All @@ -2503,7 +2503,7 @@ func TestOrderByGroupBy(t *testing.T) {
sql.NewRow(int64(8), "purple"),
)

db := mem.NewDatabase("db")
db := memory.NewDatabase("db")
db.AddTable("members", table)

e := sqle.NewDefault()
Expand Down Expand Up @@ -2583,12 +2583,12 @@ func TestTracing(t *testing.T) {
func TestReadOnly(t *testing.T) {
require := require.New(t)

table := mem.NewPartitionedTable("mytable", sql.Schema{
table := memory.NewPartitionedTable("mytable", sql.Schema{
{Name: "i", Type: sql.Int64, Source: "mytable"},
{Name: "s", Type: sql.Text, Source: "mytable"},
}, testNumPartitions)

db := mem.NewDatabase("mydb")
db := memory.NewDatabase("mydb")
db.AddTable("mytable", table)

catalog := sql.NewCatalog()
Expand Down Expand Up @@ -2689,11 +2689,11 @@ func TestUse(t *testing.T) {
func TestLocks(t *testing.T) {
require := require.New(t)

t1 := newLockableTable(mem.NewTable("t1", nil))
t2 := newLockableTable(mem.NewTable("t2", nil))
t3 := mem.NewTable("t3", nil)
t1 := newLockableTable(memory.NewTable("t1", nil))
t2 := newLockableTable(memory.NewTable("t2", nil))
t3 := memory.NewTable("t3", nil)
catalog := sql.NewCatalog()
db := mem.NewDatabase("db")
db := memory.NewDatabase("db")
db.AddTable("t1", t1)
db.AddTable("t2", t2)
db.AddTable("t3", t3)
Expand Down Expand Up @@ -2789,7 +2789,7 @@ var generatorQueries = []struct {
}

func TestGenerators(t *testing.T) {
table := mem.NewPartitionedTable("t", sql.Schema{
table := memory.NewPartitionedTable("t", sql.Schema{
{Name: "a", Type: sql.Int64, Source: "t"},
{Name: "b", Type: sql.Array(sql.Text), Source: "t"},
{Name: "c", Type: sql.Text, Source: "t"},
Expand All @@ -2802,7 +2802,7 @@ func TestGenerators(t *testing.T) {
sql.NewRow(int64(3), []interface{}{"e", "f"}, "third"),
)

db := mem.NewDatabase("db")
db := memory.NewDatabase("db")
db.AddTable("t", table)

catalog := sql.NewCatalog()
Expand Down
Loading

0 comments on commit 95d9aae

Please sign in to comment.