|
|
@@ -47,6 +47,7 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable, |
|
|
int bucketNumber);
|
|
|
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
|
|
|
|
|
|
+static void *dense_alloc(HashJoinTable hashtable, Size size);
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
* ExecHash
|
|
|
@@ -293,6 +294,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) |
|
|
hashtable->spaceUsedSkew = 0;
|
|
|
hashtable->spaceAllowedSkew =
|
|
|
hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
|
|
|
+ hashtable->chunks = NULL;
|
|
|
|
|
|
/*
|
|
|
* Get info about the hash functions to be used for each hash key. Also
|
|
|
@@ -556,10 +558,10 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) |
|
|
int oldnbatch = hashtable->nbatch;
|
|
|
int curbatch = hashtable->curbatch;
|
|
|
int nbatch;
|
|
|
- int i;
|
|
|
MemoryContext oldcxt;
|
|
|
long ninmemory;
|
|
|
long nfreed;
|
|
|
+ HashMemoryChunk oldchunks;
|
|
|
|
|
|
/* do nothing if we've decided to shut off growth */
|
|
|
if (!hashtable->growEnabled)
|
|
|
@@ -612,51 +614,65 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) |
|
|
*/
|
|
|
ninmemory = nfreed = 0;
|
|
|
|
|
|
- for (i = 0; i < hashtable->nbuckets; i++)
|
|
|
- {
|
|
|
- HashJoinTuple prevtuple;
|
|
|
- HashJoinTuple tuple;
|
|
|
+ /*
|
|
|
+ * We will scan through the chunks directly, so that we can reset the
|
|
|
+ * buckets now and not have to keep track which tuples in the buckets have
|
|
|
+ * already been processed. We will free the old chunks as we go.
|
|
|
+ */
|
|
|
+ memset(hashtable->buckets, 0, sizeof(HashJoinTuple *) * hashtable->nbuckets);
|
|
|
+ oldchunks = hashtable->chunks;
|
|
|
+ hashtable->chunks = NULL;
|
|
|
|
|
|
- prevtuple = NULL;
|
|
|
- tuple = hashtable->buckets[i];
|
|
|
+ /* so, let's scan through the old chunks, and all tuples in each chunk */
|
|
|
+ while (oldchunks != NULL)
|
|
|
+ {
|
|
|
+ HashMemoryChunk nextchunk = oldchunks->next;
|
|
|
+ /* position within the buffer (up to oldchunks->used) */
|
|
|
+ size_t idx = 0;
|
|
|
|
|
|
- while (tuple != NULL)
|
|
|
+ /* process all tuples stored in this chunk (and then free it) */
|
|
|
+ while (idx < oldchunks->used)
|
|
|
{
|
|
|
- /* save link in case we delete */
|
|
|
- HashJoinTuple nexttuple = tuple->next;
|
|
|
+ HashJoinTuple hashTuple = (HashJoinTuple) (oldchunks->data + idx);
|
|
|
+ MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
|
|
|
+ int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
|
|
|
int bucketno;
|
|
|
int batchno;
|
|
|
|
|
|
ninmemory++;
|
|
|
- ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
|
|
|
+ ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
|
|
|
&bucketno, &batchno);
|
|
|
- Assert(bucketno == i);
|
|
|
+
|
|
|
if (batchno == curbatch)
|
|
|
{
|
|
|
- /* keep tuple */
|
|
|
- prevtuple = tuple;
|
|
|
+ /* keep tuple in memory - copy it into the new chunk */
|
|
|
+ HashJoinTuple copyTuple =
|
|
|
+ (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
|
|
|
+ memcpy(copyTuple, hashTuple, hashTupleSize);
|
|
|
+
|
|
|
+ /* and add it back to the appropriate bucket */
|
|
|
+ copyTuple->next = hashtable->buckets[bucketno];
|
|
|
+ hashtable->buckets[bucketno] = copyTuple;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
/* dump it out */
|
|
|
Assert(batchno > curbatch);
|
|
|
- ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
|
|
|
- tuple->hashvalue,
|
|
|
+ ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
|
|
|
+ hashTuple->hashvalue,
|
|
|
&hashtable->innerBatchFile[batchno]);
|
|
|
- /* and remove from hash table */
|
|
|
- if (prevtuple)
|
|
|
- prevtuple->next = nexttuple;
|
|
|
- else
|
|
|
- hashtable->buckets[i] = nexttuple;
|
|
|
- /* prevtuple doesn't change */
|
|
|
- hashtable->spaceUsed -=
|
|
|
- HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
|
|
|
- pfree(tuple);
|
|
|
+
|
|
|
+ hashtable->spaceUsed -= hashTupleSize;
|
|
|
nfreed++;
|
|
|
}
|
|
|
|
|
|
- tuple = nexttuple;
|
|
|
+ /* next tuple in this chunk */
|
|
|
+ idx += MAXALIGN(hashTupleSize);
|
|
|
}
|
|
|
+
|
|
|
+ /* we're done with this chunk - free it and proceed to the next one */
|
|
|
+ pfree(oldchunks);
|
|
|
+ oldchunks = nextchunk;
|
|
|
}
|
|
|
|
|
|
#ifdef HJDEBUG
|
|
|
@@ -717,8 +733,8 @@ ExecHashTableInsert(HashJoinTable hashtable, |
|
|
|
|
|
/* Create the HashJoinTuple */
|
|
|
hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
|
|
|
- hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
|
|
|
- hashTupleSize);
|
|
|
+ hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
|
|
|
+
|
|
|
hashTuple->hashvalue = hashvalue;
|
|
|
memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
|
|
|
|
|
|
@@ -1068,6 +1084,9 @@ ExecHashTableReset(HashJoinTable hashtable) |
|
|
hashtable->spaceUsed = 0;
|
|
|
|
|
|
MemoryContextSwitchTo(oldcxt);
|
|
|
+
|
|
|
+ /* Forget the chunks (the memory was freed by the context reset above). */
|
|
|
+ hashtable->chunks = NULL;
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -1462,3 +1481,79 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) |
|
|
hashtable->spaceUsedSkew = 0;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+/*
|
|
|
+ * Allocate 'size' bytes from the currently active HashMemoryChunk
|
|
|
+ */
|
|
|
+static void *
|
|
|
+dense_alloc(HashJoinTable hashtable, Size size)
|
|
|
+{
|
|
|
+ HashMemoryChunk newChunk;
|
|
|
+ char *ptr;
|
|
|
+
|
|
|
+ /* just in case the size is not already aligned properly */
|
|
|
+ size = MAXALIGN(size);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If tuple size is larger than of 1/4 of chunk size, allocate a separate
|
|
|
+ * chunk.
|
|
|
+ */
|
|
|
+ if (size > HASH_CHUNK_THRESHOLD)
|
|
|
+ {
|
|
|
+ /* allocate new chunk and put it at the beginning of the list */
|
|
|
+ newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
|
|
|
+ offsetof(HashMemoryChunkData, data) + size);
|
|
|
+ newChunk->maxlen = size;
|
|
|
+ newChunk->used = 0;
|
|
|
+ newChunk->ntuples = 0;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Add this chunk to the list after the first existing chunk, so that
|
|
|
+ * we don't lose the remaining space in the "current" chunk.
|
|
|
+ */
|
|
|
+ if (hashtable->chunks != NULL)
|
|
|
+ {
|
|
|
+ newChunk->next = hashtable->chunks->next;
|
|
|
+ hashtable->chunks->next = newChunk;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ newChunk->next = hashtable->chunks;
|
|
|
+ hashtable->chunks = newChunk;
|
|
|
+ }
|
|
|
+
|
|
|
+ newChunk->used += size;
|
|
|
+ newChunk->ntuples += 1;
|
|
|
+
|
|
|
+ return newChunk->data;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * See if we have enough space for it in the current chunk (if any).
|
|
|
+ * If not, allocate a fresh chunk.
|
|
|
+ */
|
|
|
+ if ((hashtable->chunks == NULL) ||
|
|
|
+ (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
|
|
|
+ {
|
|
|
+ /* allocate new chunk and put it at the beginning of the list */
|
|
|
+ newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
|
|
|
+ offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE);
|
|
|
+
|
|
|
+ newChunk->maxlen = HASH_CHUNK_SIZE;
|
|
|
+ newChunk->used = size;
|
|
|
+ newChunk->ntuples = 1;
|
|
|
+
|
|
|
+ newChunk->next = hashtable->chunks;
|
|
|
+ hashtable->chunks = newChunk;
|
|
|
+
|
|
|
+ return newChunk->data;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* There is enough space in the current chunk, let's add the tuple */
|
|
|
+ ptr = hashtable->chunks->data + hashtable->chunks->used;
|
|
|
+ hashtable->chunks->used += size;
|
|
|
+ hashtable->chunks->ntuples += 1;
|
|
|
+
|
|
|
+ /* return pointer to the start of the tuple memory */
|
|
|
+ return ptr;
|
|
|
+}
|
0 comments on commit
45f6240