Skip to content

Migration: Re-use async batch infrastructure #1343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 7 additions & 78 deletions includes/class-migration.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ class Migration {
* Initialize the class, registering WordPress hooks.
*/
public static function init() {
\add_action( 'activitypub_migrate', array( self::class, 'async_migration' ) );
\add_action( 'activitypub_upgrade', array( self::class, 'async_upgrade' ), 10, 99 );
\add_action( 'activitypub_update_comment_counts', array( self::class, 'update_comment_counts' ), 10, 2 );

self::maybe_migrate();
}

Expand Down Expand Up @@ -121,13 +117,12 @@ public static function maybe_migrate() {
$version_from_db = ACTIVITYPUB_PLUGIN_VERSION;
}

// Schedule the async migration.
if ( ! \wp_next_scheduled( 'activitypub_migrate', $version_from_db ) ) {
\wp_schedule_single_event( \time(), 'activitypub_migrate', array( $version_from_db ) );
}
if ( \version_compare( $version_from_db, '0.17.0', '<' ) ) {
self::migrate_from_0_16();
}
if ( \version_compare( $version_from_db, '1.0.0', '<' ) ) {
\wp_schedule_single_event( \time(), 'activitypub_migrate_from_0_17' );
}
if ( \version_compare( $version_from_db, '1.3.0', '<' ) ) {
self::migrate_from_1_2_0();
}
Expand Down Expand Up @@ -160,8 +155,8 @@ public static function maybe_migrate() {
}
if ( \version_compare( $version_from_db, '5.0.0', '<' ) ) {
Scheduler::register_schedules();
\wp_schedule_single_event( \time(), 'activitypub_upgrade', array( 'create_post_outbox_items' ) );
\wp_schedule_single_event( \time() + 15, 'activitypub_upgrade', array( 'create_comment_outbox_items' ) );
\wp_schedule_single_event( \time(), 'activitypub_create_post_outbox_items' );
\wp_schedule_single_event( \time() + 15, 'activitypub_create_comment_outbox_items' );
add_action( 'init', 'flush_rewrite_rules', 20 );
}
if ( \version_compare( $version_from_db, '5.2.0', '<' ) ) {
Expand Down Expand Up @@ -204,49 +199,6 @@ public static function maybe_migrate() {
self::unlock();
}

/**
* Asynchronously migrates the database structure.
*
* @param string $version_from_db The version from which to migrate.
*/
public static function async_migration( $version_from_db ) {
if ( \version_compare( $version_from_db, '1.0.0', '<' ) ) {
self::migrate_from_0_17();
}
}

/**
* Asynchronously runs upgrade routines.
*
* @param callable $callback Callable upgrade routine. Must be a method of this class.
* @params mixed ...$args Optional. Parameters that get passed to the callback.
*/
public static function async_upgrade( $callback ) {
$args = \func_get_args();

// Bail if the existing lock is still valid.
if ( self::is_locked() ) {
\wp_schedule_single_event( time() + MINUTE_IN_SECONDS, 'activitypub_upgrade', $args );
return;
}

self::lock();

$callback = array_shift( $args ); // Remove $callback from arguments.
$next = \call_user_func_array( array( self::class, $callback ), $args );

self::unlock();

if ( ! empty( $next ) ) {
// Schedule the next run, adding the result to the arguments.
\wp_schedule_single_event(
\time() + 30,
'activitypub_upgrade',
\array_merge( array( $callback ), \array_values( $next ) )
);
}
}

/**
* Updates the custom template to use shortcodes instead of the deprecated templates.
*/
Expand Down Expand Up @@ -488,25 +440,11 @@ public static function migrate_to_4_7_2() {
* @see Comment::pre_wp_update_comment_count_now()
* @param int $batch_size Optional. Number of posts to process per batch. Default 100.
* @param int $offset Optional. Number of posts to skip. Default 0.
* @return int[]|void Array with batch size and offset if there are more posts to process.
*/
public static function update_comment_counts( $batch_size = 100, $offset = 0 ) {
global $wpdb;

// Bail if the existing lock is still valid.
if ( self::is_locked() ) {
\wp_schedule_single_event(
time() + ( 5 * MINUTE_IN_SECONDS ),
'activitypub_update_comment_counts',
array(
'batch_size' => $batch_size,
'offset' => $offset,
)
);
return;
}

self::lock();

Comment::register_comment_types();
$comment_types = Comment::get_comment_type_slugs();
$type_inclusion = "AND comment_type IN ('" . implode( "','", $comment_types ) . "')";
Expand All @@ -527,17 +465,8 @@ public static function update_comment_counts( $batch_size = 100, $offset = 0 ) {

if ( count( $post_ids ) === $batch_size ) {
// Schedule next batch.
\wp_schedule_single_event(
time() + MINUTE_IN_SECONDS,
'activitypub_update_comment_counts',
array(
'batch_size' => $batch_size,
'offset' => $offset + $batch_size,
)
);
return array( $batch_size, $offset + $batch_size );
}

self::unlock();
}

/**
Expand Down
14 changes: 11 additions & 3 deletions includes/class-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ public static function init() {
self::register_schedulers();

self::$batch_callbacks = array(
'activitypub_send_activity' => array( Dispatcher::class, 'send_to_followers' ),
'activitypub_retry_activity' => array( Dispatcher::class, 'retry_send_to_followers' ),
'activitypub_send_activity' => array( Dispatcher::class, 'send_to_followers' ),
'activitypub_retry_activity' => array( Dispatcher::class, 'retry_send_to_followers' ),
'activitypub_migrate_from_0_17' => array( Migration::class, 'migrate_from_0_17' ),
'activitypub_update_comment_counts' => array( Migration::class, 'update_comment_counts' ),
'activitypub_create_post_outbox_items' => array( Migration::class, 'create_post_outbox_items' ),
'activitypub_create_comment_outbox_items' => array( Migration::class, 'create_comment_outbox_items' ),
);

// Follower Cleanups.
Expand All @@ -49,6 +53,10 @@ public static function init() {
\add_action( 'activitypub_async_batch', array( self::class, 'async_batch' ), 10, 99 );
\add_action( 'activitypub_send_activity', array( self::class, 'async_batch' ), 10, 3 );
\add_action( 'activitypub_retry_activity', array( self::class, 'async_batch' ), 10, 3 );
\add_action( 'activitypub_migrate_from_0_17', array( self::class, 'async_batch' ) );
\add_action( 'activitypub_update_comment_counts', array( self::class, 'async_batch' ), 10, 2 );
\add_action( 'activitypub_create_post_outbox_items', array( self::class, 'async_batch' ), 10, 2 );
\add_action( 'activitypub_create_comment_outbox_items', array( self::class, 'async_batch' ), 10, 2 );
\add_action( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ) );
\add_action( 'activitypub_outbox_purge', array( self::class, 'purge_outbox' ) );

Expand Down Expand Up @@ -326,7 +334,7 @@ public static function async_batch() {
return;
}

$key = \md5( \serialize( $args[0] ?? $args ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
$key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize

// Bail if the existing lock is still valid.
if ( self::is_locked( $key ) ) {
Expand Down
93 changes: 7 additions & 86 deletions tests/includes/class-test-migration.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

namespace Activitypub\Tests;

use Activitypub\Collection\Extra_Fields;
use Activitypub\Collection\Followers;
use Activitypub\Collection\Outbox;
use Activitypub\Migration;
use Activitypub\Comment;
use Activitypub\Migration;
use Activitypub\Model\Follower;
use Activitypub\Collection\Extra_Fields;
use Activitypub\Scheduler;

/**
* Test class for Activitypub Migrate.
Expand Down Expand Up @@ -166,23 +167,6 @@ public function test_migrate_actor_mode() {
$this->assertEquals( ACTIVITYPUB_ACTOR_MODE, \get_option( 'activitypub_actor_mode', ACTIVITYPUB_ACTOR_MODE ) );
}

/**
* Tests scheduling of migration.
*
* @covers ::maybe_migrate
*/
public function test_migration_scheduling() {
update_option( 'activitypub_db_version', '0.0.1' );

Migration::maybe_migrate();

$schedule = \wp_next_scheduled( 'activitypub_migrate', array( '0.0.1' ) );
$this->assertNotFalse( $schedule );

// Clean up.
delete_option( 'activitypub_db_version' );
}

/**
* Test migrate to 4.1.0.
*
Expand Down Expand Up @@ -395,12 +379,12 @@ public function test_update_comment_counts_with_lock() {
Comment::register_comment_types();

// Create test comments.
$post_id = $this->factory->post->create(
$post_id = self::factory()->post->create(
array(
'post_author' => 1,
)
);
$comment_id = $this->factory->comment->create(
$comment_id = self::factory()->comment->create(
array(
'comment_post_ID' => $post_id,
'comment_approved' => '1',
Expand All @@ -418,41 +402,6 @@ public function test_update_comment_counts_with_lock() {
wp_delete_post( $post_id, true );
}

/**
* Test update_comment_counts() with existing valid lock.
*
* @covers ::update_comment_counts
*/
public function test_update_comment_counts_with_existing_valid_lock() {
// Register comment types.
Comment::register_comment_types();

// Set a lock.
Migration::lock();

Migration::update_comment_counts( 10, 0 );

// Verify a scheduled event was created.
$next_scheduled = wp_next_scheduled(
'activitypub_update_comment_counts',
array(
'batch_size' => 10,
'offset' => 0,
)
);
$this->assertNotFalse( $next_scheduled );

// Clean up.
delete_option( 'activitypub_migration_lock' );
wp_clear_scheduled_hook(
'activitypub_update_comment_counts',
array(
'batch_size' => 10,
'offset' => 0,
)
);
}

/**
* Test create post outbox items.
*
Expand Down Expand Up @@ -524,43 +473,15 @@ public function test_create_outbox_items_batching() {
$this->assertEquals( 5, count( $outbox_items ) );
}

/**
* Test async upgrade functionality.
*
* @covers ::async_upgrade
* @covers ::lock
* @covers ::unlock
* @covers ::create_post_outbox_items
*/
public function test_async_upgrade() {
// Test that lock prevents simultaneous upgrades.
Migration::lock();
Migration::async_upgrade( 'create_post_outbox_items' );
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items' ) );
$this->assertNotFalse( $scheduled );
Migration::unlock();

// Test scheduling next batch when callback returns more work.
Migration::async_upgrade( 'create_post_outbox_items', 1, 0 ); // Small batch size to force multiple batches.
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items', 1, 1 ) );
$this->assertNotFalse( $scheduled );

// Test no scheduling when callback returns null (no more work).
Migration::async_upgrade( 'create_post_outbox_items', 100, 1000 ); // Large offset to ensure no posts found.
$this->assertFalse(
\wp_next_scheduled( 'activitypub_upgrade', array( 'create_post_outbox_items', 100, 1100 ) )
);
}

/**
* Test async upgrade with multiple arguments.
*
* @covers ::async_upgrade
*/
public function test_async_upgrade_multiple_args() {
// Test that multiple arguments are passed correctly.
Migration::async_upgrade( 'update_comment_counts', 50, 100 );
$scheduled = \wp_next_scheduled( 'activitypub_upgrade', array( 'update_comment_counts', 50, 150 ) );
Scheduler::async_batch( array( Migration::class, 'update_comment_counts' ), 50, 100 );
$scheduled = \wp_next_scheduled( 'activitypub_async_batch', array( array( Migration::class, 'update_comment_counts' ), 50, 150 ) );
$this->assertFalse( $scheduled, 'Should not schedule next batch when no comments found' );
}

Expand Down
65 changes: 65 additions & 0 deletions tests/includes/class-test-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
use Activitypub\Activity\Base_Object;
use Activitypub\Collection\Actors;
use Activitypub\Collection\Outbox;
use Activitypub\Comment;
use Activitypub\Dispatcher;
use Activitypub\Migration;
use Activitypub\Scheduler;

use function Activitypub\add_to_outbox;
Expand Down Expand Up @@ -343,6 +345,69 @@ public function test_purge_outbox_with_different_purge_days() {
$this->assertEquals( 0, wp_count_posts( Outbox::POST_TYPE )->publish );
}

/**
* Test update_comment_counts() with existing valid lock.
*
* @covers ::lock
* @covers ::async_batch
*/
public function test_update_comment_counts_with_existing_valid_lock() {
// Register comment types.
Comment::register_comment_types();

$callback = array( Migration::class, 'update_comment_counts' );
$key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize

// Set a lock.
Scheduler::lock( $key );

\do_action( 'activitypub_update_comment_counts', 10, 0 );

// Verify a scheduled event was created.
$next_scheduled = wp_next_scheduled( 'activitypub_update_comment_counts', array( 10, 0 ) );
$this->assertNotFalse( $next_scheduled );

// Clean up.
delete_option( 'activitypub_migration_lock' );
wp_clear_scheduled_hook( 'activitypub_update_comment_counts', array( 10, 0 ) );
}

/**
* Test async upgrade functionality.
*
* @covers ::async_batch
* @covers ::lock
* @covers ::unlock
*/
public function test_async_upgrade() {
$callback = array( Migration::class, 'create_post_outbox_items' );
$key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize

// Test that lock prevents simultaneous upgrades.
Scheduler::lock( $key );

\do_action( 'activitypub_create_post_outbox_items', 10, 0 );

$scheduled = \wp_next_scheduled( 'activitypub_create_post_outbox_items', array( 10, 0 ) );
$this->assertNotFalse( $scheduled );
Scheduler::unlock( $key );

\remove_action( 'transition_post_status', array( \Activitypub\Scheduler\Post::class, 'schedule_post_activity' ), 33 );
self::factory()->post->create( array( 'meta_input' => array( 'activitypub_status' => 'federated' ) ) );
\add_action( 'transition_post_status', array( \Activitypub\Scheduler\Post::class, 'schedule_post_activity' ), 33, 3 );

// Test scheduling next batch when callback returns more work.
\do_action( 'activitypub_create_post_outbox_items', 1, 0 ); // Small batch size to force multiple batches.
$scheduled = \wp_next_scheduled( 'activitypub_create_post_outbox_items', array( 1, 1 ) );
$this->assertNotFalse( $scheduled );

// Test no scheduling when callback returns null (no more work).
\do_action( 'activitypub_create_post_outbox_items', 100, 1000 ); // Large offset to ensure no posts found.
$this->assertFalse(
\wp_next_scheduled( 'activitypub_create_post_outbox_items', array( 100, 1100 ) )
);
}

/**
* Test async_batch method.
*
Expand Down
Loading