Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add contract_data_updated_at to integrations table to speed up dashboard query #617

Merged
merged 16 commits into from Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,13 @@
Sequel.migration do
up do
alter_table(:integrations) do
add_column(:contract_data_updated_at, DateTime)
end
end

down do
alter_table(:integrations) do
drop_column(:contract_data_updated_at)
end
end
end
@@ -0,0 +1,11 @@
require "pact_broker/db/data_migrations/set_contract_data_updated_at_for_integrations"

Sequel.migration do
up do
PactBroker::DB::DataMigrations::SetContractDataUpdatedAtForIntegrations.call(self)
end

down do

end
end
1 change: 1 addition & 0 deletions lib/pact_broker/api.rb
Expand Up @@ -5,6 +5,7 @@
require "pact_broker/api/contracts"
require "pact_broker/application_context"
require "pact_broker/feature_toggle"
require "pact_broker/initializers/subscriptions"

module Webmachine
class Request
Expand Down
@@ -0,0 +1,47 @@
# Populate the newly created contract_data_updated_at date in the integrations table
# using the latest created_at date from the pact_publications or verifications tables.
module PactBroker
module DB
module DataMigrations
class SetContractDataUpdatedAtForIntegrations
def self.call(connection)
join = {
Sequel[:integrations][:consumer_id] => Sequel[:target][:consumer_id],
Sequel[:integrations][:provider_id] => Sequel[:target][:provider_id]
}

max_created_at_for_each_integration = integrations_max_created_at(connection).from_self(alias: :target).select(:created_at).where(join)

connection[:integrations]
.where(contract_data_updated_at: nil)
.update(contract_data_updated_at: max_created_at_for_each_integration)
end

# @return [Sequel::Dataset] the overall max created_at from the union of the pact_publications and verifications tables,
# for each integration keyed by consumer_id/provider_id
def self.integrations_max_created_at(connection)
pact_publication_max_created_at(connection)
.union(verification_max_created_at(connection))
.select_group(:consumer_id, :provider_id)
.select_append{ max(:created_at).as(:created_at) }
end

# @return [Sequel::Dataset] the max created_at from the pact_publications table
# for each integration keyed by consumer_id/provider_id
def self.pact_publication_max_created_at(connection)
connection[:pact_publications]
.select_group(:consumer_id, :provider_id)
.select_append{ max(:created_at).as(:created_at) }
end

# @return [Sequel::Dataset] the max created_at from the verifications table
# for each integration keyed by consumer_id/provider_id
def self.verification_max_created_at(connection)
connection[:verifications]
.select_group(:consumer_id, :provider_id)
.select_append{ max(:created_at).as(:created_at) }
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/pact_broker/db/migrate_data.rb
Expand Up @@ -29,6 +29,7 @@ def self.call database_connection, _options = {}
DataMigrations::CreateBranches.call(database_connection)
DataMigrations::MigrateIntegrations.call(database_connection)
DataMigrations::MigratePactVersionProviderTagSuccessfulVerifications.call(database_connection)
DataMigrations::SetContractDataUpdatedAtForIntegrations.call(database_connection)
end
end
end
Expand Down
16 changes: 12 additions & 4 deletions lib/pact_broker/events/subscriber.rb
Expand Up @@ -32,11 +32,19 @@ module Events
extend self

def subscribe(*args)
result = nil
TemporaryListeners.subscribe(*args) do
result = yield
if block_given?
result = nil
TemporaryListeners.subscribe(*args) do
result = yield
end
result
else
Wisper.subscribe(*args)
end
result
end

def unsubscribe(*args)
Wisper.unsubscribe(*args)
end
end
end
4 changes: 4 additions & 0 deletions lib/pact_broker/initializers/subscriptions.rb
@@ -0,0 +1,4 @@
require "pact_broker/events/subscriber"
require "pact_broker/integrations/event_listener"

PactBroker::Events.subscribe(PactBroker::Integrations::EventListener.new)
23 changes: 23 additions & 0 deletions lib/pact_broker/integrations/event_listener.rb
@@ -0,0 +1,23 @@
require "pact_broker/services"

# Listens for events that happen in the Pact Broker that are relevant to the Integrations objects.

module PactBroker
module Integrations
class EventListener
include PactBroker::Services

