Allow I/O reliability checks using 16-bit checksums

Checksums are set immediately prior to flush out of shared buffers
and checked when pages are read in again. Hint bit setting will
require full page write when block is dirtied, which causes various
infrastructure changes. Extensive comments, docs and README.

A WARNING message is emitted if a checksum fails on a non-all-zeroes page;
an ERROR is raised as well, though it can be disabled with
ignore_checksum_failure = on.

Feature enabled by an initdb option, since transition from option off
to option on is long and complex and has not yet been implemented.
Default is not to use checksums.

Checksum used is the WAL CRC-32 truncated to 16 bits.
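
As a rough illustration of the scheme described above, here is a minimal,
self-contained C sketch of deriving a 16-bit page checksum by truncating a
CRC-32. This is not the committed PageCalcChecksum16() code: the real
implementation reuses the WAL CRC-32 macros and skips the page's own checksum
field, and the polynomial, truncation, and 8 kB page size below are assumptions
made only for the example.

    /* Minimal sketch only; see the caveats above. */
    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>

    static uint32_t
    crc32_sketch(const void *data, size_t len)
    {
        const uint8_t *p = (const uint8_t *) data;
        uint32_t    crc = 0xFFFFFFFF;

        while (len--)
        {
            crc ^= *p++;
            for (int bit = 0; bit < 8; bit++)
                crc = (crc >> 1) ^ (0xEDB88320 & (0 - (crc & 1)));
        }
        return ~crc;
    }

    /* Keep only the low 16 bits of the CRC as the stored checksum (assumption). */
    static uint16_t
    page_checksum_sketch(const void *page, size_t page_size)
    {
        return (uint16_t) (crc32_sketch(page, page_size) & 0xFFFF);
    }

    int
    main(void)
    {
        uint8_t     page[8192] = {0};

        printf("checksum: 0x%04X\n", page_checksum_sketch(page, sizeof(page)));
        return 0;
    }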

Simon Riggs, Jeff Davis, Greg Smith
Wide input and assistance from many community members. Thank you.
commit 96ef3b8ff1cf1950e897fd2f766d4bd9ef0d5d56 (1 parent: e4a05c7)
Authored by Simon Riggs

Showing 40 changed files with 766 additions and 146 deletions.

  1. +31 -1 contrib/pg_upgrade/controldata.c
  2. +1 -0 contrib/pg_upgrade/pg_upgrade.h
  3. +24 -0 doc/src/sgml/config.sgml
  4. +14 -0 doc/src/sgml/ref/initdb.sgml
  5. +6 -2 src/backend/access/gist/gistget.c
  6. +2 -4 src/backend/access/hash/hash.c
  7. +64 -32 src/backend/access/heap/heapam.c
  8. +1 -1 src/backend/access/heap/pruneheap.c
  9. +4 -0 src/backend/access/heap/rewriteheap.c
  10. +38 -10 src/backend/access/heap/visibilitymap.c
  11. +7 -3 src/backend/access/nbtree/nbtinsert.c
  12. +2 -1 src/backend/access/nbtree/nbtree.c
  13. +3 -0 src/backend/access/nbtree/nbtsort.c
  14. +2 -4 src/backend/access/nbtree/nbtutils.c
  15. +4 -0 src/backend/access/rmgrdesc/xlogdesc.c
  16. +3 -0 src/backend/access/spgist/spginsert.c
  17. +22 -0 src/backend/access/transam/README
  18. +107 -4 src/backend/access/transam/xlog.c
  19. +6 -1 src/backend/bootstrap/bootstrap.c
  20. +2 -0 src/backend/commands/matview.c
  21. +1 -1 src/backend/commands/sequence.c
  22. +2 -0 src/backend/commands/tablecmds.c
  23. +7 -7 src/backend/commands/vacuumlazy.c
  24. +175 -39 src/backend/storage/buffer/bufmgr.c
  25. +5 -2 src/backend/storage/buffer/localbuf.c
  26. +3 -1 src/backend/storage/freespace/README
  27. +6 -4 src/backend/storage/freespace/freespace.c
  28. +1 -1 src/backend/storage/freespace/fsmpage.c
  29. +167 -16 src/backend/storage/page/bufpage.c
  30. +16 -0 src/backend/utils/misc/guc.c
  31. +2 -2 src/backend/utils/time/tqual.c
  32. +16 -3 src/bin/initdb/initdb.c
  33. +2 -0 src/bin/pg_controldata/pg_controldata.c
  34. +2 -0 src/bin/pg_resetxlog/pg_resetxlog.c
  35. +1 -1 src/include/access/heapam_xlog.h
  36. +2 -2 src/include/access/visibilitymap.h
  37. +3 -0 src/include/access/xlog.h
  38. +6 -2 src/include/catalog/pg_control.h
  39. +2 -1 src/include/storage/bufmgr.h
  40. +4 -1 src/include/storage/bufpage.h
32 contrib/pg_upgrade/controldata.c
@@ -56,6 +56,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
56 56 bool got_toast = false;
57 57 bool got_date_is_int = false;
58 58 bool got_float8_pass_by_value = false;
  59 + bool got_data_checksums = false;
59 60 char *lc_collate = NULL;
60 61 char *lc_ctype = NULL;
61 62 char *lc_monetary = NULL;
@@ -131,6 +132,13 @@ get_control_data(ClusterInfo *cluster, bool live_check)
131 132 got_float8_pass_by_value = true;
132 133 }
133 134
  135 + /* Only in <= 9.2 */
  136 + if (GET_MAJOR_VERSION(cluster->major_version) <= 902)
  137 + {
  138 + cluster->controldata.data_checksums = false;
  139 + got_data_checksums = true;
  140 + }
  141 +
134 142 /* we have the result of cmd in "output". so parse it line by line now */
135 143 while (fgets(bufin, sizeof(bufin), output))
136 144 {
@@ -393,6 +401,18 @@ get_control_data(ClusterInfo *cluster, bool live_check)
393 401 cluster->controldata.float8_pass_by_value = strstr(p, "by value") != NULL;
394 402 got_float8_pass_by_value = true;
395 403 }
  404 + else if ((p = strstr(bufin, "checksums")) != NULL)
  405 + {
  406 + p = strchr(p, ':');
  407 +
  408 + if (p == NULL || strlen(p) <= 1)
  409 + pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
  410 +
  411 + p++; /* removing ':' char */
  412 + /* used later for contrib check */
  413 + cluster->controldata.data_checksums = strstr(p, "enabled") != NULL;
  414 + got_data_checksums = true;
  415 + }
396 416 /* In pre-8.4 only */
397 417 else if ((p = strstr(bufin, "LC_COLLATE:")) != NULL)
398 418 {
@@ -476,7 +496,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
476 496 !got_tli ||
477 497 !got_align || !got_blocksz || !got_largesz || !got_walsz ||
478 498 !got_walseg || !got_ident || !got_index || !got_toast ||
479   - !got_date_is_int || !got_float8_pass_by_value)
  499 + !got_date_is_int || !got_float8_pass_by_value || !got_data_checksums)
480 500 {
481 501 pg_log(PG_REPORT,
482 502 "The %s cluster lacks some required control information:\n",
@@ -535,6 +555,10 @@ get_control_data(ClusterInfo *cluster, bool live_check)
535 555 if (!got_float8_pass_by_value)
536 556 pg_log(PG_REPORT, " float8 argument passing method\n");
537 557
  558 + /* value added in Postgres 9.3 */
  559 + if (!got_data_checksums)
  560 + pg_log(PG_REPORT, " data checksums\n");
  561 +
538 562 pg_log(PG_FATAL,
539 563 "Cannot continue without required control information, terminating\n");
540 564 }
@@ -596,6 +620,12 @@ check_control_data(ControlData *oldctrl,
596 620 "--disable-integer-datetimes or get server binaries built with those\n"
597 621 "options.\n");
598 622 }
  623 +
  624 + if (oldctrl->data_checksums != newctrl->data_checksums)
  625 + {
  626 + pg_log(PG_FATAL,
  627 + "old and new pg_controldata checksums settings are invalid or do not match\n");
  628 + }
599 629 }
600 630
601 631
1  contrib/pg_upgrade/pg_upgrade.h
@@ -202,6 +202,7 @@ typedef struct
202 202 uint32 toast;
203 203 bool date_is_int;
204 204 bool float8_pass_by_value;
  205 + bool data_checksums;
205 206 char *lc_collate;
206 207 char *lc_ctype;
207 208 char *encoding;
24 doc/src/sgml/config.sgml
@@ -6629,6 +6629,30 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
6629 6629 </listitem>
6630 6630 </varlistentry>
6631 6631
  6632 + <varlistentry id="guc-ignore-checksum-failure" xreflabel="ignore_checksum_failure">
  6633 + <term><varname>ignore_checksum_failure</varname> (<type>boolean</type>)</term>
  6634 + <indexterm>
  6635 + <primary><varname>ignore_checksum_failure</> configuration parameter</primary>
  6636 + </indexterm>
  6637 + <listitem>
  6638 + <para>
  6639 + Only has effect if <xref linkend="app-initdb-data-checksums"> are enabled.
  6640 + </para>
  6641 + <para>
  6642 + Detection of a checksum failure during a read normally causes
  6643 + <productname>PostgreSQL</> to report an error, aborting the current
  6644 + transaction. Setting <varname>ignore_checksum_failure</> to on causes
  6645 + the system to ignore the failure (but still report a warning), and
  6646 + continue processing. This behavior may <emphasis>cause crashes, propagate
  6647 + or hide corruption, or other serious problems</>. However, it may allow
  6648 + you to get past the error and retrieve undamaged tuples that might still be
  6649 + present in the table if the block header is still sane. If the header is
  6650 + corrupt an error will be reported even if this option is enabled. The
  6651 + default setting is <literal>off</>, and it can only be changed by a superuser.
  6652 + </para>
  6653 + </listitem>
  6654 + </varlistentry>
  6655 +
6632 6656 <varlistentry id="guc-zero-damaged-pages" xreflabel="zero_damaged_pages">
6633 6657 <term><varname>zero_damaged_pages</varname> (<type>boolean</type>)</term>
6634 6658 <indexterm>
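
To make the behaviour documented in the new ignore_checksum_failure entry
above concrete, here is a toy C sketch of the read-path decision: a
verification failure always produces a warning, and normally an error as well,
unless the GUC downgrades it. All names below are invented for illustration
and are not PostgreSQL's actual functions.

    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    static bool ignore_checksum_failure = false;    /* the new GUC, default off */

    static void
    verify_page_on_read(bool checksum_ok, unsigned blkno)
    {
        if (checksum_ok)
            return;

        /* always warn, so the failure is visible in the log */
        fprintf(stderr, "WARNING: page verification failed in block %u\n", blkno);

        if (!ignore_checksum_failure)
        {
            /* stands in for ereport(ERROR), which aborts the transaction */
            fprintf(stderr, "ERROR: invalid page in block %u\n", blkno);
            exit(1);
        }
        /* otherwise continue and hope the undamaged tuples are still readable */
    }

    int
    main(void)
    {
        ignore_checksum_failure = true;
        verify_page_on_read(false, 42);     /* warns but continues */
        return 0;
    }
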
14 doc/src/sgml/ref/initdb.sgml
@@ -182,6 +182,20 @@ PostgreSQL documentation
182 182 </listitem>
183 183 </varlistentry>
184 184
  185 + <varlistentry id="app-initdb-data-checksums" xreflabel="data checksums">
  186 + <term><option>-k</option></term>
  187 + <term><option>--data-checksums</option></term>
  188 + <listitem>
  189 + <para>
  190 + Use checksums on data pages to help detect corruption by the
  191 + I/O system that would otherwise be silent. Enabling checksums
  192 + may incur a noticeable performance penalty. This option can only
  193 + be set during initialization, and cannot be changed later. If
  194 + set, checksums are calculated for all objects, in all databases.
  195 + </para>
  196 + </listitem>
  197 + </varlistentry>
  198 +
185 199 <varlistentry>
186 200 <term><option>--locale=<replaceable>locale</replaceable></option></term>
187 201 <listitem>
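
The initdb option above works because the choice is recorded once in the
cluster-wide control data and only read back afterwards, which is also why it
cannot be changed after initialization. The following toy C model mirrors the
flow visible in the diff (bootstrap_data_checksums -> ControlFile->data_checksums
-> DataChecksumsEnabled()), using invented names rather than the real structures.

    #include <stdbool.h>
    #include <stdio.h>

    /* Stand-in for the persistent pg_control contents. */
    typedef struct ToyControlFile
    {
        bool        data_checksums;     /* fixed for the life of the cluster */
    } ToyControlFile;

    static ToyControlFile control;

    /* Called once at bootstrap time; "enable" corresponds to initdb -k. */
    static void
    bootstrap_cluster(bool enable)
    {
        control.data_checksums = enable;
    }

    /* Every later decision just reads the stored flag. */
    static bool
    data_checksums_enabled(void)
    {
        return control.data_checksums;
    }

    int
    main(void)
    {
        bootstrap_cluster(true);
        printf("checksums enabled: %s\n", data_checksums_enabled() ? "yes" : "no");
        return 0;
    }
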
8 src/backend/access/gist/gistget.c
@@ -362,8 +362,12 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
362 362 {
363 363 /* Creating index-page GISTSearchItem */
364 364 item->blkno = ItemPointerGetBlockNumber(&it->t_tid);
365   - /* lsn of current page is lsn of parent page for child */
366   - item->data.parentlsn = PageGetLSN(page);
  365 +
  366 + /*
  367 + * LSN of current page is lsn of parent page for child. We only
  368 + * have a shared lock, so we need to get the LSN atomically.
  369 + */
  370 + item->data.parentlsn = BufferGetLSNAtomic(buffer);
367 371 }
368 372
369 373 /* Insert it into the queue using new distance data */
6 src/backend/access/hash/hash.c
@@ -285,11 +285,9 @@ hashgettuple(PG_FUNCTION_ARGS)
285 285 ItemIdMarkDead(PageGetItemId(page, offnum));
286 286
287 287 /*
288   - * Since this can be redone later if needed, it's treated the same
289   - * as a commit-hint-bit status update for heap tuples: we mark the
290   - * buffer dirty but don't make a WAL log entry.
  288 + * Since this can be redone later if needed, mark as a hint.
291 289 */
292   - SetBufferCommitInfoNeedsSave(buf);
  290 + MarkBufferDirtyHint(buf);
293 291 }
294 292
295 293 /*
96 src/backend/access/heap/heapam.c
@@ -5754,17 +5754,23 @@ log_heap_freeze(Relation reln, Buffer buffer,
5754 5754 * being marked all-visible, and vm_buffer is the buffer containing the
5755 5755 * corresponding visibility map block. Both should have already been modified
5756 5756 * and dirtied.
  5757 + *
  5758 + * If checksums are enabled, we also add the heap_buffer to the chain to
  5759 + * protect it from being torn.
5757 5760 */
5758 5761 XLogRecPtr
5759   -log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
  5762 +log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
5760 5763 TransactionId cutoff_xid)
5761 5764 {
5762 5765 xl_heap_visible xlrec;
5763 5766 XLogRecPtr recptr;
5764   - XLogRecData rdata[2];
  5767 + XLogRecData rdata[3];
  5768 +
  5769 + Assert(BufferIsValid(heap_buffer));
  5770 + Assert(BufferIsValid(vm_buffer));
5765 5771
5766 5772 xlrec.node = rnode;
5767   - xlrec.block = block;
  5773 + xlrec.block = BufferGetBlockNumber(heap_buffer);
5768 5774 xlrec.cutoff_xid = cutoff_xid;
5769 5775
5770 5776 rdata[0].data = (char *) &xlrec;
@@ -5778,6 +5784,17 @@ log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
5778 5784 rdata[1].buffer_std = false;
5779 5785 rdata[1].next = NULL;
5780 5786
  5787 + if (DataChecksumsEnabled())
  5788 + {
  5789 + rdata[1].next = &(rdata[2]);
  5790 +
  5791 + rdata[2].data = NULL;
  5792 + rdata[2].len = 0;
  5793 + rdata[2].buffer = heap_buffer;
  5794 + rdata[2].buffer_std = true;
  5795 + rdata[2].next = NULL;
  5796 + }
  5797 +
5781 5798 recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
5782 5799
5783 5800 return recptr;
@@ -6139,8 +6156,6 @@ static void
6139 6156 heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
6140 6157 {
6141 6158 xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
6142   - Buffer buffer;
6143   - Page page;
6144 6159
6145 6160 /*
6146 6161 * If there are any Hot Standby transactions running that have an xmin
@@ -6155,39 +6170,56 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
6155 6170 ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
6156 6171
6157 6172 /*
6158   - * Read the heap page, if it still exists. If the heap file has been
6159   - * dropped or truncated later in recovery, we don't need to update the
6160   - * page, but we'd better still update the visibility map.
  6173 + * If heap block was backed up, restore it. This can only happen with
  6174 + * checksums enabled.
6161 6175 */
6162   - buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
6163   - RBM_NORMAL);
6164   - if (BufferIsValid(buffer))
  6176 + if (record->xl_info & XLR_BKP_BLOCK(1))
6165 6177 {
6166   - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
6167   -
6168   - page = (Page) BufferGetPage(buffer);
  6178 + Assert(DataChecksumsEnabled());
  6179 + (void) RestoreBackupBlock(lsn, record, 1, false, false);
  6180 + }
  6181 + else
  6182 + {
  6183 + Buffer buffer;
  6184 + Page page;
6169 6185
6170 6186 /*
6171   - * We don't bump the LSN of the heap page when setting the visibility
6172   - * map bit, because that would generate an unworkable volume of
6173   - * full-page writes. This exposes us to torn page hazards, but since
6174   - * we're not inspecting the existing page contents in any way, we
6175   - * don't care.
6176   - *
6177   - * However, all operations that clear the visibility map bit *do* bump
6178   - * the LSN, and those operations will only be replayed if the XLOG LSN
6179   - * follows the page LSN. Thus, if the page LSN has advanced past our
6180   - * XLOG record's LSN, we mustn't mark the page all-visible, because
6181   - * the subsequent update won't be replayed to clear the flag.
  6187 + * Read the heap page, if it still exists. If the heap file has been
  6188 + * dropped or truncated later in recovery, we don't need to update the
  6189 + * page, but we'd better still update the visibility map.
6182 6190 */
6183   - if (lsn > PageGetLSN(page))
  6191 + buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM,
  6192 + xlrec->block, RBM_NORMAL);
  6193 + if (BufferIsValid(buffer))
6184 6194 {
6185   - PageSetAllVisible(page);
6186   - MarkBufferDirty(buffer);
6187   - }
  6195 + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
6188 6196
6189   - /* Done with heap page. */
6190   - UnlockReleaseBuffer(buffer);
  6197 + page = (Page) BufferGetPage(buffer);
  6198 +
  6199 + /*
  6200 + * We don't bump the LSN of the heap page when setting the
  6201 + * visibility map bit (unless checksums are enabled, in which case
  6202 + * we must), because that would generate an unworkable volume of
  6203 + * full-page writes. This exposes us to torn page hazards, but
  6204 + * since we're not inspecting the existing page contents in any
  6205 + * way, we don't care.
  6206 + *
  6207 + * However, all operations that clear the visibility map bit *do*
  6208 + * bump the LSN, and those operations will only be replayed if the
  6209 + * XLOG LSN follows the page LSN. Thus, if the page LSN has
  6210 + * advanced past our XLOG record's LSN, we mustn't mark the page
  6211 + * all-visible, because the subsequent update won't be replayed to
  6212 + * clear the flag.
  6213 + */
  6214 + if (lsn > PageGetLSN(page))
  6215 + {
  6216 + PageSetAllVisible(page);
  6217 + MarkBufferDirty(buffer);
  6218 + }
  6219 +
  6220 + /* Done with heap page. */
  6221 + UnlockReleaseBuffer(buffer);
  6222 + }
6191 6223 }
6192 6224
6193 6225 /*
@@ -6218,7 +6250,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
6218 6250 * real harm is done; and the next VACUUM will fix it.
6219 6251 */
6220 6252 if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
6221   - visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
  6253 + visibilitymap_set(reln, xlrec->block, InvalidBuffer, lsn, vmbuffer,
6222 6254 xlrec->cutoff_xid);
6223 6255
6224 6256 ReleaseBuffer(vmbuffer);
2  src/backend/access/heap/pruneheap.c
@@ -262,7 +262,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
262 262 {
263 263 ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
264 264 PageClearFull(page);
265   - SetBufferCommitInfoNeedsSave(buffer);
  265 + MarkBufferDirtyHint(buffer);
266 266 }
267 267 }
268 268
4 src/backend/access/heap/rewriteheap.c
@@ -273,6 +273,8 @@ end_heap_rewrite(RewriteState state)
273 273 /* Write the last page, if any */
274 274 if (state->rs_buffer_valid)
275 275 {
  276 + PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
  277 +
276 278 if (state->rs_use_wal)
277 279 log_newpage(&state->rs_new_rel->rd_node,
278 280 MAIN_FORKNUM,
@@ -614,6 +616,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
614 616 {
615 617 /* Doesn't fit, so write out the existing page */
616 618
  619 + PageSetChecksumInplace(page, state->rs_blockno);
  620 +
617 621 /* XLOG stuff */
618 622 if (state->rs_use_wal)
619 623 log_newpage(&state->rs_new_rel->rd_node,
48 src/backend/access/heap/visibilitymap.c
@@ -233,13 +233,18 @@ visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
233 233 * marked all-visible; it is needed for Hot Standby, and can be
234 234 * InvalidTransactionId if the page contains no tuples.
235 235 *
  236 + * Caller is expected to set the heap page's PD_ALL_VISIBLE bit before calling
  237 + * this function. Except in recovery, caller should also pass the heap
  238 + * buffer. When checksums are enabled and we're not in recovery, we must add
  239 + * the heap buffer to the WAL chain to protect it from being torn.
  240 + *
236 241 * You must pass a buffer containing the correct map page to this function.
237 242 * Call visibilitymap_pin first to pin the right one. This function doesn't do
238 243 * any I/O.
239 244 */
240 245 void
241   -visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
242   - Buffer buf, TransactionId cutoff_xid)
  246 +visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
  247 + XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid)
243 248 {
244 249 BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
245 250 uint32 mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
@@ -252,34 +257,55 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
252 257 #endif
253 258
254 259 Assert(InRecovery || XLogRecPtrIsInvalid(recptr));
  260 + Assert(InRecovery || BufferIsValid(heapBuf));
255 261
256   - /* Check that we have the right page pinned */
257   - if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
258   - elog(ERROR, "wrong buffer passed to visibilitymap_set");
  262 + /* Check that we have the right heap page pinned, if present */
  263 + if (BufferIsValid(heapBuf) && BufferGetBlockNumber(heapBuf) != heapBlk)
  264 + elog(ERROR, "wrong heap buffer passed to visibilitymap_set");
259 265
260   - page = BufferGetPage(buf);
  266 + /* Check that we have the right VM page pinned */
  267 + if (!BufferIsValid(vmBuf) || BufferGetBlockNumber(vmBuf) != mapBlock)
  268 + elog(ERROR, "wrong VM buffer passed to visibilitymap_set");
  269 +
  270 + page = BufferGetPage(vmBuf);
261 271 map = PageGetContents(page);
262   - LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
  272 + LockBuffer(vmBuf, BUFFER_LOCK_EXCLUSIVE);
263 273
264 274 if (!(map[mapByte] & (1 << mapBit)))
265 275 {
266 276 START_CRIT_SECTION();
267 277
268 278 map[mapByte] |= (1 << mapBit);
269   - MarkBufferDirty(buf);
  279 + MarkBufferDirty(vmBuf);
270 280
271 281 if (RelationNeedsWAL(rel))
272 282 {
273 283 if (XLogRecPtrIsInvalid(recptr))
274   - recptr = log_heap_visible(rel->rd_node, heapBlk, buf,
  284 + {
  285 + Assert(!InRecovery);
  286 + recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
275 287 cutoff_xid);
  288 +
  289 + /*
  290 + * If data checksums are enabled, we need to protect the heap
  291 + * page from being torn.
  292 + */
  293 + if (DataChecksumsEnabled())
  294 + {
  295 + Page heapPage = BufferGetPage(heapBuf);
  296 +
  297 + /* caller is expected to set PD_ALL_VISIBLE first */
  298 + Assert(PageIsAllVisible(heapPage));
  299 + PageSetLSN(heapPage, recptr);
  300 + }
  301 + }
276 302 PageSetLSN(page, recptr);
277 303 }
278 304
279 305 END_CRIT_SECTION();
280 306 }
281 307
282   - LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  308 + LockBuffer(vmBuf, BUFFER_LOCK_UNLOCK);
283 309 }
284 310
285 311 /*
@@ -579,6 +605,8 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
579 605 /* Now extend the file */
580 606 while (vm_nblocks_now < vm_nblocks)
581 607 {
  608 + PageSetChecksumInplace(pg, vm_nblocks_now);
  609 +
582 610 smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
583 611 (char *) pg, false);
584 612 vm_nblocks_now++;
10 src/backend/access/nbtree/nbtinsert.c
@@ -407,11 +407,15 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
407 407 */
408 408 ItemIdMarkDead(curitemid);
409 409 opaque->btpo_flags |= BTP_HAS_GARBAGE;
410   - /* be sure to mark the proper buffer dirty... */
  410 +
  411 + /*
  412 + * Mark buffer with a dirty hint, since state is not
  413 + * crucial. Be sure to mark the proper buffer dirty.
  414 + */
411 415 if (nbuf != InvalidBuffer)
412   - SetBufferCommitInfoNeedsSave(nbuf);
  416 + MarkBufferDirtyHint(nbuf);
413 417 else
414   - SetBufferCommitInfoNeedsSave(buf);
  418 + MarkBufferDirtyHint(buf);
415 419 }
416 420 }
417 421 }
3  src/backend/access/nbtree/nbtree.c
@@ -217,6 +217,7 @@ btbuildempty(PG_FUNCTION_ARGS)
217 217 _bt_initmetapage(metapage, P_NONE, 0);
218 218
219 219 /* Write the page. If archiving/streaming, XLOG it. */
  220 + PageSetChecksumInplace(metapage, BTREE_METAPAGE);
220 221 smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
221 222 (char *) metapage, true);
222 223 if (XLogIsNeeded())
@@ -1051,7 +1052,7 @@ btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
1051 1052 opaque->btpo_cycleid == vstate->cycleid)
1052 1053 {
1053 1054 opaque->btpo_cycleid = 0;
1054   - SetBufferCommitInfoNeedsSave(buf);
  1055 + MarkBufferDirtyHint(buf);
1055 1056 }
1056 1057 }
1057 1058
3  src/backend/access/nbtree/nbtsort.c
@@ -288,12 +288,15 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
288 288 {
289 289 if (!wstate->btws_zeropage)
290 290 wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
  291 + /* don't set checksum for all-zero page */
291 292 smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM,
292 293 wstate->btws_pages_written++,
293 294 (char *) wstate->btws_zeropage,
294 295 true);
295 296 }
296 297
  298 + PageSetChecksumInplace(page, blkno);
  299 +
297 300 /*
298 301 * Now write the page. There's no need for smgr to schedule an fsync for
299 302 * this write; we'll do it ourselves before ending the build.
6 src/backend/access/nbtree/nbtutils.c
@@ -1781,9 +1781,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
1781 1781 }
1782 1782
1783 1783 /*
1784   - * Since this can be redone later if needed, it's treated the same as a
1785   - * commit-hint-bit status update for heap tuples: we mark the buffer dirty
1786   - * but don't make a WAL log entry.
  1784 + * Since this can be redone later if needed, mark as dirty hint.
1787 1785 *
1788 1786 * Whenever we mark anything LP_DEAD, we also set the page's
1789 1787 * BTP_HAS_GARBAGE flag, which is likewise just a hint.
@@ -1791,7 +1789,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
1791 1789 if (killedsomething)
1792 1790 {
1793 1791 opaque->btpo_flags |= BTP_HAS_GARBAGE;
1794   - SetBufferCommitInfoNeedsSave(so->currPos.buf);
  1792 + MarkBufferDirtyHint(so->currPos.buf);
1795 1793 }
1796 1794
1797 1795 if (!haveLock)
4 src/backend/access/rmgrdesc/xlogdesc.c
@@ -81,6 +81,10 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
81 81 appendStringInfo(buf, "restore point: %s", xlrec->rp_name);
82 82
83 83 }
  84 + else if (info == XLOG_HINT)
  85 + {
  86 + appendStringInfo(buf, "page hint");
  87 + }
84 88 else if (info == XLOG_BACKUP_END)
85 89 {
86 90 XLogRecPtr startpoint;
3  src/backend/access/spgist/spginsert.c
@@ -154,6 +154,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
154 154 SpGistInitMetapage(page);
155 155
156 156 /* Write the page. If archiving/streaming, XLOG it. */
  157 + PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
157 158 smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
158 159 (char *) page, true);
159 160 if (XLogIsNeeded())
@@ -163,6 +164,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
163 164 /* Likewise for the root page. */
164 165 SpGistInitPage(page, SPGIST_LEAF);
165 166
  167 + PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
166 168 smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
167 169 (char *) page, true);
168 170 if (XLogIsNeeded())
@@ -172,6 +174,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
172 174 /* Likewise for the null-tuples root page. */
173 175 SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
174 176
  177 + PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
175 178 smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
176 179 (char *) page, true);
177 180 if (XLogIsNeeded())
22 src/backend/access/transam/README
@@ -437,6 +437,8 @@ critical section.)
437 437
438 438 4. Mark the shared buffer(s) as dirty with MarkBufferDirty(). (This must
439 439 happen before the WAL record is inserted; see notes in SyncOneBuffer().)
  440 +Note that marking a buffer dirty with MarkBufferDirty() should only
  441 +happen iff you write a WAL record; see Writing Hints below.
440 442
441 443 5. If the relation requires WAL-logging, build a WAL log record and pass it
442 444 to XLogInsert(); then update the page's LSN using the returned XLOG
@@ -584,6 +586,26 @@ replay code has to do the insertion on its own to restore the index to
584 586 consistency. Such insertions occur after WAL is operational, so they can
585 587 and should write WAL records for the additional generated actions.
586 588
  589 +Writing Hints
  590 +-------------
  591 +
  592 +In some cases, we write additional information to data blocks without
  593 +writing a preceding WAL record. This should only happen iff the data can
  594 +be reconstructed later following a crash and the action is simply a way
  595 +of optimising for performance. When a hint is written we use
  596 +MarkBufferDirtyHint() to mark the block dirty.
  597 +
  598 +If the buffer is clean and checksums are in use then
  599 +MarkBufferDirtyHint() inserts an XLOG_HINT record to ensure that we
  600 +take a full page image that includes the hint. We do this to avoid
  601 +a partial page write, when we write the dirtied page. WAL is not
  602 +written during recovery, so we simply skip dirtying blocks because
  603 +of hints when in recovery.
  604 +
  605 +If you do decide to optimise away a WAL record, then any calls to
  606 +MarkBufferDirty() must be replaced by MarkBufferDirtyHint(),
  607 +otherwise you will expose the risk of partial page writes.
  608 +
587 609
588 610 Write-Ahead Logging for Filesystem Actions
589 611 ------------------------------------------
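
The "Writing Hints" section added above can be summarised with a toy C model
of the rule it states: a hint-only change to a clean buffer must first be
covered by a full-page image when checksums are enabled, because a torn write
of the hinted page would otherwise fail its checksum, and no WAL can be
written during recovery. The types and helpers below are stand-ins for
illustration, not the real bufmgr.c code.

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct ToyBuffer
    {
        bool        dirty;
    } ToyBuffer;

    static bool checksums_enabled = true;
    static bool in_recovery = false;

    /* Pretend to emit an XLOG_HINT record carrying a backup block of the page. */
    static void
    log_full_page_image(void)
    {
        printf("XLOG_HINT: full-page image written\n");
    }

    static void
    mark_buffer_dirty_hint(ToyBuffer *buf)
    {
        /* WAL cannot be written during recovery, so skip hint-dirtying then. */
        if (checksums_enabled && in_recovery)
            return;

        /*
         * A clean buffer that is about to receive only a hint must first be
         * covered by a full-page image; if it is already dirty, the page is
         * already protected by whatever change dirtied it.
         */
        if (checksums_enabled && !buf->dirty)
            log_full_page_image();

        buf->dirty = true;
    }

    int
    main(void)
    {
        ToyBuffer   buf = {false};

        mark_buffer_dirty_hint(&buf);   /* first hint on a clean page logs an FPI */
        mark_buffer_dirty_hint(&buf);   /* already dirty: no further logging */
        return 0;
    }
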
111 src/backend/access/transam/xlog.c
@@ -60,6 +60,7 @@
60 60 #include "utils/timestamp.h"
61 61 #include "pg_trace.h"
62 62
  63 +extern bool bootstrap_data_checksums;
63 64
64 65 /* File path names (all relative to $PGDATA) */
65 66 #define RECOVERY_COMMAND_FILE "recovery.conf"
@@ -730,6 +731,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
730 731 bool updrqst;
731 732 bool doPageWrites;
732 733 bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
  734 + bool isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT);
733 735 uint8 info_orig = info;
734 736 static XLogRecord *rechdr;
735 737
@@ -1000,6 +1002,18 @@ begin:;
1000 1002 }
1001 1003
1002 1004 /*
  1005 + * If this is a hint record and we don't need a backup block then
  1006 + * we have no more work to do and can exit quickly without inserting
  1007 + * a WAL record at all. In that case return InvalidXLogRecPtr.
  1008 + */
  1009 + if (isHint && !(info & XLR_BKP_BLOCK_MASK))
  1010 + {
  1011 + LWLockRelease(WALInsertLock);
  1012 + END_CRIT_SECTION();
  1013 + return InvalidXLogRecPtr;
  1014 + }
  1015 +
  1016 + /*
1003 1017 * If the current page is completely full, the record goes to the next
1004 1018 * page, right after the page header.
1005 1019 */
@@ -1253,10 +1267,10 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1253 1267 * not. We don't need the buffer header lock for PageGetLSN because we
1254 1268 * have exclusive lock on the page and/or the relation.
1255 1269 */
1256   - *lsn = PageGetLSN(page);
  1270 + *lsn = BufferGetLSNAtomic(rdata->buffer);
1257 1271
1258 1272 if (doPageWrites &&
1259   - PageGetLSN(page) <= RedoRecPtr)
  1273 + *lsn <= RedoRecPtr)
1260 1274 {
1261 1275 /*
1262 1276 * The page needs to be backed up, so set up *bkpb
@@ -3187,6 +3201,11 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
3187 3201 BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
3188 3202 }
3189 3203
  3204 + /*
  3205 + * Any checksum set on this page will be invalid. We don't need
  3206 + * to reset it here since it will be set before being written.
  3207 + */
  3208 +
3190 3209 PageSetLSN(page, lsn);
3191 3210 MarkBufferDirty(buffer);
3192 3211
@@ -3767,6 +3786,16 @@ GetSystemIdentifier(void)
3767 3786 }
3768 3787
3769 3788 /*
  3789 + * Are checksums enabled for data pages?
  3790 + */
  3791 +bool
  3792 +DataChecksumsEnabled(void)
  3793 +{
  3794 + Assert(ControlFile != NULL);
  3795 + return ControlFile->data_checksums;
  3796 +}
  3797 +
  3798 +/*
3770 3799 * Returns a fake LSN for unlogged relations.
3771 3800 *
3772 3801 * Each call generates an LSN that is greater than any previous value
@@ -4092,6 +4121,7 @@ BootStrapXLOG(void)
4092 4121 ControlFile->max_prepared_xacts = max_prepared_xacts;
4093 4122 ControlFile->max_locks_per_xact = max_locks_per_xact;
4094 4123 ControlFile->wal_level = wal_level;
  4124 + ControlFile->data_checksums = bootstrap_data_checksums;
4095 4125
4096 4126 /* some additional ControlFile fields are set in WriteControlFile() */
4097 4127
@@ -7602,6 +7632,51 @@ XLogRestorePoint(const char *rpName)
7602 7632 }
7603 7633
7604 7634 /*
  7635 + * Write a backup block if needed when we are setting a hint. Note that
  7636 + * this may be called for a variety of page types, not just heaps.
  7637 + *
  7638 + * Deciding the "if needed" part is delicate and requires us to either
  7639 + * grab WALInsertLock or check the info_lck spinlock. If we check the
  7640 + * spinlock and it says Yes then we will need to get WALInsertLock as well,
  7641 + * so the design choice here is to just go straight for the WALInsertLock
  7642 + * and trust that calls to this function are minimised elsewhere.
  7643 + *
  7644 + * Callable while holding just share lock on the buffer content.
  7645 + *
  7646 + * Possible that multiple concurrent backends could attempt to write
  7647 + * WAL records. In that case, more than one backup block may be recorded
  7648 + * though that isn't important to the outcome and the backup blocks are
  7649 + * likely to be identical anyway.
  7650 + */
  7651 +#define XLOG_HINT_WATERMARK 13579
  7652 +XLogRecPtr
  7653 +XLogSaveBufferForHint(Buffer buffer)
  7654 +{
  7655 + /*
  7656 + * Make an XLOG entry reporting the hint
  7657 + */
  7658 + XLogRecData rdata[2];
  7659 + int watermark = XLOG_HINT_WATERMARK;
  7660 +
  7661 + /*
  7662 + * Not allowed to have zero-length records, so use a small watermark
  7663 + */
  7664 + rdata[0].data = (char *) (&watermark);
  7665 + rdata[0].len = sizeof(int);
  7666 + rdata[0].buffer = InvalidBuffer;
  7667 + rdata[0].buffer_std = false;
  7668 + rdata[0].next = &(rdata[1]);
  7669 +
  7670 + rdata[1].data = NULL;
  7671 + rdata[1].len = 0;
  7672 + rdata[1].buffer = buffer;
  7673 + rdata[1].buffer_std = true;
  7674 + rdata[1].next = NULL;
  7675 +
  7676 + return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
  7677 +}
  7678 +
  7679 +/*
7605 7680 * Check if any of the GUC parameters that are critical for hot standby
7606 7681 * have changed, and update the value in pg_control file if necessary.
7607 7682 */
@@ -7767,8 +7842,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
7767 7842 {
7768 7843 uint8 info = record->xl_info & ~XLR_INFO_MASK;
7769 7844
7770   - /* Backup blocks are not used in xlog records */
7771   - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
  7845 + /* Backup blocks are not used in most xlog records */
  7846 + Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
7772 7847
7773 7848 if (info == XLOG_NEXTOID)
7774 7849 {
@@ -7961,6 +8036,34 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
7961 8036 {
7962 8037 /* nothing to do here */
7963 8038 }
  8039 + else if (info == XLOG_HINT)
  8040 + {
  8041 +#ifdef USE_ASSERT_CHECKING
  8042 + int *watermark = (int *) XLogRecGetData(record);
  8043 +#endif
  8044 +
  8045 + /* Check the watermark is correct for the hint record */
  8046 + Assert(*watermark == XLOG_HINT_WATERMARK);
  8047 +
  8048 + /* Backup blocks must be present for smgr hint records */
  8049 + Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
  8050 +
  8051 + /*
  8052 + * Hint records have no information that needs to be replayed.
  8053 + * The sole purpose of them is to ensure that a hint bit does
  8054 + * not cause a checksum invalidation if a hint bit write should
  8055 + * cause a torn page. So the body of the record is empty but
  8056 + * there must be one backup block.
  8057 + *
  8058 + * Since the only change in the backup block is a hint bit,
  8059 + * there is no conflict with Hot Standby.
  8060 + *
  8061 + * This also means there is no corresponding API call for this,
  8062 + * so an smgr implementation has no need to implement anything.
  8063 + * Which means nothing is needed in md.c etc
  8064 + */
  8065 + RestoreBackupBlock(lsn, record, 0, false, false);
  8066 + }
7964 8067 else if (info == XLOG_BACKUP_END)
7965 8068 {
7966 8069 XLogRecPtr startpoint;
7 src/backend/bootstrap/bootstrap.c
@@ -48,6 +48,8 @@
48 48 extern int optind;
49 49 extern char *optarg;
50 50
  51 +bool bootstrap_data_checksums = false;
  52 +
51 53
52 54 #define ALLOC(t, c) ((t *) calloc((unsigned)(c), sizeof(t)))
53 55
@@ -233,7 +235,7 @@ AuxiliaryProcessMain(int argc, char *argv[])
233 235 /* If no -x argument, we are a CheckerProcess */
234 236 MyAuxProcType = CheckerProcess;
235 237
236   - while ((flag = getopt(argc, argv, "B:c:d:D:Fr:x:-:")) != -1)
  238 + while ((flag = getopt(argc, argv, "B:c:d:D:Fkr:x:-:")) != -1)
237 239 {
238 240 switch (flag)
239 241 {
@@ -259,6 +261,9 @@ AuxiliaryProcessMain(int argc, char *argv[])
259 261 case 'F':
260 262 SetConfigOption("fsync", "false", PGC_POSTMASTER, PGC_S_ARGV);
261 263 break;
  264 + case 'k':
  265 + bootstrap_data_checksums = true;
  266 + break;
262 267 case 'r':
263 268 strlcpy(OutputFileName, optarg, MAXPGPATH);
264 269 break;
2  src/backend/commands/matview.c
@@ -76,6 +76,8 @@ SetRelationIsScannable(Relation relation)
76 76 log_newpage(&(relation->rd_node), MAIN_FORKNUM, 0, page);
77 77
78 78 RelationOpenSmgr(relation);
  79 +
  80 + PageSetChecksumInplace(page, 0);
79 81 smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
80 82
81 83 pfree(page);
2  src/backend/commands/sequence.c
@@ -1118,7 +1118,7 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
1118 1118 HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
1119 1119 seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
1120 1120 seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
1121   - SetBufferCommitInfoNeedsSave(*buf);
  1121 + MarkBufferDirtyHint(*buf);
1122 1122 }
1123 1123
1124 1124 seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
2  src/backend/commands/tablecmds.c
@@ -8902,6 +8902,8 @@ copy_relation_data(SMgrRelation src, SMgrRelation dst,
8902 8902
8903 8903 smgrread(src, forkNum, blkno, buf);
8904 8904
  8905 + PageSetChecksumInplace(page, blkno);
  8906 +
8905 8907 /* XLOG stuff */
8906 8908 if (use_wal)
8907 8909 log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page);
14 src/backend/commands/vacuumlazy.c
@@ -672,8 +672,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
672 672 {
673 673 PageSetAllVisible(page);
674 674 MarkBufferDirty(buf);
675   - visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
676   - InvalidTransactionId);
  675 + visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
  676 + vmbuffer, InvalidTransactionId);
677 677 }
678 678
679 679 UnlockReleaseBuffer(buf);
@@ -907,8 +907,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
907 907 {
908 908 PageSetAllVisible(page);
909 909 MarkBufferDirty(buf);
910   - visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
911   - visibility_cutoff_xid);
  910 + visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
  911 + vmbuffer, visibility_cutoff_xid);
912 912 }
913 913 else if (!all_visible_according_to_vm)
914 914 {
@@ -918,8 +918,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
918 918 * allowed. Set the visibility map bit as well so that we get
919 919 * back in sync.
920 920 */
921   - visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, vmbuffer,
922   - visibility_cutoff_xid);
  921 + visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
  922 + vmbuffer, visibility_cutoff_xid);
923 923 }
924 924 }
925 925
@@ -1154,7 +1154,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
1154 1154 {
1155 1155 Assert(BufferIsValid(*vmbuffer));
1156 1156 PageSetAllVisible(page);
1157   - visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, *vmbuffer,
  1157 + visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr, *vmbuffer,
1158 1158 visibility_cutoff_xid);
1159 1159 }
1160 1160
214 src/backend/storage/buffer/bufmgr.c
@@ -34,6 +34,7 @@
34 34 #include <unistd.h>
35 35
36 36 #include "catalog/catalog.h"
  37 +#include "catalog/storage.h"
37 38 #include "common/relpath.h"
38 39 #include "executor/instrument.h"
39 40 #include "miscadmin.h"
@@ -431,6 +432,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
431 432 {
432 433 /* new buffers are zero-filled */
433 434 MemSet((char *) bufBlock, 0, BLCKSZ);
  435 + /* don't set checksum for all-zero page */
434 436 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
435 437 }
436 438 else
@@ -460,13 +462,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
460 462 }
461 463
462 464 /* check for garbage data */
463   - if (!PageHeaderIsValid((PageHeader) bufBlock))
  465 + if (!PageIsVerified((Page) bufBlock, blockNum))
464 466 {
465 467 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
466 468 {
467 469 ereport(WARNING,
468 470 (errcode(ERRCODE_DATA_CORRUPTED),
469   - errmsg("invalid page header in block %u of relation %s; zeroing out page",
  471 + errmsg("invalid page in block %u of relation %s; zeroing out page",
470 472 blockNum,
471 473 relpath(smgr->smgr_rnode, forkNum))));
472 474 MemSet((char *) bufBlock, 0, BLCKSZ);
@@ -474,7 +476,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
474 476 else
475 477 ereport(ERROR,
476 478 (errcode(ERRCODE_DATA_CORRUPTED),
477   - errmsg("invalid page header in block %u of relation %s",
  479 + errmsg("invalid page in block %u of relation %s",
478 480 blockNum,
479 481 relpath(smgr->smgr_rnode, forkNum))));
480 482 }
@@ -655,14 +657,23 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
655 657 * victim. We need lock to inspect the page LSN, so this
656 658 * can't be done inside StrategyGetBuffer.
657 659 */
658   - if (strategy != NULL &&
659   - XLogNeedsFlush(BufferGetLSN(buf)) &&
660   - StrategyRejectBuffer(strategy, buf))
  660 + if (strategy != NULL)
661 661 {
662   - /* Drop lock/pin and loop around for another buffer */
663   - LWLockRelease(buf->content_lock);
664   - UnpinBuffer(buf, true);
665   - continue;
  662 + XLogRecPtr lsn;
  663 +
  664 + /* Read the LSN while holding buffer header lock */
  665 + LockBufHdr(buf);
  666 + lsn = BufferGetLSN(buf);
  667 + UnlockBufHdr(buf);
  668 +
  669 + if (XLogNeedsFlush(lsn) &&
  670 + StrategyRejectBuffer(strategy, buf))
  671 + {
  672 + /* Drop lock/pin and loop around for another buffer */
  673 + LWLockRelease(buf->content_lock);
  674 + UnpinBuffer(buf, true);
  675 + continue;
  676 + }
666 677 }
667 678
668 679 /* OK, do the I/O */
@@ -1906,6 +1917,8 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
1906 1917 ErrorContextCallback errcallback;
1907 1918 instr_time io_start,
1908 1919 io_time;
  1920 + Block bufBlock;
  1921 + char *bufToWrite;
1909 1922
1910 1923 /*
1911 1924 * Acquire the buffer's io_in_progress lock. If StartBufferIO returns
@@ -1931,6 +1944,18 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
1931 1944 reln->smgr_rnode.node.dbNode,
1932 1945 reln->smgr_rnode.node.relNode);
1933 1946
  1947 + LockBufHdr(buf);
  1948 +
  1949 + /*
  1950 + * Run PageGetLSN while holding header lock, since we don't have the
  1951 + * buffer locked exclusively in all cases.
  1952 + */
  1953 + recptr = BufferGetLSN(buf);
  1954 +
  1955 + /* To check if block content changes while flushing. - vadim 01/17/97 */
  1956 + buf->flags &= ~BM_JUST_DIRTIED;
  1957 + UnlockBufHdr(buf);
  1958 +
1934 1959 /*
1935 1960 * Force XLOG flush up to buffer's LSN. This implements the basic WAL
1936 1961 * rule that log updates must hit disk before any of the data-file changes
@@ -1949,10 +1974,7 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
1949 1974 * buffer isn't permanent.
1950 1975 */
1951 1976 if (buf->flags & BM_PERMANENT)
1952   - {
1953   - recptr = BufferGetLSN(buf);
1954 1977 XLogFlush(recptr);
1955   - }
1956 1978
1957 1979 /*
1958 1980 * Now it's safe to write buffer to disk. Note that no one else should
@@ -1960,18 +1982,20 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
1960 1982 * we have the io_in_progress lock.
1961 1983 */
1962 1984
1963   - /* To check if block content changes while flushing. - vadim 01/17/97 */
1964   - LockBufHdr(buf);
1965   - buf->flags &= ~BM_JUST_DIRTIED;
1966   - UnlockBufHdr(buf);
  1985 + bufBlock = BufHdrGetBlock(buf);
  1986 +
  1987 + bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
1967 1988
1968 1989 if (track_io_timing)
1969 1990 INSTR_TIME_SET_CURRENT(io_start);
1970 1991
  1992 + /*
  1993 + * bufToWrite is either the shared buffer or a copy, as appropriate.
  1994 + */
1971 1995 smgrwrite(reln,
1972 1996 buf->tag.forkNum,
1973 1997 buf->tag.blockNum,
1974   - (char *) BufHdrGetBlock(buf),
  1998 + bufToWrite,
1975 1999 false);
1976 2000
1977 2001 if (track_io_timing)
@@ -2042,6 +2066,34 @@ BufferIsPermanent(Buffer buffer)
2042 2066 return (bufHdr->flags & BM_PERMANENT) != 0;
2043 2067 }
2044 2068
  2069 +/*
  2070 + * BufferGetLSNAtomic
  2071 + * Retrieves the LSN of the buffer atomically using a buffer header lock.
  2072 + * This is necessary for some callers who may not have an exclusive lock
  2073 + * on the buffer.
  2074 + */
  2075 +XLogRecPtr
  2076 +BufferGetLSNAtomic(Buffer buffer)
  2077 +{
  2078 + volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
  2079 + char *page = BufferGetPage(buffer);
  2080 + XLogRecPtr lsn;
  2081 +
  2082 + /* Local buffers don't need a lock. */
  2083 + if (BufferIsLocal(buffer))
  2084 + return PageGetLSN(page);
  2085 +
  2086 + /* Make sure we've got a real buffer, and that we hold a pin on it. */
  2087 + Assert(BufferIsValid(buffer));
  2088 + Assert(BufferIsPinned(buffer));
  2089 +
  2090 + LockBufHdr(bufHdr);
  2091 + lsn = PageGetLSN(page);
  2092 + UnlockBufHdr(bufHdr);
  2093 +
  2094 + return lsn;
  2095 +}
  2096 +
2045 2097 /* ---------------------------------------------------------------------
2046 2098 * DropRelFileNodeBuffers
2047 2099 *
@@ -2343,7 +2395,10 @@ FlushRelationBuffers(Relation rel)
2343 2395 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
2344 2396 (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
2345 2397 {
2346   - ErrorContextCallback errcallback;
  2398 + ErrorContextCallback errcallback;
  2399 + Page localpage;
  2400 +
  2401 + localpage = (char *) LocalBufHdrGetBlock(bufHdr);
2347 2402
2348 2403 /* Setup error traceback support for ereport() */
2349 2404 errcallback.callback = local_buffer_write_error_callback;
@@ -2351,10 +2406,12 @@ FlushRelationBuffers(Relation rel)
2351 2406 errcallback.previous = error_context_stack;
2352 2407 error_context_stack = &errcallback;
2353 2408
  2409 + PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
  2410 +
2354 2411 smgrwrite(rel->rd_smgr,
2355 2412 bufHdr->tag.forkNum,
2356 2413 bufHdr->tag.blockNum,
2357   - (char *) LocalBufHdrGetBlock(bufHdr),
  2414 + localpage,
2358 2415 false);
2359 2416
2360 2417 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
@@ -2509,22 +2566,24 @@ IncrBufferRefCount(Buffer buffer)
2509 2566 }
2510 2567
2511 2568 /*
2512   - * SetBufferCommitInfoNeedsSave
  2569 + * MarkBufferDirtyHint
2513 2570 *
2514   - * Mark a buffer dirty when we have updated tuple commit-status bits in it.
  2571 + * Mark a buffer dirty for non-critical changes.
2515 2572 *
2516   - * This is essentially the same as MarkBufferDirty, except that the caller
2517   - * might have only share-lock instead of exclusive-lock on the buffer's
2518   - * content lock. We preserve the distinction mainly as a way of documenting
2519   - * that the caller has not made a critical data change --- the status-bit
2520   - * update could be redone by someone else just as easily. Therefore, no WAL
2521   - * log record need be generated, whereas calls to MarkBufferDirty really ought
2522   - * to be associated with a WAL-entry-creating action.
  2573 + * This is essentially the same as MarkBufferDirty, except:
  2574 + *
  2575 + * 1. The caller does not write WAL; so if checksums are enabled, we may need
  2576