Navigation Menu

Skip to content

Commit

Permalink
Store ctid as column
Browse files Browse the repository at this point in the history
pgroonga.score() is disabled for now because the current
pgroonga.score() implementation requires ctid as key.
  • Loading branch information
kou committed Mar 31, 2015
1 parent 2e3292d commit 838b971
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 53 deletions.
148 changes: 95 additions & 53 deletions pgroonga.c
Expand Up @@ -44,6 +44,7 @@ typedef struct PGrnCreateData
{
Relation index;
grn_obj *sourcesTable;
grn_obj *sourcesCtidColumn;
grn_obj *lexicons;
unsigned int i;
TupleDesc desc;
Expand All @@ -56,6 +57,7 @@ typedef struct PGrnCreateData
typedef struct PGrnBuildStateData
{
grn_obj *sourcesTable;
grn_obj *sourcesCtidColumn;
double nIndexedTuples;
} PGrnBuildStateData;

Expand All @@ -66,14 +68,15 @@ typedef struct PGrnScanOpaqueData
slist_node node;
Oid dataTableID;
grn_obj *sourcesTable;
grn_obj *sourcesCtidColumn;
grn_obj minBorderValue;
grn_obj maxBorderValue;
grn_obj *searched;
grn_obj *sorted;
grn_obj *targetTable;
grn_obj *indexCursor;
grn_table_cursor *tableCursor;
grn_obj *keyAccessor;
grn_obj *ctidAccessor;
grn_obj *scoreAccessor;
grn_id currentID;
} PGrnScanOpaqueData;
Expand Down Expand Up @@ -118,6 +121,7 @@ PG_FUNCTION_INFO_V1(pgroonga_options);
static grn_ctx grnContext;
static grn_ctx *ctx = &grnContext;
static grn_obj buffer;
static grn_obj ctidBuffer;
static grn_obj headBuffer;
static grn_obj bodyBuffer;
static grn_obj footBuffer;
Expand Down Expand Up @@ -192,6 +196,7 @@ PGrnOnProcExit(int code, Datum arg)
GRN_OBJ_FIN(ctx, &footBuffer);
GRN_OBJ_FIN(ctx, &bodyBuffer);
GRN_OBJ_FIN(ctx, &headBuffer);
GRN_OBJ_FIN(ctx, &ctidBuffer);
GRN_OBJ_FIN(ctx, &buffer);

db = grn_ctx_db(ctx);
Expand Down Expand Up @@ -325,6 +330,7 @@ _PG_init(void)
on_proc_exit(PGrnOnProcExit, 0);

GRN_VOID_INIT(&buffer);
GRN_UINT64_INIT(&ctidBuffer, 0);
GRN_TEXT_INIT(&headBuffer, 0);
GRN_TEXT_INIT(&bodyBuffer, 0);
GRN_TEXT_INIT(&footBuffer, 0);
Expand Down Expand Up @@ -504,6 +510,17 @@ PGrnLookupSourcesTable(Relation index, int errorLevel)
return PGrnLookup(name, errorLevel);
}

static grn_obj *
PGrnLookupSourcesCtidColumn(Relation index, int errorLevel)
{
char name[GRN_TABLE_MAX_KEY_SIZE];

snprintf(name, sizeof(name),
PGrnSourcesTableNameFormat "." PGrnSourcesCtidColumnName,
index->rd_node.relNode);
return PGrnLookup(name, errorLevel);
}

static grn_obj *
PGrnLookupIndexColumn(Relation index, unsigned int nthAttribute, int errorLevel)
{
Expand Down Expand Up @@ -578,6 +595,29 @@ PGrnIsForFullTextSearchIndex(Relation index, int nthAttribute)
return OidIsValid(queryStrategyOID);
}

static void
PGrnCreateSourcesCtidColumn(PGrnCreateData *data)
{
data->sourcesCtidColumn = PGrnCreateColumn(data->sourcesTable,
PGrnSourcesCtidColumnName,
GRN_OBJ_COLUMN_SCALAR,
grn_ctx_at(ctx, GRN_DB_UINT64));
}

static void
PGrnCreateSourcesTable(PGrnCreateData *data)
{
char sourcesTableName[GRN_TABLE_MAX_KEY_SIZE];

snprintf(sourcesTableName, sizeof(sourcesTableName),
PGrnSourcesTableNameFormat, data->relNode);
data->sourcesTable = PGrnCreateTable(sourcesTableName,
GRN_OBJ_TABLE_NO_KEY,
NULL);

PGrnCreateSourcesCtidColumn(data);
}

