Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MARLIN-615] Add ability to specify sync from date in synchronization job #187

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 19 additions & 14 deletions app/jobs/maestrano/connector/rails/concerns/synchronization_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def find_running_job(organization_id)
# * :only_entities => [person, tasks_list]
# * :full_sync => true synchronization is performed without date filtering
# * :connec_preemption => true|false : preemption is always|never given to connec in case of conflict (if not set, the most recently updated entity is kept)
# * :sync_delta => Integer, Sync x seconds from this date.
def perform(organization_id, opts = {})
organization = Maestrano::Connector::Rails::Organization.find(organization_id)
return unless organization&.sync_enabled
Expand All @@ -64,7 +65,7 @@ def perform(organization_id, opts = {})

begin
last_synchronization = organization.last_successful_synchronization
last_synchronization_date = organization.last_synchronization_date
sync_from_date = find_sync_from_date(organization, opts[:sync_delta])
connec_client = Maestrano::Connector::Rails::ConnecHelper.get_client(organization)
external_client = Maestrano::Connector::Rails::External.get_client(organization)

Expand All @@ -77,22 +78,22 @@ def perform(organization_id, opts = {})
next unless settings[:can_push_to_connec] || settings[:can_push_to_external]

Maestrano::Connector::Rails::ConnectorLogger.log('info', organization, "First synchronization ever. Doing half sync from external for #{entity}.")
first_sync_entity(entity.to_s, organization, connec_client, external_client, last_synchronization_date, opts, true)
first_sync_entity(entity.to_s, organization, connec_client, external_client, sync_from_date, opts, true)
Maestrano::Connector::Rails::ConnectorLogger.log('info', organization, "First synchronization ever. Doing half sync from Connec! for #{entity}.")
first_sync_entity(entity.to_s, organization, connec_client, external_client, last_synchronization_date, opts, false)
first_sync_entity(entity.to_s, organization, connec_client, external_client, sync_from_date, opts, false)
end
elsif opts[:only_entities]
Maestrano::Connector::Rails::ConnectorLogger.log('info', organization, "Synchronization is partial and will synchronize only #{opts[:only_entities].join(' ')}")
# The synchronization is marked as partial and will not be considered as the last-synchronization for the next sync
current_synchronization.mark_as_partial
opts[:only_entities].each do |entity|
sync_entity(entity, organization, connec_client, external_client, last_synchronization_date, opts)
sync_entity(entity, organization, connec_client, external_client, sync_from_date, opts)
end
else
organization.synchronized_entities.each do |entity, settings|
next unless settings[:can_push_to_connec] || settings[:can_push_to_external]

sync_entity(entity.to_s, organization, connec_client, external_client, last_synchronization_date, opts)
sync_entity(entity.to_s, organization, connec_client, external_client, sync_from_date, opts)
end
end

Expand All @@ -104,14 +105,18 @@ def perform(organization_id, opts = {})
end
end

def sync_entity(entity_name, organization, connec_client, external_client, last_synchronization_date, opts)
def find_sync_from_date(organization, sync_delta)
(sync_delta && sync_delta.seconds.ago.utc) || organization.last_synchronization_date
end

def sync_entity(entity_name, organization, connec_client, external_client, sync_from_date, opts)
entity_instance = instanciate_entity(entity_name, organization, connec_client, external_client, opts)

perform_sync(entity_instance, last_synchronization_date)
perform_sync(entity_instance, sync_from_date)
end

