Permalink
Browse files

num_threads based on device info

  • Loading branch information...
1 parent 3450d94 commit 00d99fb3605ce5a0e76484182539014719429b41 @kaigai committed Jan 15, 2012
Showing with 85 additions and 99 deletions.
  1. +20 −82 blkload.c
  2. +1 −1 pg_strom--1.0.sql
  3. +20 −2 pg_strom.c
  4. +2 −2 pg_strom.h
  5. +4 −1 plan.c
  6. +33 −7 scan.c
  7. +5 −4 utilcmds.c
View
102 blkload.c
@@ -134,7 +134,6 @@ construct_cs_values(Datum cs_values[], bool cs_isnull[], int nitems,
static void
pgstrom_one_chunk_insert(Relation srel,
Oid seqid,
- uint32 chunk_size,
uint32 nitems,
Relation id_rel,
Relation cs_rels[],
@@ -153,8 +152,6 @@ pgstrom_one_chunk_insert(Relation srel,
int save_sec_context;
AttrNumber attno, nattrs;
- Assert(chunk_size % BITS_PER_BYTE == 0);
-
/*
* Acquire a row-id of the head of this chunk
*/
@@ -167,17 +164,13 @@ pgstrom_one_chunk_insert(Relation srel,
/*
* Insert rowid-map of this chunk.
- *
- * XXX - note that its 'nitems' is always chunk_size to make clear
- * between rowid and (rowid + nitems - 1) are occupied.
*/
memset(values, 0, sizeof(values));
memset(isnull, 0, sizeof(isnull));
values[Anum_pg_strom_rowid - 1] = Int64GetDatum(rowid);
- values[Anum_pg_strom_nitems - 1] = Int32GetDatum(chunk_size);
- values[Anum_pg_strom_isnull - 1] =
- construct_cs_isnull(cs_rowid, chunk_size);
+ values[Anum_pg_strom_nitems - 1] = Int32GetDatum(nitems);
+ values[Anum_pg_strom_isnull - 1] = construct_cs_isnull(cs_rowid, nitems);
tupdesc = RelationGetDescr(id_rel);
tuple = heap_form_tuple(tupdesc, values, isnull);
@@ -289,8 +282,7 @@ pgstrom_data_load_internal(Relation srel,
Oid seqid,
Relation id_rel,
Relation cs_rels[],
- Form_pg_attribute cs_attrs[],
- uint32 chunk_size)
+ Form_pg_attribute cs_attrs[])
{
TupleDesc tupdesc;
HeapScanDesc scan;
@@ -309,15 +301,15 @@ pgstrom_data_load_internal(Relation srel,
rs_values = palloc0(sizeof(Datum) * tupdesc->natts);
rs_isnull = palloc0(sizeof(bool) * tupdesc->natts);
- cs_rowmap = palloc0(sizeof(bool) * chunk_size);
+ cs_rowmap = palloc0(sizeof(bool) * PGSTROM_CHUNK_SIZE);
cs_values = palloc0(sizeof(Datum *) * tupdesc->natts);
cs_isnull = palloc0(sizeof(bool *) * tupdesc->natts);
for (attno = 0; attno < tupdesc->natts; attno++)
{
if (!cs_rels[attno])
continue;
- cs_values[attno] = palloc(sizeof(Datum) * chunk_size);
- cs_isnull[attno] = palloc(sizeof(bool) * chunk_size);
+ cs_values[attno] = palloc(sizeof(Datum) * PGSTROM_CHUNK_SIZE);
+ cs_isnull[attno] = palloc(sizeof(bool) * PGSTROM_CHUNK_SIZE);
}
/*
@@ -337,7 +329,7 @@ pgstrom_data_load_internal(Relation srel,
oldcxt = MemoryContextSwitchTo(cs_memcxt);
index = 0;
- memset(cs_rowmap, -1, sizeof(bool) * chunk_size);
+ memset(cs_rowmap, -1, sizeof(bool) * PGSTROM_CHUNK_SIZE);
while (HeapTupleIsValid(tuple = heap_getnext(scan, ForwardScanDirection)))
{
@@ -369,27 +361,33 @@ pgstrom_data_load_internal(Relation srel,
}
}
- if (++index == chunk_size)
+ if (++index == PGSTROM_CHUNK_SIZE)
{
- pgstrom_one_chunk_insert(srel, seqid, chunk_size, index,
+ pgstrom_one_chunk_insert(srel, seqid, index,
id_rel, cs_rels, cs_attrs,
cs_rowmap, cs_isnull, cs_values);
/*
* Rewind the index to the head, and release all
* the per-chunk memory
*/
index = 0;
- memset(cs_rowmap, -1, sizeof(bool) * chunk_size);
+ memset(cs_rowmap, -1, sizeof(bool) * PGSTROM_CHUNK_SIZE);
+ for (attno = 0; attno < tupdesc->natts; attno++)
+ {
+ if (cs_rels[attno])
+ memset(cs_isnull[attno], -1,
+ sizeof(bool)*PGSTROM_CHUNK_SIZE);
+ }
MemoryContextReset(cs_memcxt);
}
}
if (index > 0)
{
/* index should be round up to multiple number of 8 */
index = (index + BITS_PER_BYTE - 1) & ~(BITS_PER_BYTE - 1);
- Assert(index <= chunk_size);
+ Assert(index <= PGSTROM_CHUNK_SIZE);
- pgstrom_one_chunk_insert(srel, seqid, chunk_size, index,
+ pgstrom_one_chunk_insert(srel, seqid, index,
id_rel, cs_rels, cs_attrs,
cs_rowmap, cs_isnull, cs_values);
}
@@ -402,7 +400,7 @@ pgstrom_data_load_internal(Relation srel,
/*
* bool
- * pgstrom_data_load(regclass dest, regclass source, uint32 chunk_size)
+ * pgstrom_data_load(regclass dest, regclass source)
*
* This function loads the contents of source table into the destination
* foreign table managed by PG-Strom; with the supplied chunk_size.
@@ -419,12 +417,10 @@ pgstrom_data_load(PG_FUNCTION_ARGS)
Bitmapset *dcols = NULL;
RangeTblEntry *srte;
RangeTblEntry *drte;
- HeapScanDesc scan;
HeapTuple tuple;
RangeVar *range;
Oid nspid;
Oid seqid;
- uint32 chunk_size = PG_GETARG_UINT32(2);
AttrNumber i, nattrs;
/*
@@ -506,62 +502,6 @@ pgstrom_data_load(PG_FUNCTION_ARGS)
}
/*
- * Check correctness of the chunk-size
- */
- scan = heap_beginscan(id_rel, SnapshotNow, 0, NULL);
- tuple = heap_getnext(scan, ForwardScanDirection);
- if (HeapTupleIsValid(tuple))
- {
- TupleDesc tupdesc = RelationGetDescr(id_rel);
- Datum values[Natts_pg_strom - 1];
- bool isnull[Natts_pg_strom - 1];
- uint32 nitems;
-
- heap_deform_tuple(tuple, tupdesc, values, isnull);
- Assert(!isnull[0] && !isnull[1]);
-
- nitems = DatumGetInt32(values[Anum_pg_strom_nitems - 1]);
- if (chunk_size == 0)
- chunk_size = nitems;
- else if (chunk_size != nitems)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("chunk size must be same with existing data")));
- }
- else
- {
- AlterSeqStmt *stmt;
- Oid save_userid;
- int save_sec_context;
-
- if (chunk_size == 0)
- chunk_size = BLCKSZ * BITS_PER_BYTE / 2; /* default */
- else if ((chunk_size & (chunk_size - 1)) != 0)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("chunk size must be a power of 2")));
- else if (chunk_size < 4096 || chunk_size > BLCKSZ * BITS_PER_BYTE)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("chunk size is out of range")));
-
- /* Reset sequence object to fit the supplied chunk size */
- stmt = makeNode(AlterSeqStmt);
- stmt->sequence = pgstrom_lookup_shadow_sequence(drel);
- stmt->options = list_make2(
- makeDefElem("increment", (Node *)makeInteger(chunk_size)),
- makeDefElem("restart", (Node *)makeInteger(0)));
-
- GetUserIdAndSecContext(&save_userid, &save_sec_context);
- SetUserIdAndSecContext(BOOTSTRAP_SUPERUSERID, save_sec_context);
-
- AlterSequence(stmt);
-
- SetUserIdAndSecContext(save_userid, save_sec_context);
- }
- heap_endscan(scan);
-
- /*
* Set up RangeTblEntry, then Permission checks
*/
srte = makeNode(RangeTblEntry);
@@ -583,9 +523,7 @@ pgstrom_data_load(PG_FUNCTION_ARGS)
/*
* Load data
*/
- pgstrom_data_load_internal(srel, seqid, id_rel,
- cs_rels, cs_attrs,
- chunk_size);
+ pgstrom_data_load_internal(srel, seqid, id_rel, cs_rels, cs_attrs);
/*
* Close the relation
View
@@ -17,7 +17,7 @@ CREATE FOREIGN DATA WRAPPER pg_strom
CREATE SERVER pg_strom FOREIGN DATA WRAPPER pg_strom;
-CREATE FUNCTION pgstrom_data_load(regclass, regclass,int4)
+CREATE FUNCTION pgstrom_data_load(regclass, regclass)
RETURNS bool
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
View
@@ -11,6 +11,7 @@
* this package.
*/
#include "postgres.h"
+#include "access/reloptions.h"
#include "catalog/pg_type.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
@@ -38,7 +39,7 @@ FdwRoutine pgstromFdwHandlerData = {
/*
* pgstrom_fdw_handler
*
- * FDW Handler function of pg_strom
+ * FDW Handler function of PG-Strom
*/
Datum
pgstrom_fdw_handler(PG_FUNCTION_ARGS)
@@ -47,10 +48,27 @@ pgstrom_fdw_handler(PG_FUNCTION_ARGS)
}
PG_FUNCTION_INFO_V1(pgstrom_fdw_handler);
-/****/
+/*
+ * pgstrom_fdw_validator
+ *
+ * FDW option validator of PG-Strom
+ */
Datum
pgstrom_fdw_validator(PG_FUNCTION_ARGS)
{
+ Datum rawopts = PG_GETARG_DATUM(0);
+ List *options_list;
+ ListCell *cell;
+
+ options_list = untransformRelOptions(rawopts);
+ foreach (cell, options_list)
+ {
+ DefElem *defel = lfirst(cell);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
+ errmsg("invalid option \"%s\"", defel->defname)));
+ }
PG_RETURN_VOID();
}
PG_FUNCTION_INFO_V1(pgstrom_fdw_validator);
View
@@ -20,15 +20,15 @@
#include "utils/memutils.h"
#include <cuda.h>
+#define PGSTROM_CHUNK_SIZE (BLCKSZ * BITS_PER_BYTE / 2)
+
#define PGSTROM_SCHEMA_NAME "pg_strom"
#define Natts_pg_strom 4
#define Anum_pg_strom_rowid 1
#define Anum_pg_strom_nitems 2
#define Anum_pg_strom_isnull 3
#define Anum_pg_strom_values 4
-#define PGSTROM_THREADS_PER_BLOCK 32
-
/*
* utilcmds.c
*/
View
5 plan.c
@@ -466,7 +466,8 @@ make_device_qual_source(Oid base_relid, List *device_quals,
appendStringInfo(&kern,
"__global__ void\n"
- "pgstrom_qual(unsigned char rowmap[]");
+ "pgstrom_qual(unsigned int nitems,\n"
+ " unsigned char rowmap[]");
appendStringInfo(&blk1,
" int offset_base = blockIdx.x * blockDim.x + threadIdx.x;\n"
" int offset = offset_base * 8;\n"
@@ -500,6 +501,8 @@ make_device_qual_source(Oid base_relid, List *device_quals,
"%s"
" int bitmask;\n"
"\n"
+ " if (offset >= nitems)\n"
+ " return;\n"
"%s"
"\n"
" if ((result & bitmask) == 0 &&\n"
View
40 scan.c
@@ -51,8 +51,7 @@ typedef struct {
int dev_index;
CUmodule dev_module;
CUfunction dev_function;
- uint32 dev_grid_sz;
- uint32 dev_block_sz;
+ uint32 dev_nthreads;
} PgStromDevContext;
typedef struct {
@@ -233,11 +232,13 @@ pgstrom_exec_kernel_qual(PgStromDevContext *dev_cxt, PgStromChunkBuf *chunk)
/*
* Setup kernel arguments
*/
- kernel_data = alloca((1 + 2 * chunk->nattrs) * sizeof(CUdeviceptr));
- kernel_args = alloca((1 + 2 * chunk->nattrs) * sizeof(void *));
- kernel_data[0] = chunk->devmem;
+ kernel_data = alloca((2 + 2 * chunk->nattrs) * sizeof(CUdeviceptr));
+ kernel_args = alloca((2 + 2 * chunk->nattrs) * sizeof(void *));
+ kernel_data[0] = chunk->nitems;
kernel_args[0] = &kernel_data[0];
- for (i=0, j=1; i < chunk->nattrs; i++)
+ kernel_data[1] = chunk->devmem;
+ kernel_args[1] = &kernel_data[1];
+ for (i=0, j=2; i < chunk->nattrs; i++)
{
if (chunk->cs_values[i] > 0)
{
@@ -252,7 +253,7 @@ pgstrom_exec_kernel_qual(PgStromDevContext *dev_cxt, PgStromChunkBuf *chunk)
/*
* Launch kernel function
*/
- n_threads = PGSTROM_THREADS_PER_BLOCK;
+ n_threads = dev_cxt->dev_nthreads;
n_blocks = (chunk->nitems + n_threads * BITS_PER_BYTE - 1)
/ (BITS_PER_BYTE * n_threads);
ret = cuLaunchKernel(dev_cxt->dev_function,
@@ -973,6 +974,31 @@ pgstrom_init_exec_device(void *image, int dev_index,
errmsg("Failed to reference number of registers: %s",
cuda_error_to_string(ret))));
}
+
+ /*
+ * Larger number of threads within a particular block is better
+ * strategy as long as it can be executable; from the perspective
+ * that increase occupacy of streaming processor.
+ */
+ n_threads = (n_threads - (n_threads % dev_info->dev_proc_warp_sz));
+
+ /*
+ * However, it is not desirable the number of threads are too large
+ * not to utilize all the streaming processors concurrently.
+ * In this case, we adjust number of threads to appropriate level.
+ */
+ if (PGSTROM_CHUNK_SIZE / n_threads < dev_info->dev_proc_nums)
+ {
+ n_threads = PGSTROM_CHUNK_SIZE / dev_info->dev_proc_nums;
+ n_threads -= (n_threads % dev_info->dev_proc_warp_sz);
+ }
+ dev_cxt->dev_nthreads = n_threads;
+
+ /*
+ * TODO: maximun number of concurrent chunks also shoukd be modified
+ * to avoid over-consumption of device memory.
+ */
+
return dev_cxt;
}
View
@@ -350,10 +350,11 @@ pgstrom_create_shadow_sequence(Oid namespaceId, Relation base_rel)
rowid_namelist = list_make3(makeString(PGSTROM_SCHEMA_NAME),
makeString(rel_name),
makeString("rowid"));
- seq_stmt->options = list_make3(
- makeDefElem("minvalue", (Node *)makeInteger(0)),
- makeDefElem("maxvalue", (Node *)makeInteger((1UL<<48) - 1)),
- makeDefElem("owned_by", (Node *)rowid_namelist));
+ seq_stmt->options = list_make4(
+ makeDefElem("increment", (Node *)makeInteger(PGSTROM_CHUNK_SIZE)),
+ makeDefElem("minvalue", (Node *)makeInteger(0)),
+ makeDefElem("maxvalue", (Node *)makeInteger((1UL<<48) - 1)),
+ makeDefElem("owned_by", (Node *)rowid_namelist));
seq_stmt->ownerId = RelationGetForm(base_rel)->relowner;
DefineSequence(seq_stmt);

0 comments on commit 00d99fb

Please sign in to comment.