static void
PGrnCreateDataColumn(PGrnCreateData *data)
{
Expand Down Expand Up @@ -677,23 +717,22 @@ PGrnCreateIndexColumn(PGrnCreateData *data)
* @param index
*/
static void
PGrnCreate(Relation index, grn_obj **sourcesTable, grn_obj *lexicons)
PGrnCreate(Relation index,
grn_obj **sourcesTable,
grn_obj **sourcesCtidColumn,
grn_obj *lexicons)
{
char sourcesTableName[GRN_TABLE_MAX_KEY_SIZE];
PGrnCreateData data;

data.index = index;
data.desc = RelationGetDescr(index);
data.relNode = index->rd_node.relNode;

snprintf(sourcesTableName, sizeof(sourcesTableName),
PGrnSourcesTableNameFormat, data.relNode);
*sourcesTable = PGrnCreateTable(sourcesTableName,
GRN_OBJ_TABLE_PAT_KEY,
grn_ctx_at(ctx, GRN_DB_UINT64));
data.sourcesTable = *sourcesTable;
data.lexicons = lexicons;

PGrnCreateSourcesTable(&data);
*sourcesTable = data.sourcesTable;
*sourcesCtidColumn = data.sourcesCtidColumn;

for (data.i = 0; data.i < data.desc->natts; data.i++)
{
data.forFullTextSearch = PGrnIsForFullTextSearchIndex(index, data.i);
Expand Down Expand Up @@ -734,7 +773,7 @@ PGrnSetSources(Relation index, grn_obj *sourcesTable)
GRN_OBJ_FIN(ctx, &sourceIDs);
}

static grn_id
static uint64
CtidToUInt64(ItemPointer ctid)
{
BlockNumber blockNumber;
Expand Down Expand Up @@ -1087,16 +1126,18 @@ static void
PGrnInsert(grn_ctx *ctx,
Relation index,
grn_obj *sourcesTable,
grn_obj *sourcesCtidColumn,
Datum *values,
bool *isnull,
ItemPointer ht_ctid)
{
TupleDesc desc = RelationGetDescr(index);
uint64 key = CtidToUInt64(ht_ctid);
grn_id id;
int i;

id = grn_table_add(ctx, sourcesTable, &key, sizeof(uint64), NULL);
id = grn_table_add(ctx, sourcesTable, NULL, 0, NULL);
GRN_UINT64_SET(ctx, &ctidBuffer, CtidToUInt64(ht_ctid));
grn_obj_set_value(ctx, sourcesCtidColumn, id, &ctidBuffer, GRN_OBJ_SET);

for (i = 0; i < desc->natts; i++)
{
Expand Down Expand Up @@ -1136,9 +1177,12 @@ pgroonga_insert(PG_FUNCTION_ARGS)
IndexUniqueCheck checkUnique = PG_GETARG_INT32(5);
#endif
grn_obj *sourcesTable;
grn_obj *sourcesCtidColumn;

sourcesTable = PGrnLookupSourcesTable(index, ERROR);
PGrnInsert(ctx, index, sourcesTable, values, isnull, ht_ctid);
sourcesCtidColumn = PGrnLookupSourcesCtidColumn(index, ERROR);
PGrnInsert(ctx, index, sourcesTable, sourcesCtidColumn,
values, isnull, ht_ctid);
grn_db_touch(ctx, grn_ctx_db(ctx));

PG_RETURN_BOOL(true);
Expand All @@ -1149,14 +1193,15 @@ PGrnScanOpaqueInit(PGrnScanOpaque so, Relation index)
{
so->dataTableID = index->rd_index->indrelid;
so->sourcesTable = PGrnLookupSourcesTable(index, ERROR);
so->sourcesCtidColumn = PGrnLookupSourcesCtidColumn(index, ERROR);
GRN_VOID_INIT(&(so->minBorderValue));
GRN_VOID_INIT(&(so->maxBorderValue));
so->searched = NULL;
so->sorted = NULL;
so->targetTable = NULL;
so->indexCursor = NULL;
so->tableCursor = NULL;
so->keyAccessor = NULL;
so->ctidAccessor = NULL;
so->scoreAccessor = NULL;
so->currentID = GRN_ID_NIL;

Expand All @@ -1172,10 +1217,10 @@ PGrnScanOpaqueReinit(PGrnScanOpaque so)
grn_obj_unlink(ctx, so->scoreAccessor);
so->scoreAccessor = NULL;
}
if (so->keyAccessor)
if (so->ctidAccessor)
{
grn_obj_unlink(ctx, so->keyAccessor);
so->keyAccessor = NULL;
grn_obj_unlink(ctx, so->ctidAccessor);
so->ctidAccessor = NULL;
}
if (so->indexCursor)
{
Expand Down Expand Up @@ -1493,9 +1538,9 @@ PGrnOpenTableCursor(IndexScanDesc scan, ScanDirection dir)
so->tableCursor = grn_table_cursor_open(ctx, table,
NULL, 0, NULL, 0,
offset, limit, flags);
so->keyAccessor = grn_obj_column(ctx, table,
GRN_COLUMN_NAME_KEY,
GRN_COLUMN_NAME_KEY_LEN);
so->ctidAccessor = grn_obj_column(ctx, table,
PGrnSourcesCtidColumnName,
PGrnSourcesCtidColumnNameLength);
if (so->searched)
{
so->scoreAccessor = grn_obj_column(ctx, so->searched,
Expand Down Expand Up @@ -1674,9 +1719,9 @@ PGrnRangeSearch(IndexScanDesc scan, ScanDirection dir)
indexCursorMin,
indexCursorMax,
indexCursorFlags);
so->keyAccessor = grn_obj_column(ctx, so->sourcesTable,
GRN_COLUMN_NAME_KEY,
GRN_COLUMN_NAME_KEY_LEN);
so->ctidAccessor = grn_obj_column(ctx, so->sourcesTable,
PGrnSourcesCtidColumnName,
PGrnSourcesCtidColumnNameLength);
}

static bool
Expand Down Expand Up @@ -1753,12 +1798,7 @@ pgroonga_gettuple(PG_FUNCTION_ARGS)

if (scan->kill_prior_tuple && so->currentID != GRN_ID_NIL)
{
grn_obj key;
GRN_UINT64_INIT(&key, 0);
grn_obj_get_value(ctx, so->keyAccessor, so->currentID, &key);
grn_table_delete(ctx, so->sourcesTable,
GRN_BULK_HEAD(&key), GRN_BULK_VSIZE(&key));
GRN_OBJ_FIN(ctx, &key);
grn_table_delete_by_id(ctx, so->sourcesTable, so->currentID);
}

if (so->indexCursor)
Expand All @@ -1782,12 +1822,9 @@ pgroonga_gettuple(PG_FUNCTION_ARGS)
}
else
{
grn_obj key;

GRN_UINT64_INIT(&key, 0);
grn_obj_get_value(ctx, so->keyAccessor, so->currentID, &key);
scan->xs_ctup.t_self = UInt64ToCtid(GRN_UINT64_VALUE(&key));
GRN_OBJ_FIN(ctx, &key);
GRN_BULK_REWIND(&ctidBuffer);
grn_obj_get_value(ctx, so->ctidAccessor, so->currentID, &ctidBuffer);
scan->xs_ctup.t_self = UInt64ToCtid(GRN_UINT64_VALUE(&ctidBuffer));

PG_RETURN_BOOL(true);
}
Expand All @@ -1804,21 +1841,19 @@ pgroonga_getbitmap(PG_FUNCTION_ARGS)
TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
PGrnScanOpaque so = (PGrnScanOpaque) scan->opaque;
int64 nRecords = 0;
grn_obj key;

PGrnEnsureCursorOpened(scan, ForwardScanDirection);

GRN_UINT64_INIT(&key, 0);
if (so->indexCursor)
{
grn_posting *posting;
grn_id termID;
while ((posting = grn_index_cursor_next(ctx, so->indexCursor, &termID)))
{
ItemPointerData ctid;
GRN_BULK_REWIND(&key);
grn_obj_get_value(ctx, so->keyAccessor, posting->rid, &key);
ctid = UInt64ToCtid(GRN_UINT64_VALUE(&key));
GRN_BULK_REWIND(&ctidBuffer);
grn_obj_get_value(ctx, so->ctidAccessor, posting->rid, &ctidBuffer);
ctid = UInt64ToCtid(GRN_UINT64_VALUE(&ctidBuffer));
tbm_add_tuples(tbm, &ctid, 1, false);
nRecords++;
}
Expand All @@ -1829,14 +1864,13 @@ pgroonga_getbitmap(PG_FUNCTION_ARGS)
while ((id = grn_table_cursor_next(ctx, so->tableCursor)) != GRN_ID_NIL)
{
ItemPointerData ctid;
GRN_BULK_REWIND(&key);
grn_obj_get_value(ctx, so->keyAccessor, id, &key);
ctid = UInt64ToCtid(GRN_UINT64_VALUE(&key));
GRN_BULK_REWIND(&ctidBuffer);
grn_obj_get_value(ctx, so->ctidAccessor, id, &ctidBuffer);
ctid = UInt64ToCtid(GRN_UINT64_VALUE(&ctidBuffer));
tbm_add_tuples(tbm, &ctid, 1, false);
nRecords++;
}
}
GRN_OBJ_FIN(ctx, &key);

PG_RETURN_INT64(nRecords);
}
Expand Down Expand Up @@ -1890,8 +1924,8 @@ PGrnBuildCallback(Relation index,
PGrnBuildState bs = (PGrnBuildState) state;

if (tupleIsAlive) {
PGrnInsert(ctx, index, bs->sourcesTable, values,
isnull, &(htup->t_self));
PGrnInsert(ctx, index, bs->sourcesTable, bs->sourcesCtidColumn,
values, isnull, &(htup->t_self));
bs->nIndexedTuples++;
}
}
Expand Down Expand Up @@ -1921,7 +1955,10 @@ pgroonga_build(PG_FUNCTION_ARGS)
GRN_PTR_INIT(&lexicons, GRN_OBJ_VECTOR, GRN_ID_NIL);
PG_TRY();
{
PGrnCreate(index, &(bs.sourcesTable), &lexicons);
PGrnCreate(index,
&(bs.sourcesTable),
&(bs.sourcesCtidColumn),
&lexicons);
nHeapTuples = IndexBuildHeapScan(heap, index, indexInfo, true,
PGrnBuildCallback, &bs);
PGrnSetSources(index, bs.sourcesTable);
Expand Down Expand Up @@ -1961,12 +1998,13 @@ pgroonga_buildempty(PG_FUNCTION_ARGS)
{
Relation index = (Relation) PG_GETARG_POINTER(0);
grn_obj *sourcesTable = NULL;
grn_obj *sourcesCtidColumn = NULL;
grn_obj lexicons;

GRN_PTR_INIT(&lexicons, GRN_OBJ_VECTOR, GRN_ID_NIL);
PG_TRY();
{
PGrnCreate(index, &sourcesTable, &lexicons);
PGrnCreate(index, &sourcesTable, &sourcesCtidColumn, &lexicons);
PGrnSetSources(index, sourcesTable);
}
PG_CATCH();
Expand Down Expand Up @@ -2044,15 +2082,19 @@ pgroonga_bulkdelete(PG_FUNCTION_ARGS)

PG_TRY();
{
while (grn_table_cursor_next(ctx, cursor) != GRN_ID_NIL)
grn_id id;
grn_obj *sourcesCtidColumn;

sourcesCtidColumn = PGrnLookupSourcesCtidColumn(index, ERROR);
while ((id = grn_table_cursor_next(ctx, cursor)) != GRN_ID_NIL)
{
ItemPointerData ctid;
uint64 *key;

CHECK_FOR_INTERRUPTS();

grn_table_cursor_get_key(ctx, cursor, (void **) &key);
ctid = UInt64ToCtid(*key);
GRN_BULK_REWIND(&ctidBuffer);
grn_obj_get_value(ctx, sourcesCtidColumn, id, &ctidBuffer);
ctid = UInt64ToCtid(GRN_UINT64_VALUE(&ctidBuffer));
if (callback(&ctid, callback_state))
{
grn_table_cursor_delete(ctx, cursor);
Expand Down
2 changes: 2 additions & 0 deletions pgroonga.h
Expand Up @@ -39,6 +39,8 @@
#define PGrnDatabaseBasename "pgrn"
#define PGrnSourcesTableNamePrefix "Sources"
#define PGrnSourcesTableNameFormat PGrnSourcesTableNamePrefix "%u"
#define PGrnSourcesCtidColumnName "ctid"
#define PGrnSourcesCtidColumnNameLength (sizeof(PGrnSourcesCtidColumnName) - 1)
#define PGrnLexiconNameFormat "Lexicon%u_%u"
#define PGrnIndexColumnName "index"

Expand Down

0 comments on commit 838b971

Please sign in to comment.