# Does a batched sync on either external or connec!
def first_sync_entity(entity_name, organization, connec_client, external_client, last_synchronization_date, opts, external = true)
def first_sync_entity(entity_name, organization, connec_client, external_client, sync_from_date, opts, external = true)
limit = Settings.first_sync_batch_size || 50
skip = 0
entities_count = limit
Expand All @@ -130,7 +135,7 @@ def first_sync_entity(entity_name, organization, connec_client, external_client,
while entities_count == limit
entity_instance.opts_merge!(__skip: skip)

perform_hash = perform_sync(entity_instance, last_synchronization_date, external)
perform_hash = perform_sync(entity_instance, sync_from_date, external)
entities_count = perform_hash[:count]

# Safety: if the connector does not implement batched calls but has exactly limit entities
Expand All @@ -152,14 +157,14 @@ def instanciate_entity(entity_name, organization, connec_client, external_client
end

# Perform the sync and return the entities_count for either external or connec
def perform_sync(entity_instance, last_synchronization_date, external = true)
entity_instance.before_sync(last_synchronization_date)
external_entities = entity_instance.get_external_entities_wrapper(last_synchronization_date)
connec_entities = entity_instance.get_connec_entities(last_synchronization_date)
def perform_sync(entity_instance, sync_from_date, external = true)
entity_instance.before_sync(sync_from_date)
external_entities = entity_instance.get_external_entities_wrapper(sync_from_date)
connec_entities = entity_instance.get_connec_entities(sync_from_date)
mapped_entities = entity_instance.consolidate_and_map_data(connec_entities, external_entities)
entity_instance.push_entities_to_external(mapped_entities[:connec_entities])
entity_instance.push_entities_to_connec(mapped_entities[:external_entities])
entity_instance.after_sync(last_synchronization_date)
entity_instance.after_sync(sync_from_date)

entity_instance.class.count_and_first(external ? external_entities : connec_entities)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ def external_model_to_connec_model(external_hash_of_entities)
# -------------------------------------------------------------
# Entity equivalent methods
# -------------------------------------------------------------
def get_connec_entities(last_synchronization_date = nil)
def get_connec_entities(sync_from_date = nil)
entities = ActiveSupport::HashWithIndifferentAccess.new

self.class.formatted_connec_entities_names.each do |connec_entity_name, connec_class_name|
sub_entity_instance = instantiate_sub_entity_instance(connec_class_name)
entities[connec_entity_name] = sub_entity_instance.get_connec_entities(last_synchronization_date)
entities[connec_entity_name] = sub_entity_instance.get_connec_entities(sync_from_date)
end
entities
end

def get_external_entities_wrapper(last_synchronization_date = nil, entity_name = '')
def get_external_entities_wrapper(sync_from_date = nil, entity_name = '')
entities = ActiveSupport::HashWithIndifferentAccess.new

self.class.formatted_external_entities_names.each do |external_entity_name, external_class_name|
sub_entity_instance = instantiate_sub_entity_instance(external_class_name)
entities[external_entity_name] = sub_entity_instance.get_external_entities_wrapper(last_synchronization_date, external_entity_name)
entities[external_entity_name] = sub_entity_instance.get_external_entities_wrapper(sync_from_date, external_entity_name)
end
entities
end
Expand Down
12 changes: 6 additions & 6 deletions app/models/maestrano/connector/rails/concerns/entity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def map_to_connec_helper(entity, mapper, references)
# * __skip_connec for half syncs
# * __limit and __skip for batch calls
# Returns an array of connec entities
def get_connec_entities(last_synchronization_date = nil)
def get_connec_entities(sync_from_date = nil)
return [] if @opts[:__skip_connec] || !self.class.can_read_connec?

Maestrano::Connector::Rails::ConnectorLogger.log('info', @organization, "Fetching Connec! #{self.class.connec_entity_name}")
Expand All @@ -217,10 +217,10 @@ def get_connec_entities(last_synchronization_date = nil)
query_params[:$skip] = @opts[:__skip]
end

if last_synchronization_date.blank? || @opts[:full_sync]
if sync_from_date.blank? || @opts[:full_sync]
query_params[:$filter] = @opts[:$filter] if @opts[:$filter]
else
query_params[:$filter] = "updated_at gt '#{last_synchronization_date.iso8601}'" + (@opts[:$filter] ? " and #{@opts[:$filter]}" : '')
query_params[:$filter] = "updated_at gt '#{sync_from_date.iso8601}'" + (@opts[:$filter] ? " and #{@opts[:$filter]}" : '')
end

Maestrano::Connector::Rails::ConnectorLogger.log('debug', @organization, "entity=#{self.class.connec_entity_name}, fetching data with #{query_params.to_query}")
Expand Down Expand Up @@ -301,14 +301,14 @@ def batch_op(method, mapped_external_entity, id, connec_entity_name)
# External methods
# ----------------------------------------------
# Wrapper to process options and limitations
def get_external_entities_wrapper(last_synchronization_date = nil, entity_name = self.class.external_entity_name)
def get_external_entities_wrapper(sync_from_date = nil, entity_name = self.class.external_entity_name)
return [] if @opts[:__skip_external] || !self.class.can_read_external?

get_external_entities(entity_name, last_synchronization_date)
get_external_entities(entity_name, sync_from_date)
end

# To be implemented in each connector
def get_external_entities(external_entity_name, last_synchronization_date = nil)
def get_external_entities(external_entity_name, sync_from_date = nil)
Maestrano::Connector::Rails::ConnectorLogger.log('info', @organization, "Fetching #{Maestrano::Connector::Rails::External.external_name} #{external_entity_name.pluralize}")
raise 'Not implemented'
end
Expand Down
4 changes: 2 additions & 2 deletions app/models/maestrano/connector/rails/concerns/entity_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ def opts_merge!(opts)
@opts.merge!(opts)
end

def before_sync(last_synchronization_date)
def before_sync(sync_from_date)
# Does nothing by default
end

def after_sync(last_synchronization_date)
def after_sync(sync_from_date)
# Does nothing by default
end

Expand Down
33 changes: 26 additions & 7 deletions spec/jobs/synchronization_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def does_not_perform
expect{ subject }.to change{ Maestrano::Connector::Rails::Synchronization.count }.by(0)
end

def performes
def performs
expect{ subject }.to change{ Maestrano::Connector::Rails::Synchronization.count }.by(1)
end

Expand All @@ -29,7 +29,7 @@ def performes

context 'with a sync still running for more than 30 minutes' do
let!(:running_sync) { create(:synchronization, organization: organization, status: 'RUNNING', created_at: 31.minutes.ago) }
it { performes }
it { performs }
end

describe 'recovery mode' do
Expand All @@ -43,7 +43,7 @@ def performes

context 'synchronization is forced' do
let(:opts) { {forced: true} }
it { performes }
it { performs }
end
end

Expand All @@ -53,7 +53,7 @@ def performes
organization.synchronizations.create(status: 'ERROR', created_at: 2.day.ago, updated_at: 2.day.ago)
end
}
it { performes }
it { performs }
end

context 'three sync failed but last sync is successfull' do
Expand All @@ -63,7 +63,7 @@ def performes
end
organization.synchronizations.create(status: 'SUCCESS', created_at: 1.hour.ago)
}
it { performes }
it { performs }
end
end

Expand All @@ -78,7 +78,7 @@ def performes
context 'subsequent sync' do
let!(:old_sync) { create(:synchronization, partial: false, status: 'SUCCESS', organization: organization) }

it { performes }
it { performs }

context 'with options' do
context 'with only_entities' do
Expand All @@ -96,8 +96,27 @@ def performes
end
end
end
end

context 'with sync_delta' do
before { Timecop.freeze(Time.utc(2008, 9, 1, 12, 0, 0)) }
after { Timecop.return }

let(:opts) { { sync_delta: sync_delta } }
let(:sync_from) { sync_delta.seconds.ago.utc }
let(:sync_delta) { 60 }

it { performs }

it 'passes the correct sync_from dates' do
organization.synchronized_entities.each do |entity, _|
expect_any_instance_of(Maestrano::Connector::Rails::SynchronizationJob).to receive(:sync_entity)
Copy link
Author

@iseessel iseessel Jan 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really just need to test the sync_from. Tested entity.to_s and organization because their readily available.

The other arguments don't relate to this option and would be a pain to test. Don't think its necessary.

.with(entity.to_s, organization, anything, anything, sync_from, anything)
end

subject
end
end
end
end
end

Expand Down