# @param [Hash] params the params from the broadcast event
# @option params [PactBroker::Domain::Pact] :pact the newly published pact
def contract_published(params)
integration_service.handle_contract_data_published(params.fetch(:pact).consumer, params.fetch(:pact).provider)
end

# @param [Hash] params the params from the broadcast event
# @option params [PactBroker::Domain::Verification] :verification the newly published verification
def provider_verification_published(params)
integration_service.handle_contract_data_published(params.fetch(:verification).consumer, params.fetch(:verification).provider)
end
end
end
end
5 changes: 4 additions & 1 deletion lib/pact_broker/integrations/integration.rb
@@ -1,3 +1,4 @@
require "pact_broker/dataset"
require "pact_broker/verifications/pseudo_branch_status"
require "pact_broker/domain/verification"
require "pact_broker/webhooks/latest_triggered_webhook"
Expand All @@ -6,7 +7,7 @@

module PactBroker
module Integrations
class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id))
class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id, :contract_data_updated_at))
set_primary_key :id
plugin :insert_ignore, identifying_columns: [:consumer_id, :provider_id]
associate(:many_to_one, :consumer, :class => "PactBroker::Domain::Pacticipant", :key => :consumer_id, :primary_key => :id)
Expand Down Expand Up @@ -75,6 +76,8 @@ class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :c
end)

dataset_module do
include PactBroker::Dataset

def including_pacticipant_id(pacticipant_id)
where(consumer_id: pacticipant_id).or(provider_id: pacticipant_id)
end
Expand Down
9 changes: 9 additions & 0 deletions lib/pact_broker/integrations/repository.rb
Expand Up @@ -17,6 +17,15 @@ def create_for_pact(consumer_id, provider_id)
def delete(consumer_id, provider_id)
Integration.where(consumer_id: consumer_id, provider_id: provider_id).delete
end

# Sets the contract_data_updated_at for the integration(s) as specified by the consumer and provider
# @param [PactBroker::Domain::Pacticipant, nil] consumer the consumer for the integration, or nil if for a provider-only event (eg. Pactflow provider contract published)
# @param [PactBroker::Domain::Pacticipant] provider the provider for the integration
def set_contract_data_updated_at(consumer, provider)
Integration
.where({ consumer_id: consumer&.id, provider_id: provider.id }.compact )
.update(contract_data_updated_at: Sequel.datetime_class.now)
end
end
end
end
7 changes: 7 additions & 0 deletions lib/pact_broker/integrations/service.rb
Expand Up @@ -32,6 +32,13 @@ def self.find_all
.sort { | a, b| Integration.compare_by_last_action_date(a, b) }
end

# Callback to invoke when a consumer contract, verification result (or provider contract in Pactflow) is published
# @param [PactBroker::Domain::Pacticipant] consumer or nil
# @param [PactBroker::Domain::Pacticipant] provider
def self.handle_contract_data_published(consumer, provider)
integration_repository.set_contract_data_updated_at(consumer, provider)
end

def self.delete(consumer_name, provider_name)
consumer = pacticipant_service.find_pacticipant_by_name!(consumer_name)
provider = pacticipant_service.find_pacticipant_by_name!(provider_name)
Expand Down
4 changes: 4 additions & 0 deletions lib/pact_broker/test/test_data_builder.rb
Expand Up @@ -493,6 +493,10 @@ def and_return instance_variable_name
instance_variable_get("@#{instance_variable_name}")
end

def clear_now
@now = nil
end

def set_now date
@now = date.to_date
self
Expand Down
18 changes: 0 additions & 18 deletions spec/features/publish_verification_results_and_version_spec.rb
Expand Up @@ -49,22 +49,4 @@
expect(last_response.status).to be 200
expect(JSON.parse(subject.body)).to include JSON.parse(verification_content)
end
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have moved this spec out of the feature directory, into the integration test directory.


context "with a webhook configured", job: true do
before do
td.create_webhook(
method: "POST",
url: "http://example.org",
events: [{ name: PactBroker::Webhooks::WebhookEvent::VERIFICATION_PUBLISHED }]
)
end
let!(:request) do
stub_request(:post, "http://example.org").to_return(:status => 200)
end

it "executes the webhook" do
subject
expect(request).to have_been_made
end
end
end
6 changes: 6 additions & 0 deletions spec/integration/endpoints/publish_contracts_spec.rb
Expand Up @@ -21,6 +21,7 @@
let(:rack_headers) { { "CONTENT_TYPE" => "application/json", "HTTP_ACCEPT" => "application/hal+json" } }
let(:encoded_contract) { Base64.strict_encode64(contract) }
let(:path) { "/contracts/publish" }
let(:contract) { td.fixed_json_content("Foo", "Bar", "1") }

subject { post(path, request_body_hash.to_json, rack_headers) }

Expand All @@ -30,4 +31,9 @@
its(:status) { is_expected.to eq 400 }
its(:body) { is_expected.to include("non UTF-8 character") }
end

it "sets the contract_data_updated_at on the integration" do
subject
expect(PactBroker::Integrations::Integration.last.contract_data_updated_at).to_not be nil
end
end
52 changes: 52 additions & 0 deletions spec/integration/endpoints/publish_verification_results_spec.rb
@@ -0,0 +1,52 @@
require "pact_broker/domain/verification"
require "timecop"

describe "Publishing a pact verification" do
let(:path) { "/pacts/provider/Provider/consumer/Consumer/pact-version/#{pact.pact_version_sha}/verification-results" }
let(:verification_content) { load_fixture("verification.json") }
let(:parsed_response_body) { JSON.parse(subject.body) }
let(:pact) { td.pact }
let(:rack_env) do
{
"CONTENT_TYPE" => "application/json",
"HTTP_ACCEPT" => "application/hal+json",
"pactbroker.database_connector" => lambda { |&block| block.call }
}
end

subject { post(path, verification_content, rack_env) }

before do
Timecop.freeze(Date.today - 2) do
td.create_provider("Provider")
.create_consumer("Consumer")
.create_consumer_version("1.0.0")
.create_pact
.create_consumer_version("1.2.3")
.create_pact
.revise_pact
end
end

it "updates the contract_data_updated_at on the integration" do
expect { subject }.to change { PactBroker::Integrations::Integration.last.contract_data_updated_at }
end

context "with a webhook configured", job: true do
before do
td.create_webhook(
method: "POST",
url: "http://example.org",
events: [{ name: PactBroker::Webhooks::WebhookEvent::VERIFICATION_PUBLISHED }]
)
end
let!(:request) do
stub_request(:post, "http://example.org").to_return(:status => 200)
end

it "executes the webhook" do
subject
expect(request).to have_been_made
end
end
end
@@ -0,0 +1,49 @@
require "pact_broker/db/data_migrations/set_contract_data_updated_at_for_integrations"
require "timecop"
require "tzinfo"

module PactBroker
module DB
module DataMigrations
describe SetContractDataUpdatedAtForIntegrations do
before do
td.clear_now # use timecop instead of the TestDataBuilder @now

Timecop.freeze(day_1) do
td.publish_pact(consumer_name: "Foo", provider_name: "Bar", consumer_version_number: "1")
end

Timecop.freeze(day_2) do
td.publish_pact(consumer_name: "Foo", provider_name: "Bar", consumer_version_number: "2")
end

Timecop.freeze(day_3) do
td.create_verification(provider_version: "2")
end

Timecop.freeze(day_4) do
td.publish_pact(consumer_name: "Cat", provider_name: "Dog", consumer_version_number: "2")
end

db[:integrations].update(contract_data_updated_at: nil)
end

let(:day_1) { td.in_utc{ DateTime.new(2023, 6, 11) } }
let(:day_2) { td.in_utc{ DateTime.new(2023, 6, 12) } }
let(:day_3) { td.in_utc{ DateTime.new(2023, 6, 13) } }
let(:day_4) { td.in_utc{ DateTime.new(2023, 6, 14) } }

let(:db) { PactBroker::Domain::Version.db }

subject { SetContractDataUpdatedAtForIntegrations.call(db) }

it "sets the contract_data_updated_at to the latest of the pact publication and verification publication dates for that integration" do
subject
integrations = db[:integrations].order(:id)
expect(integrations.first[:contract_data_updated_at].to_s).to eq day_3.to_s # using to_s because dates are loaded as strings when running with MySQL
expect(integrations.last[:contract_data_updated_at].to_s).to eq day_4.to_s
end
end
end
end
end